708 lines
18 KiB
Go
708 lines
18 KiB
Go
|
|
/**
|
|||
|
|
* @author mii
|
|||
|
|
* @date 2020/2/29 0029
|
|||
|
|
*/
|
|||
|
|
|
|||
|
|
package db
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"sync"
|
|||
|
|
"time"
|
|||
|
|
"xiawan/wx/clientsdk/baseinfo"
|
|||
|
|
"xiawan/wx/db/table"
|
|||
|
|
"xiawan/wx/srv"
|
|||
|
|
"xiawan/wx/srv/srvconfig"
|
|||
|
|
|
|||
|
|
"github.com/gogf/gf/database/gredis"
|
|||
|
|
"github.com/gogf/gf/encoding/gjson"
|
|||
|
|
"github.com/gogf/gf/frame/g"
|
|||
|
|
"github.com/gogf/gf/util/gconv"
|
|||
|
|
"github.com/gomodule/redigo/redis"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
REDIS_OPERATION_SET = "SET"
|
|||
|
|
REDIS_OPERATION_GET = "GET"
|
|||
|
|
REDIS_OPERATION_LIST_LLEN = "llen"
|
|||
|
|
REDIS_OPERATION_LIST_LPSUH = "LPUSH"
|
|||
|
|
REDIS_OPERATION_LIST_LPOP = "lpop"
|
|||
|
|
REDIS_OPERATION_LIST_RPOP = "rpop"
|
|||
|
|
REDIS_OPERATION_LIST_LTRIM = "ltrim"
|
|||
|
|
REDIS_OPERATION_CHANNEL_PUBLISH = "publish"
|
|||
|
|
REDIS_OPERATION_EXISTS = "EXISTS"
|
|||
|
|
REDIS_OPERATION_DELETE = "DEL"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
DEFAULT_GROUP_NAME = "default" // Default configuration group name.
|
|||
|
|
DEFAULT_REDIS_PORT = 6379 // Default redis port configuration if not passed.
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
type Obj struct {
|
|||
|
|
Value interface{}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func RedisSetup() {
|
|||
|
|
//配置redis
|
|||
|
|
// MaxIdle: 10, // 设置最大空闲连接数
|
|||
|
|
// MaxActive: 100, // 设置最大活跃连接数
|
|||
|
|
// IdleTimeout: 300, // 空闲连接超时时间(单位:秒)
|
|||
|
|
// MaxConnLifetime: 300, // 连接最大存活期(单位:秒)
|
|||
|
|
// WaitTimeout: 5, // 连接等待超时时间(单位:秒)
|
|||
|
|
// ReadTimeout: 10, // 读操作超时时间(单位:秒)
|
|||
|
|
// WriteTimeout: 10, // 写操作超时时间(单位:秒)
|
|||
|
|
gredis.SetConfig(srvconfig.GlobalSetting.RedisConfig, DEFAULT_GROUP_NAME)
|
|||
|
|
conn := g.Redis().GetConn()
|
|||
|
|
err := conn.Err()
|
|||
|
|
if err != nil {
|
|||
|
|
//logger.Errorln(err)
|
|||
|
|
fmt.Println("failed to connect Redis:", err)
|
|||
|
|
}
|
|||
|
|
fmt.Println("connect Redis success")
|
|||
|
|
go scheduleSaveToRedis()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 尝试获取锁
|
|||
|
|
func AcquireLock(key string, timeout time.Duration) (bool, error) {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
lockValue := fmt.Sprintf("%d", time.Now().UnixNano()) // Unique lock value
|
|||
|
|
result, err := redisConn.Do(REDIS_OPERATION_SET, key, lockValue, "NX", "PX", int64(timeout.Seconds()*1000))
|
|||
|
|
if err != nil {
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
return result != nil, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 释放锁
|
|||
|
|
func ReleaseLock(key string) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
_, err := redisConn.Do(REDIS_OPERATION_DELETE, key)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// acquireLockWithRetry 尝试获取锁,带有重试机制
|
|||
|
|
func acquireLockWithRetry(key string, timeout time.Duration, retryInterval time.Duration, maxRetries int) (bool, error) {
|
|||
|
|
for i := 0; i < maxRetries; i++ {
|
|||
|
|
lockAcquired, err := AcquireLock(key, timeout)
|
|||
|
|
if err != nil {
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
if lockAcquired {
|
|||
|
|
return true, nil
|
|||
|
|
}
|
|||
|
|
time.Sleep(retryInterval) // 等待一段时间后重试
|
|||
|
|
}
|
|||
|
|
return false, fmt.Errorf("could not acquire lock after %d retries", maxRetries)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// CreateSyncMsgList 创建一个同步信息保存列表
|
|||
|
|
func CreateSyncMsgList(exId string) bool {
|
|||
|
|
ok, err := LPUSH(exId, "撒大声地")
|
|||
|
|
if err != nil {
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
return ok
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LPOPObj 从列表中取出对象然后反序列化成对象
|
|||
|
|
func LPOPObj(k string, i interface{}) error {
|
|||
|
|
_var, err := LPOP(k)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
err = gjson.DecodeTo(_var.Bytes(), &i)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RPOPObj 从列表尾部 取出对象然后反序列化成对象
|
|||
|
|
func RPOPObj(k string, i interface{}) error {
|
|||
|
|
_var, err := RPOP(k)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
err = gjson.DecodeTo(_var.Bytes(), &i)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LPUSHObj 将对象序列化成json保存
|
|||
|
|
func LPUSHObj(k string, i interface{}) (bool, error) {
|
|||
|
|
iData, err := gjson.Encode(i)
|
|||
|
|
// 判断 k 的尾号是 _syncMsg
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
ok, err := LPUSH(k, iData)
|
|||
|
|
if err != nil {
|
|||
|
|
return ok, err
|
|||
|
|
}
|
|||
|
|
return ok, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func LPUSH(k string, i interface{}) (bool, error) {
|
|||
|
|
lockKey := k + "_lock"
|
|||
|
|
maxRetries := 200 // 最大重试次数
|
|||
|
|
retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒
|
|||
|
|
// 使用带重试机制的获取锁函数
|
|||
|
|
acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries)
|
|||
|
|
defer ReleaseLock(lockKey) // 确保方法结束时释放锁
|
|||
|
|
|
|||
|
|
maxLength := 500 // 设置最大列表长度
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
result, err := redisConn.Do(REDIS_OPERATION_LIST_LPSUH, k, i)
|
|||
|
|
if err != nil {
|
|||
|
|
//logger.Errorln(err)
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
_len, err := LLEN(k)
|
|||
|
|
if err != nil {
|
|||
|
|
return true, err
|
|||
|
|
}
|
|||
|
|
if _len == 0 {
|
|||
|
|
return true, nil
|
|||
|
|
}
|
|||
|
|
// 如果列表长度超过 maxLength,则循环执行 LPOP 操作
|
|||
|
|
for _len > int32(maxLength) {
|
|||
|
|
_, err = redisConn.Do(REDIS_OPERATION_LIST_LPOP, k)
|
|||
|
|
if err != nil {
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
_len = _len - int32(1)
|
|||
|
|
}
|
|||
|
|
return gconv.String(result) == "OK", nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LPOP 从列表中取出一个值
|
|||
|
|
func LPOP(k string) (*g.Var, error) {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
r, err := redisConn.DoVar(REDIS_OPERATION_LIST_LPOP, k)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return r, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RPOP 从列表尾部 取出一个元素
|
|||
|
|
func RPOP(k string) (*g.Var, error) {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
r, err := redisConn.DoVar(REDIS_OPERATION_LIST_RPOP, k)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return r, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LLEN 取列表长度
|
|||
|
|
func LLEN(k string) (int32, error) {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
_var, err := redisConn.DoVar(REDIS_OPERATION_LIST_LLEN, k)
|
|||
|
|
if err != nil {
|
|||
|
|
//logger.Errorln(err)
|
|||
|
|
return 0, err
|
|||
|
|
}
|
|||
|
|
return _var.Int32(), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PUBLISH 发布消息
|
|||
|
|
func PUBLISH(k string, i interface{}) error {
|
|||
|
|
return PushQueue(k, i)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func SETExpirationObj(k string, i interface{}, expiration int64) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
iData, err := gjson.Encode(i)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
var result interface{}
|
|||
|
|
if expiration > 0 {
|
|||
|
|
result, err = redisConn.Do(REDIS_OPERATION_SET, k, iData, "EX", expiration)
|
|||
|
|
} else {
|
|||
|
|
result, err = redisConn.Do(REDIS_OPERATION_SET, k, iData)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
//logger.Errorln(err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
if gconv.String(result) == "OK" {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
return errors.New(gconv.String(result))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func SETObj(k string, i interface{}) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
iData, err := gjson.Encode(i)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
result, err := redisConn.Do(REDIS_OPERATION_SET, k, iData)
|
|||
|
|
if err != nil {
|
|||
|
|
//logger.Errorln(err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
if gconv.String(result) == "OK" {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
return errors.New(gconv.String(result))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func GETObj(k string, i interface{}) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
_var, err := redisConn.Do(REDIS_OPERATION_GET, k)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
err = gjson.DecodeTo(_var, &i)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func DelObj(k string) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
_, err := redisConn.Do(REDIS_OPERATION_DELETE, k)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func Exists(k string) (bool, error) {
|
|||
|
|
//检查是否存在key值
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
exists, err := redisConn.Do(REDIS_OPERATION_EXISTS, k)
|
|||
|
|
if err != nil {
|
|||
|
|
fmt.Println("illegal exception")
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
//fmt.Printf("exists or not: %v \n", exists)
|
|||
|
|
if exists.(int64) == 1 {
|
|||
|
|
return true, nil
|
|||
|
|
}
|
|||
|
|
return false, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func getRedisConn() *gredis.Conn {
|
|||
|
|
return g.Redis().GetConn()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishSyncMsgWxMessage 发布微信消息; 这里就是接收到 微信后台 推送来的消息(包括其他用户私聊消息、群聊消息、收款消息等等)
|
|||
|
|
func PublishSyncMsgWxMessage(acc *srv.WXAccount, response table.SyncMessageResponse) error {
|
|||
|
|
if acc == nil {
|
|||
|
|
return errors.New("PublishSyncMsgWxMessage acc == nil")
|
|||
|
|
}
|
|||
|
|
response.UUID = acc.GetUserInfo().UUID
|
|||
|
|
response.UserName = acc.GetUserInfo().GetUserName()
|
|||
|
|
response.LoginState = acc.GetLoginState()
|
|||
|
|
response.Type = table.RedisPushSyncTypeWxMsg
|
|||
|
|
response.TargetIp = srvconfig.GlobalSetting.TargetIp
|
|||
|
|
if len(response.GetAddMsgs()) != 0 || len(response.GetContacts()) != 0 {
|
|||
|
|
if srvconfig.GlobalSetting.HttpSyncMsg {
|
|||
|
|
LPUSHObj(response.UUID+"_syncHttp", &response) //缓存reids
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishFavItem 发布收藏消息
|
|||
|
|
func PublishFavItem(acc *srv.WXAccount, favItem *baseinfo.FavItem) error {
|
|||
|
|
if acc == nil {
|
|||
|
|
return errors.New("PublishSyncMsgWxMessage acc == nil")
|
|||
|
|
}
|
|||
|
|
response := table.SyncMessageResponse{}
|
|||
|
|
response.UUID = acc.GetUserInfo().UUID
|
|||
|
|
response.UserName = acc.GetUserInfo().GetUserName()
|
|||
|
|
response.LoginState = acc.GetLoginState()
|
|||
|
|
response.TargetIp = srvconfig.GlobalSetting.TargetIp
|
|||
|
|
response.Type = table.RedisPushFavitem
|
|||
|
|
response.FavItem = favItem
|
|||
|
|
// 缓存reids
|
|||
|
|
if srvconfig.GlobalSetting.NewsSynWxId {
|
|||
|
|
// 缓存reids
|
|||
|
|
LPUSHObj(response.UUID+"_syncMsg", &response)
|
|||
|
|
return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
return PUBLISH("wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishSyncMsgLoginState 微信状态
|
|||
|
|
func PublishSyncMsgLoginState(wxid string, state uint32) error {
|
|||
|
|
//fmt.Println("推送->wxid=="+wxid+"--->", state)
|
|||
|
|
response := table.SyncMessageResponse{}
|
|||
|
|
response.TargetIp = srvconfig.GlobalSetting.TargetIp
|
|||
|
|
response.Type = table.RedisPushSyncTypeLoginState
|
|||
|
|
response.UserName = wxid
|
|||
|
|
response.LoginState = state
|
|||
|
|
// 缓存reids
|
|||
|
|
if srvconfig.GlobalSetting.NewsSynWxId {
|
|||
|
|
// 缓存reids
|
|||
|
|
LPUSHObj(response.UUID+"_syncMsg", &response)
|
|||
|
|
return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
return PUBLISH("wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishWxInItOk 初始化完成
|
|||
|
|
func PublishWxInItOk(UUID string, state uint32) error {
|
|||
|
|
if UUID == "" || state == 0 {
|
|||
|
|
return errors.New("uuid || state == nil")
|
|||
|
|
}
|
|||
|
|
response := table.SyncMessageResponse{}
|
|||
|
|
response.TargetIp = srvconfig.GlobalSetting.TargetIp
|
|||
|
|
response.Type = table.RedisPushWxInItOk
|
|||
|
|
response.LoginState = state
|
|||
|
|
response.UUID = UUID
|
|||
|
|
response.UserName = "初始化完成!"
|
|||
|
|
// 缓存reids
|
|||
|
|
if srvconfig.GlobalSetting.NewsSynWxId {
|
|||
|
|
_, _ = LPUSHObj(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
return PUBLISH("wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishSyncMsgCheckLogin 扫码结果
|
|||
|
|
func PublishSyncMsgCheckLogin(UUID string, result *baseinfo.CheckLoginQrCodeResult) error {
|
|||
|
|
if UUID == "" || result == nil {
|
|||
|
|
return errors.New("uuid || result == nil")
|
|||
|
|
}
|
|||
|
|
response := table.SubMessageCheckLoginQrCode{}
|
|||
|
|
response.TargetIp = srvconfig.GlobalSetting.TargetIp
|
|||
|
|
response.Type = table.RedisPushSyncTypeCheckLogin
|
|||
|
|
response.CheckLoginResult = result
|
|||
|
|
response.UUID = UUID
|
|||
|
|
// 缓存reids
|
|||
|
|
if srvconfig.GlobalSetting.NewsSynWxId {
|
|||
|
|
_, _ = LPUSHObj(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
return PUBLISH("wx_sync_msg_topic", &response)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GETSyncMsg 获取指定号缓存在redis的消息
|
|||
|
|
func GETSyncMsg(uuid string) (int, []*table.SyncMessageResponse, error) {
|
|||
|
|
_len, err := LLEN(uuid + "_syncMsg")
|
|||
|
|
if err != nil {
|
|||
|
|
return 0, nil, err
|
|||
|
|
}
|
|||
|
|
if _len == 0 {
|
|||
|
|
return 0, nil, nil
|
|||
|
|
}
|
|||
|
|
syncMsgLen := _len
|
|||
|
|
if syncMsgLen > 10 {
|
|||
|
|
syncMsgLen = 10
|
|||
|
|
}
|
|||
|
|
opValues := []*table.SyncMessageResponse{}
|
|||
|
|
for i := int32(0); i <= syncMsgLen; i++ {
|
|||
|
|
opVal := &table.SyncMessageResponse{}
|
|||
|
|
err = LPOPObj(uuid+"_wx_sync_msg_topic", opVal)
|
|||
|
|
if err != nil {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
opValues = append(opValues, opVal)
|
|||
|
|
}
|
|||
|
|
ContinueFlag := 0
|
|||
|
|
if (_len % 10) > 0 {
|
|||
|
|
ContinueFlag = 1
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return ContinueFlag, opValues, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PublishLicenseKey 使用状态, 主要是过期时间和开始时间
|
|||
|
|
func PublishLicenseKey(licenseKey *table.LicenseKey) error {
|
|||
|
|
lockKey := "uuid_syncLicenseKey_lock"
|
|||
|
|
maxRetries := 200 // 最大重试次数
|
|||
|
|
retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒
|
|||
|
|
// 使用带重试机制的获取锁函数
|
|||
|
|
acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries)
|
|||
|
|
|
|||
|
|
defer ReleaseLock(lockKey) // 确保方法结束时释放锁
|
|||
|
|
// 缓存 reids
|
|||
|
|
LPUSHObj("uuid_syncLicenseKey", licenseKey)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// HttpSyncLicenseKey HTTP-轮询 同步激活状态 获取 Redis 内的消息
|
|||
|
|
func HttpSyncLicenseKey() ([]*table.LicenseKey, error) {
|
|||
|
|
lockKey := "uuid_syncLicenseKey_lock"
|
|||
|
|
maxRetries := 200 // 最大重试次数
|
|||
|
|
retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒
|
|||
|
|
// 使用带重试机制的获取锁函数
|
|||
|
|
acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries)
|
|||
|
|
|
|||
|
|
defer ReleaseLock(lockKey) // 确保方法结束时释放锁
|
|||
|
|
|
|||
|
|
syncMsgLen, err := LLEN("uuid_syncLicenseKey")
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msgInfos := []*table.LicenseKey{}
|
|||
|
|
if !(syncMsgLen > int32(0)) {
|
|||
|
|
return msgInfos, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for i := int32(0); i < syncMsgLen; i++ {
|
|||
|
|
msg := &table.LicenseKey{}
|
|||
|
|
err = RPOPObj("uuid_syncLicenseKey", msg)
|
|||
|
|
if err != nil {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
msgInfos = append(msgInfos, msg)
|
|||
|
|
}
|
|||
|
|
return msgInfos, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// HttpSyncMsy HTTP-轮询 同步消息时 获取 Redis 内的消息
|
|||
|
|
func HttpSyncMsy(uuid string, count int) ([]*table.SyncMessageResponse, error) {
|
|||
|
|
lockKey := uuid + "_syncHttp_lock"
|
|||
|
|
maxRetries := 200 // 最大重试次数
|
|||
|
|
retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒
|
|||
|
|
// 使用带重试机制的获取锁函数
|
|||
|
|
acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries)
|
|||
|
|
|
|||
|
|
defer ReleaseLock(lockKey) // 确保方法结束时释放锁
|
|||
|
|
|
|||
|
|
_len, err := LLEN(uuid + "_syncHttp")
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
syncMsgLen := int32(count)
|
|||
|
|
if syncMsgLen == 0 { // count=0 时获取所有消息
|
|||
|
|
syncMsgLen = _len
|
|||
|
|
} else if syncMsgLen > _len {
|
|||
|
|
syncMsgLen = _len
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msgInfos := []*table.SyncMessageResponse{}
|
|||
|
|
if _len == 0 {
|
|||
|
|
return msgInfos, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for i := int32(0); i < syncMsgLen; i++ {
|
|||
|
|
msg := &table.SyncMessageResponse{}
|
|||
|
|
err = RPOPObj(uuid+"_syncHttp", msg)
|
|||
|
|
if err != nil {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
msgInfos = append(msgInfos, msg)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return msgInfos, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PushQueue 发送队例
|
|||
|
|
func PushQueue(queueName string, i interface{}) (err error) {
|
|||
|
|
con := getRedisConn()
|
|||
|
|
defer con.Close()
|
|||
|
|
|
|||
|
|
maxLength := 200 // 最大 200 条
|
|||
|
|
if i == nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
_, err = con.Do("rpush", queueName, gconv.String(i))
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("PushQueue redis error: %s", err)
|
|||
|
|
}
|
|||
|
|
// 使用 ltrim 裁剪列表,以保持其最大长度
|
|||
|
|
_, err = con.Do("LTRIM", queueName, -maxLength, -1)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ClearSyncMsgCache 清理所有以 nameKey(_syncMsg) 结尾的 Redis 缓存键
|
|||
|
|
func ClearSyncMsgCache(nameKey string) error {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
// SCAN 命令用于增量迭代 key,效率较高
|
|||
|
|
iter := 0
|
|||
|
|
for {
|
|||
|
|
// 使用 redigo 的 Do 方法执行 SCAN 命令
|
|||
|
|
reply, err := redis.Values(redisConn.Do("SCAN", iter, "MATCH", "*"+nameKey))
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("error during SCAN: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 解析出 SCAN 的结果集
|
|||
|
|
var keys []string
|
|||
|
|
_, err = redis.Scan(reply, &iter, &keys)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("error parsing SCAN result: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 删除匹配的 key
|
|||
|
|
for _, key := range keys {
|
|||
|
|
_, err = redisConn.Do(REDIS_OPERATION_DELETE, key)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("error deleting key %s: %v", key, err)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// iter 为 0 时表示 SCAN 已迭代完所有 key
|
|||
|
|
if iter == 0 {
|
|||
|
|
break
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
maxMessagesPerUser = 125
|
|||
|
|
saveInterval = 5 * time.Second
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
type userCache struct {
|
|||
|
|
mutex sync.RWMutex
|
|||
|
|
msgs map[string]struct{}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var (
|
|||
|
|
msgCache = make(map[string]*userCache) // 存储每个用户的缓存和锁
|
|||
|
|
globalMutex sync.RWMutex // 保证对 msgCache 的安全访问
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// AddOrUpdateMsgId 添加消息ID到 Redis 集合,如果集合中不存在则更新
|
|||
|
|
func AddOrUpdateMsgId(userName string, newMsgId string) (bool, error) {
|
|||
|
|
|
|||
|
|
// 获取或创建用户缓存
|
|||
|
|
userCache := getUserCache(userName)
|
|||
|
|
|
|||
|
|
userCache.mutex.Lock()
|
|||
|
|
defer userCache.mutex.Unlock()
|
|||
|
|
|
|||
|
|
// 检查消息 ID 是否存在
|
|||
|
|
if _, exists := userCache.msgs[newMsgId]; exists {
|
|||
|
|
return false, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 添加消息 ID 到缓存
|
|||
|
|
userCache.msgs[newMsgId] = struct{}{}
|
|||
|
|
|
|||
|
|
// 确保缓存不会超过 25 条 // 删除最旧的消息 ID
|
|||
|
|
if len(userCache.msgs) > maxMessagesPerUser {
|
|||
|
|
for msgID := range userCache.msgs {
|
|||
|
|
delete(userCache.msgs, msgID)
|
|||
|
|
if len(userCache.msgs) <= maxMessagesPerUser {
|
|||
|
|
break
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return true, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func getUserCache(userName string) *userCache {
|
|||
|
|
// 使用全局锁来安全地访问或初始化用户缓存
|
|||
|
|
globalMutex.RLock()
|
|||
|
|
cache, exists := msgCache[userName]
|
|||
|
|
globalMutex.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
// 使用写锁来修改 msgCache
|
|||
|
|
globalMutex.Lock()
|
|||
|
|
defer globalMutex.Unlock()
|
|||
|
|
|
|||
|
|
// 再次检查,防止竞争条件
|
|||
|
|
if cache, exists = msgCache[userName]; !exists {
|
|||
|
|
cache = &userCache{
|
|||
|
|
msgs: make(map[string]struct{}),
|
|||
|
|
mutex: sync.RWMutex{},
|
|||
|
|
}
|
|||
|
|
// 从 Redis 加载用户消息
|
|||
|
|
loadFromRedis(userName, cache.msgs)
|
|||
|
|
msgCache[userName] = cache
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return cache
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func loadFromRedis(userName string, userMsgs map[string]struct{}) {
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
key := fmt.Sprintf("user:%s:msg_ids", userName)
|
|||
|
|
msgs, err := redis.Strings(redisConn.Do("SMEMBERS", key))
|
|||
|
|
if err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for _, msgID := range msgs {
|
|||
|
|
userMsgs[msgID] = struct{}{}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func saveToRedis() {
|
|||
|
|
// 锁定整个操作以安全地遍历 msgCache
|
|||
|
|
globalMutex.RLock()
|
|||
|
|
defer globalMutex.RUnlock()
|
|||
|
|
|
|||
|
|
redisConn := getRedisConn()
|
|||
|
|
defer redisConn.Close()
|
|||
|
|
|
|||
|
|
for userName, userCache := range msgCache {
|
|||
|
|
// 锁定每个用户的缓存数据
|
|||
|
|
userCache.mutex.RLock()
|
|||
|
|
|
|||
|
|
key := fmt.Sprintf("user:%s:msg_ids", userName)
|
|||
|
|
msgIDs := make([]interface{}, 0, len(userCache.msgs)+1)
|
|||
|
|
msgIDs = append(msgIDs, key)
|
|||
|
|
for msgID := range userCache.msgs {
|
|||
|
|
msgIDs = append(msgIDs, msgID)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 更新 Redis
|
|||
|
|
redisConn.Do("DEL", key)
|
|||
|
|
_, err := redisConn.Do("SADD", msgIDs...)
|
|||
|
|
if err != nil {
|
|||
|
|
fmt.Println("Error writing to Redis:", err)
|
|||
|
|
}
|
|||
|
|
userCache.mutex.RUnlock()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func scheduleSaveToRedis() {
|
|||
|
|
ticker := time.NewTicker(saveInterval)
|
|||
|
|
for range ticker.C {
|
|||
|
|
saveToRedis()
|
|||
|
|
}
|
|||
|
|
}
|