/** * @author mii * @date 2020/2/29 0029 */ package db import ( "errors" "fmt" "sync" "time" "xiawan/wx/clientsdk/baseinfo" "xiawan/wx/db/table" "xiawan/wx/srv" "xiawan/wx/srv/srvconfig" "github.com/gogf/gf/database/gredis" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/frame/g" "github.com/gogf/gf/util/gconv" "github.com/gomodule/redigo/redis" ) const ( REDIS_OPERATION_SET = "SET" REDIS_OPERATION_GET = "GET" REDIS_OPERATION_LIST_LLEN = "llen" REDIS_OPERATION_LIST_LPSUH = "LPUSH" REDIS_OPERATION_LIST_LPOP = "lpop" REDIS_OPERATION_LIST_RPOP = "rpop" REDIS_OPERATION_LIST_LTRIM = "ltrim" REDIS_OPERATION_CHANNEL_PUBLISH = "publish" REDIS_OPERATION_EXISTS = "EXISTS" REDIS_OPERATION_DELETE = "DEL" ) const ( DEFAULT_GROUP_NAME = "default" // Default configuration group name. DEFAULT_REDIS_PORT = 6379 // Default redis port configuration if not passed. ) type Obj struct { Value interface{} } func RedisSetup() { //配置redis // MaxIdle: 10, // 设置最大空闲连接数 // MaxActive: 100, // 设置最大活跃连接数 // IdleTimeout: 300, // 空闲连接超时时间(单位:秒) // MaxConnLifetime: 300, // 连接最大存活期(单位:秒) // WaitTimeout: 5, // 连接等待超时时间(单位:秒) // ReadTimeout: 10, // 读操作超时时间(单位:秒) // WriteTimeout: 10, // 写操作超时时间(单位:秒) gredis.SetConfig(srvconfig.GlobalSetting.RedisConfig, DEFAULT_GROUP_NAME) conn := g.Redis().GetConn() err := conn.Err() if err != nil { //logger.Errorln(err) fmt.Println("failed to connect Redis:", err) } fmt.Println("connect Redis success") go scheduleSaveToRedis() } // 尝试获取锁 func AcquireLock(key string, timeout time.Duration) (bool, error) { redisConn := getRedisConn() defer redisConn.Close() lockValue := fmt.Sprintf("%d", time.Now().UnixNano()) // Unique lock value result, err := redisConn.Do(REDIS_OPERATION_SET, key, lockValue, "NX", "PX", int64(timeout.Seconds()*1000)) if err != nil { return false, err } return result != nil, nil } // 释放锁 func ReleaseLock(key string) error { redisConn := getRedisConn() defer redisConn.Close() _, err := redisConn.Do(REDIS_OPERATION_DELETE, key) if err != nil { return err } return nil } // acquireLockWithRetry 尝试获取锁,带有重试机制 func acquireLockWithRetry(key string, timeout time.Duration, retryInterval time.Duration, maxRetries int) (bool, error) { for i := 0; i < maxRetries; i++ { lockAcquired, err := AcquireLock(key, timeout) if err != nil { return false, err } if lockAcquired { return true, nil } time.Sleep(retryInterval) // 等待一段时间后重试 } return false, fmt.Errorf("could not acquire lock after %d retries", maxRetries) } // CreateSyncMsgList 创建一个同步信息保存列表 func CreateSyncMsgList(exId string) bool { ok, err := LPUSH(exId, "撒大声地") if err != nil { return false } return ok } // LPOPObj 从列表中取出对象然后反序列化成对象 func LPOPObj(k string, i interface{}) error { _var, err := LPOP(k) if err != nil { return err } err = gjson.DecodeTo(_var.Bytes(), &i) if err != nil { return err } return nil } // RPOPObj 从列表尾部 取出对象然后反序列化成对象 func RPOPObj(k string, i interface{}) error { _var, err := RPOP(k) if err != nil { return err } err = gjson.DecodeTo(_var.Bytes(), &i) if err != nil { return err } return nil } // LPUSHObj 将对象序列化成json保存 func LPUSHObj(k string, i interface{}) (bool, error) { iData, err := gjson.Encode(i) // 判断 k 的尾号是 _syncMsg if err != nil { return false, err } ok, err := LPUSH(k, iData) if err != nil { return ok, err } return ok, nil } func LPUSH(k string, i interface{}) (bool, error) { lockKey := k + "_lock" maxRetries := 200 // 最大重试次数 retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒 // 使用带重试机制的获取锁函数 acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries) defer ReleaseLock(lockKey) // 确保方法结束时释放锁 maxLength := 500 // 设置最大列表长度 redisConn := getRedisConn() defer redisConn.Close() result, err := redisConn.Do(REDIS_OPERATION_LIST_LPSUH, k, i) if err != nil { //logger.Errorln(err) return false, err } _len, err := LLEN(k) if err != nil { return true, err } if _len == 0 { return true, nil } // 如果列表长度超过 maxLength,则循环执行 LPOP 操作 for _len > int32(maxLength) { _, err = redisConn.Do(REDIS_OPERATION_LIST_LPOP, k) if err != nil { return false, err } _len = _len - int32(1) } return gconv.String(result) == "OK", nil } // LPOP 从列表中取出一个值 func LPOP(k string) (*g.Var, error) { redisConn := getRedisConn() defer redisConn.Close() r, err := redisConn.DoVar(REDIS_OPERATION_LIST_LPOP, k) if err != nil { return nil, err } return r, nil } // RPOP 从列表尾部 取出一个元素 func RPOP(k string) (*g.Var, error) { redisConn := getRedisConn() defer redisConn.Close() r, err := redisConn.DoVar(REDIS_OPERATION_LIST_RPOP, k) if err != nil { return nil, err } return r, nil } // LLEN 取列表长度 func LLEN(k string) (int32, error) { redisConn := getRedisConn() defer redisConn.Close() _var, err := redisConn.DoVar(REDIS_OPERATION_LIST_LLEN, k) if err != nil { //logger.Errorln(err) return 0, err } return _var.Int32(), nil } // PUBLISH 发布消息 func PUBLISH(k string, i interface{}) error { return PushQueue(k, i) } func SETExpirationObj(k string, i interface{}, expiration int64) error { redisConn := getRedisConn() defer redisConn.Close() iData, err := gjson.Encode(i) if err != nil { return err } var result interface{} if expiration > 0 { result, err = redisConn.Do(REDIS_OPERATION_SET, k, iData, "EX", expiration) } else { result, err = redisConn.Do(REDIS_OPERATION_SET, k, iData) } if err != nil { //logger.Errorln(err) return err } if gconv.String(result) == "OK" { return nil } return errors.New(gconv.String(result)) } func SETObj(k string, i interface{}) error { redisConn := getRedisConn() defer redisConn.Close() iData, err := gjson.Encode(i) if err != nil { return err } result, err := redisConn.Do(REDIS_OPERATION_SET, k, iData) if err != nil { //logger.Errorln(err) return err } if gconv.String(result) == "OK" { return nil } return errors.New(gconv.String(result)) } func GETObj(k string, i interface{}) error { redisConn := getRedisConn() defer redisConn.Close() _var, err := redisConn.Do(REDIS_OPERATION_GET, k) if err != nil { return err } err = gjson.DecodeTo(_var, &i) if err != nil { return err } return nil } func DelObj(k string) error { redisConn := getRedisConn() defer redisConn.Close() _, err := redisConn.Do(REDIS_OPERATION_DELETE, k) if err != nil { return err } return nil } func Exists(k string) (bool, error) { //检查是否存在key值 redisConn := getRedisConn() defer redisConn.Close() exists, err := redisConn.Do(REDIS_OPERATION_EXISTS, k) if err != nil { fmt.Println("illegal exception") return false, err } //fmt.Printf("exists or not: %v \n", exists) if exists.(int64) == 1 { return true, nil } return false, nil } func getRedisConn() *gredis.Conn { return g.Redis().GetConn() } // PublishSyncMsgWxMessage 发布微信消息; 这里就是接收到 微信后台 推送来的消息(包括其他用户私聊消息、群聊消息、收款消息等等) func PublishSyncMsgWxMessage(acc *srv.WXAccount, response table.SyncMessageResponse) error { if acc == nil { return errors.New("PublishSyncMsgWxMessage acc == nil") } response.UUID = acc.GetUserInfo().UUID response.UserName = acc.GetUserInfo().GetUserName() response.LoginState = acc.GetLoginState() response.Type = table.RedisPushSyncTypeWxMsg response.TargetIp = srvconfig.GlobalSetting.TargetIp if len(response.GetAddMsgs()) != 0 || len(response.GetContacts()) != 0 { if srvconfig.GlobalSetting.HttpSyncMsg { LPUSHObj(response.UUID+"_syncHttp", &response) //缓存reids } } return nil } // PublishFavItem 发布收藏消息 func PublishFavItem(acc *srv.WXAccount, favItem *baseinfo.FavItem) error { if acc == nil { return errors.New("PublishSyncMsgWxMessage acc == nil") } response := table.SyncMessageResponse{} response.UUID = acc.GetUserInfo().UUID response.UserName = acc.GetUserInfo().GetUserName() response.LoginState = acc.GetLoginState() response.TargetIp = srvconfig.GlobalSetting.TargetIp response.Type = table.RedisPushFavitem response.FavItem = favItem // 缓存reids if srvconfig.GlobalSetting.NewsSynWxId { // 缓存reids LPUSHObj(response.UUID+"_syncMsg", &response) return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response) } return PUBLISH("wx_sync_msg_topic", &response) } // PublishSyncMsgLoginState 微信状态 func PublishSyncMsgLoginState(wxid string, state uint32) error { //fmt.Println("推送->wxid=="+wxid+"--->", state) response := table.SyncMessageResponse{} response.TargetIp = srvconfig.GlobalSetting.TargetIp response.Type = table.RedisPushSyncTypeLoginState response.UserName = wxid response.LoginState = state // 缓存reids if srvconfig.GlobalSetting.NewsSynWxId { // 缓存reids LPUSHObj(response.UUID+"_syncMsg", &response) return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response) } return PUBLISH("wx_sync_msg_topic", &response) } // PublishWxInItOk 初始化完成 func PublishWxInItOk(UUID string, state uint32) error { if UUID == "" || state == 0 { return errors.New("uuid || state == nil") } response := table.SyncMessageResponse{} response.TargetIp = srvconfig.GlobalSetting.TargetIp response.Type = table.RedisPushWxInItOk response.LoginState = state response.UUID = UUID response.UserName = "初始化完成!" // 缓存reids if srvconfig.GlobalSetting.NewsSynWxId { _, _ = LPUSHObj(response.UUID+"_wx_sync_msg_topic", &response) return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response) } return PUBLISH("wx_sync_msg_topic", &response) } // PublishSyncMsgCheckLogin 扫码结果 func PublishSyncMsgCheckLogin(UUID string, result *baseinfo.CheckLoginQrCodeResult) error { if UUID == "" || result == nil { return errors.New("uuid || result == nil") } response := table.SubMessageCheckLoginQrCode{} response.TargetIp = srvconfig.GlobalSetting.TargetIp response.Type = table.RedisPushSyncTypeCheckLogin response.CheckLoginResult = result response.UUID = UUID // 缓存reids if srvconfig.GlobalSetting.NewsSynWxId { _, _ = LPUSHObj(response.UUID+"_wx_sync_msg_topic", &response) return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response) } return PUBLISH("wx_sync_msg_topic", &response) } // GETSyncMsg 获取指定号缓存在redis的消息 func GETSyncMsg(uuid string) (int, []*table.SyncMessageResponse, error) { _len, err := LLEN(uuid + "_syncMsg") if err != nil { return 0, nil, err } if _len == 0 { return 0, nil, nil } syncMsgLen := _len if syncMsgLen > 10 { syncMsgLen = 10 } opValues := []*table.SyncMessageResponse{} for i := int32(0); i <= syncMsgLen; i++ { opVal := &table.SyncMessageResponse{} err = LPOPObj(uuid+"_wx_sync_msg_topic", opVal) if err != nil { continue } opValues = append(opValues, opVal) } ContinueFlag := 0 if (_len % 10) > 0 { ContinueFlag = 1 } return ContinueFlag, opValues, nil } // PublishLicenseKey 使用状态, 主要是过期时间和开始时间 func PublishLicenseKey(licenseKey *table.LicenseKey) error { lockKey := "uuid_syncLicenseKey_lock" maxRetries := 200 // 最大重试次数 retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒 // 使用带重试机制的获取锁函数 acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries) defer ReleaseLock(lockKey) // 确保方法结束时释放锁 // 缓存 reids LPUSHObj("uuid_syncLicenseKey", licenseKey) return nil } // HttpSyncLicenseKey HTTP-轮询 同步激活状态 获取 Redis 内的消息 func HttpSyncLicenseKey() ([]*table.LicenseKey, error) { lockKey := "uuid_syncLicenseKey_lock" maxRetries := 200 // 最大重试次数 retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒 // 使用带重试机制的获取锁函数 acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries) defer ReleaseLock(lockKey) // 确保方法结束时释放锁 syncMsgLen, err := LLEN("uuid_syncLicenseKey") if err != nil { return nil, err } msgInfos := []*table.LicenseKey{} if !(syncMsgLen > int32(0)) { return msgInfos, nil } for i := int32(0); i < syncMsgLen; i++ { msg := &table.LicenseKey{} err = RPOPObj("uuid_syncLicenseKey", msg) if err != nil { continue } msgInfos = append(msgInfos, msg) } return msgInfos, nil } // HttpSyncMsy HTTP-轮询 同步消息时 获取 Redis 内的消息 func HttpSyncMsy(uuid string, count int) ([]*table.SyncMessageResponse, error) { lockKey := uuid + "_syncHttp_lock" maxRetries := 200 // 最大重试次数 retryInterval := 50 * time.Millisecond // 重试间隔时间为500毫秒 // 使用带重试机制的获取锁函数 acquireLockWithRetry(lockKey, 5*time.Second, retryInterval, maxRetries) defer ReleaseLock(lockKey) // 确保方法结束时释放锁 _len, err := LLEN(uuid + "_syncHttp") if err != nil { return nil, err } syncMsgLen := int32(count) if syncMsgLen == 0 { // count=0 时获取所有消息 syncMsgLen = _len } else if syncMsgLen > _len { syncMsgLen = _len } msgInfos := []*table.SyncMessageResponse{} if _len == 0 { return msgInfos, nil } for i := int32(0); i < syncMsgLen; i++ { msg := &table.SyncMessageResponse{} err = RPOPObj(uuid+"_syncHttp", msg) if err != nil { continue } msgInfos = append(msgInfos, msg) } return msgInfos, nil } // PushQueue 发送队例 func PushQueue(queueName string, i interface{}) (err error) { con := getRedisConn() defer con.Close() maxLength := 200 // 最大 200 条 if i == nil { return err } _, err = con.Do("rpush", queueName, gconv.String(i)) if err != nil { return fmt.Errorf("PushQueue redis error: %s", err) } // 使用 ltrim 裁剪列表,以保持其最大长度 _, err = con.Do("LTRIM", queueName, -maxLength, -1) return err } // ClearSyncMsgCache 清理所有以 nameKey(_syncMsg) 结尾的 Redis 缓存键 func ClearSyncMsgCache(nameKey string) error { redisConn := getRedisConn() defer redisConn.Close() // SCAN 命令用于增量迭代 key,效率较高 iter := 0 for { // 使用 redigo 的 Do 方法执行 SCAN 命令 reply, err := redis.Values(redisConn.Do("SCAN", iter, "MATCH", "*"+nameKey)) if err != nil { return fmt.Errorf("error during SCAN: %v", err) } // 解析出 SCAN 的结果集 var keys []string _, err = redis.Scan(reply, &iter, &keys) if err != nil { return fmt.Errorf("error parsing SCAN result: %v", err) } // 删除匹配的 key for _, key := range keys { _, err = redisConn.Do(REDIS_OPERATION_DELETE, key) if err != nil { return fmt.Errorf("error deleting key %s: %v", key, err) } } // iter 为 0 时表示 SCAN 已迭代完所有 key if iter == 0 { break } } return nil } const ( maxMessagesPerUser = 125 saveInterval = 5 * time.Second ) type userCache struct { mutex sync.RWMutex msgs map[string]struct{} } var ( msgCache = make(map[string]*userCache) // 存储每个用户的缓存和锁 globalMutex sync.RWMutex // 保证对 msgCache 的安全访问 ) // AddOrUpdateMsgId 添加消息ID到 Redis 集合,如果集合中不存在则更新 func AddOrUpdateMsgId(userName string, newMsgId string) (bool, error) { // 获取或创建用户缓存 userCache := getUserCache(userName) userCache.mutex.Lock() defer userCache.mutex.Unlock() // 检查消息 ID 是否存在 if _, exists := userCache.msgs[newMsgId]; exists { return false, nil } // 添加消息 ID 到缓存 userCache.msgs[newMsgId] = struct{}{} // 确保缓存不会超过 25 条 // 删除最旧的消息 ID if len(userCache.msgs) > maxMessagesPerUser { for msgID := range userCache.msgs { delete(userCache.msgs, msgID) if len(userCache.msgs) <= maxMessagesPerUser { break } } } return true, nil } func getUserCache(userName string) *userCache { // 使用全局锁来安全地访问或初始化用户缓存 globalMutex.RLock() cache, exists := msgCache[userName] globalMutex.RUnlock() if !exists { // 使用写锁来修改 msgCache globalMutex.Lock() defer globalMutex.Unlock() // 再次检查,防止竞争条件 if cache, exists = msgCache[userName]; !exists { cache = &userCache{ msgs: make(map[string]struct{}), mutex: sync.RWMutex{}, } // 从 Redis 加载用户消息 loadFromRedis(userName, cache.msgs) msgCache[userName] = cache } } return cache } func loadFromRedis(userName string, userMsgs map[string]struct{}) { redisConn := getRedisConn() defer redisConn.Close() key := fmt.Sprintf("user:%s:msg_ids", userName) msgs, err := redis.Strings(redisConn.Do("SMEMBERS", key)) if err != nil { return } for _, msgID := range msgs { userMsgs[msgID] = struct{}{} } } func saveToRedis() { // 锁定整个操作以安全地遍历 msgCache globalMutex.RLock() defer globalMutex.RUnlock() redisConn := getRedisConn() defer redisConn.Close() for userName, userCache := range msgCache { // 锁定每个用户的缓存数据 userCache.mutex.RLock() key := fmt.Sprintf("user:%s:msg_ids", userName) msgIDs := make([]interface{}, 0, len(userCache.msgs)+1) msgIDs = append(msgIDs, key) for msgID := range userCache.msgs { msgIDs = append(msgIDs, msgID) } // 更新 Redis redisConn.Do("DEL", key) _, err := redisConn.Do("SADD", msgIDs...) if err != nil { fmt.Println("Error writing to Redis:", err) } userCache.mutex.RUnlock() } } func scheduleSaveToRedis() { ticker := time.NewTicker(saveInterval) for range ticker.C { saveToRedis() } }