Files
wechat_ipad_pro/db/redisOperation.go
2026-02-17 13:06:23 +08:00

708 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* @author mii
* @date 2020/2/29 0029
*/
package db
import (
"errors"
"fmt"
"sync"
"time"
"xiawan/wx/clientsdk/baseinfo"
"xiawan/wx/db/table"
"xiawan/wx/srv"
"xiawan/wx/srv/srvconfig"
"github.com/gogf/gf/database/gredis"
"github.com/gogf/gf/encoding/gjson"
"github.com/gogf/gf/frame/g"
"github.com/gogf/gf/util/gconv"
"github.com/gomodule/redigo/redis"
)
// Redis command names passed to Conn.Do / Conn.DoVar by the helpers in this file.
// Redis command names are case-insensitive, so the mixed casing below is harmless.
const (
REDIS_OPERATION_SET = "SET"
REDIS_OPERATION_GET = "GET"
REDIS_OPERATION_LIST_LLEN = "llen"
// NOTE: the identifier is misspelled ("LPSUH") but the command value is the correct "LPUSH".
REDIS_OPERATION_LIST_LPSUH = "LPUSH"
REDIS_OPERATION_LIST_LPOP = "lpop"
REDIS_OPERATION_LIST_RPOP = "rpop"
// Not referenced in the visible part of this file (PushQueue spells "LTRIM" inline).
REDIS_OPERATION_LIST_LTRIM = "ltrim"
// Not referenced in the visible part of this file (PUBLISH delegates to PushQueue instead).
REDIS_OPERATION_CHANNEL_PUBLISH = "publish"
REDIS_OPERATION_EXISTS = "EXISTS"
REDIS_OPERATION_DELETE = "DEL"
)
// Defaults used when configuring the Redis client.
// DEFAULT_GROUP_NAME is the group name RedisSetup registers the configuration under;
// DEFAULT_REDIS_PORT is not referenced in the visible part of this file.
const (
DEFAULT_GROUP_NAME = "default" // Default configuration group name.
DEFAULT_REDIS_PORT = 6379 // Default redis port configuration if not passed.
)
// Obj is a wrapper holding a single value of arbitrary type.
// It is not referenced in the visible part of this file.
type Obj struct {
Value interface{}
}
// RedisSetup registers the Redis configuration from srvconfig.GlobalSetting under
// DEFAULT_GROUP_NAME, probes one connection and reports the outcome on stdout,
// then starts the background goroutine that periodically persists the message-ID
// cache (scheduleSaveToRedis).
//
// A failed probe is only reported; the background saver is still started, as before.
func RedisSetup() {
	// Connection options that can be supplied through the Redis configuration:
	//   MaxIdle: 10          - maximum number of idle connections
	//   MaxActive: 100       - maximum number of active connections
	//   IdleTimeout: 300     - idle connection timeout (seconds)
	//   MaxConnLifetime: 300 - maximum connection lifetime (seconds)
	//   WaitTimeout: 5       - timeout waiting for a connection (seconds)
	//   ReadTimeout: 10      - read timeout (seconds)
	//   WriteTimeout: 10     - write timeout (seconds)
	gredis.SetConfig(srvconfig.GlobalSetting.RedisConfig, DEFAULT_GROUP_NAME)

	conn := g.Redis().GetConn()
	// Hand the probe connection back to the pool instead of leaking it.
	defer conn.Close()

	// Report success only when the probe actually succeeded.
	if err := conn.Err(); err != nil {
		fmt.Println("failed to connect Redis:", err)
	} else {
		fmt.Println("connect Redis success")
	}

	go scheduleSaveToRedis()
}
// AcquireLock makes a single attempt to take the lock stored at key.
//
// It issues SET key <token> NX PX <ms>, so the key is only written when it does not
// exist yet and expires after timeout. The token is the current UnixNano timestamp.
// It returns true when the SET reply is non-nil, false when the reply is nil, and
// the error from the Redis call if it fails.
func AcquireLock(key string, timeout time.Duration) (bool, error) {
	conn := getRedisConn()
	defer conn.Close()

	token := fmt.Sprintf("%d", time.Now().UnixNano())
	ttlMillis := int64(timeout.Seconds() * 1000)

	reply, err := conn.Do(REDIS_OPERATION_SET, key, token, "NX", "PX", ttlMillis)
	if err != nil {
		return false, err
	}
	acquired := reply != nil
	return acquired, nil
}
// ReleaseLock deletes the lock key unconditionally (DEL key) and returns the error
// from the Redis call, if any. It does not check who currently holds the lock.
func ReleaseLock(key string) error {
	conn := getRedisConn()
	defer conn.Close()

	_, err := conn.Do(REDIS_OPERATION_DELETE, key)
	return err
}
// acquireLockWithRetry calls AcquireLock up to maxRetries times, sleeping
// retryInterval after every unsuccessful attempt.
//
// It returns (true, nil) as soon as the lock is taken, (false, err) immediately when
// AcquireLock itself fails, and (false, error) once all attempts are used up.
func acquireLockWithRetry(key string, timeout time.Duration, retryInterval time.Duration, maxRetries int) (bool, error) {
	for attempt := 0; attempt < maxRetries; attempt++ {
		switch held, err := AcquireLock(key, timeout); {
		case err != nil:
			return false, err
		case held:
			return true, nil
		}
		// Lock is taken by someone else: wait before the next attempt.
		time.Sleep(retryInterval)
	}
	return false, fmt.Errorf("could not acquire lock after %d retries", maxRetries)
}
// CreateSyncMsgList creates the sync-message list for exId by pushing a fixed
// placeholder element onto it. It returns LPUSH's boolean result, or false when
// LPUSH reports an error.
func CreateSyncMsgList(exId string) bool {
	if ok, err := LPUSH(exId, "撒大声地"); err == nil {
		return ok
	}
	return false
}
// LPOPObj pops one element from the head of list k (via LPOP) and JSON-decodes it
// into i. It returns the pop error or the decode error, whichever occurs first.
func LPOPObj(k string, i interface{}) error {
	popped, err := LPOP(k)
	if err != nil {
		return err
	}
	return gjson.DecodeTo(popped.Bytes(), &i)
}
// RPOPObj pops one element from the tail of list k (via RPOP) and JSON-decodes it
// into i. It returns the pop error or the decode error, whichever occurs first.
func RPOPObj(k string, i interface{}) error {
	popped, err := RPOP(k)
	if err != nil {
		return err
	}
	return gjson.DecodeTo(popped.Bytes(), &i)
}
// LPUSHObj JSON-encodes i and pushes the encoded bytes onto list k through LPUSH.
// An encoding failure yields (false, err); otherwise LPUSH's result is returned as is.
func LPUSHObj(k string, i interface{}) (bool, error) {
	payload, err := gjson.Encode(i)
	if err != nil {
		return false, err
	}
	return LPUSH(k, payload)
}
// LPUSH pushes i onto the head of list k while holding the per-list lock
// k+"_lock", then caps the list at 500 entries.
//
// It returns (true, nil) when the element was pushed (and any trimming succeeded).
// It returns (false, err) when the lock cannot be acquired, when the push fails, or
// when trimming fails.
//
// Changes compared with the previous implementation:
//   - the lock result is checked; the lock is only released when this call acquired
//     it, so a failed acquisition no longer deletes another holder's lock;
//   - success is reported as true. The Redis LPUSH reply is the integer list length,
//     never the string "OK", so the old comparison almost always reported false;
//   - the length used for trimming is taken from the LPUSH reply rather than from a
//     second LLEN round trip on another connection.
//
// NOTE(review): trimming uses LPOP, i.e. it removes entries from the head, which is
// where LPUSH has just inserted. Once the cap is exceeded the most recently pushed
// entries are the ones dropped. This matches the previous behaviour; confirm it is
// the intended policy.
func LPUSH(k string, i interface{}) (bool, error) {
	const (
		maxLength     = 500                   // maximum list length kept after a push
		maxRetries    = 200                   // lock acquisition attempts
		retryInterval = 50 * time.Millisecond // pause between attempts
		lockTTL       = 5 * time.Second       // expiry of the lock key
	)

	lockKey := k + "_lock"
	acquired, err := acquireLockWithRetry(lockKey, lockTTL, retryInterval, maxRetries)
	if err != nil {
		return false, fmt.Errorf("LPUSH %s: %v", k, err)
	}
	if !acquired {
		return false, fmt.Errorf("LPUSH %s: lock %s not acquired", k, lockKey)
	}
	defer ReleaseLock(lockKey)

	redisConn := getRedisConn()
	defer redisConn.Close()

	reply, err := redisConn.Do(REDIS_OPERATION_LIST_LPSUH, k, i)
	if err != nil {
		return false, err
	}

	// The reply is the list length after the push; pop until the cap is respected.
	for length := gconv.Int32(reply); length > maxLength; length-- {
		if _, err = redisConn.Do(REDIS_OPERATION_LIST_LPOP, k); err != nil {
			return false, err
		}
	}
	return true, nil
}
// LPOP removes and returns the element at the head of list k.
// On a Redis error it returns (nil, err).
func LPOP(k string) (*g.Var, error) {
	conn := getRedisConn()
	defer conn.Close()

	value, err := conn.DoVar(REDIS_OPERATION_LIST_LPOP, k)
	if err == nil {
		return value, nil
	}
	return nil, err
}
// RPOP removes and returns the element at the tail of list k.
// On a Redis error it returns (nil, err).
func RPOP(k string) (*g.Var, error) {
	conn := getRedisConn()
	defer conn.Close()

	value, err := conn.DoVar(REDIS_OPERATION_LIST_RPOP, k)
	if err == nil {
		return value, nil
	}
	return nil, err
}
// LLEN returns the length of list k as an int32.
// On a Redis error it returns (0, err).
func LLEN(k string) (int32, error) {
	conn := getRedisConn()
	defer conn.Close()

	reply, err := conn.DoVar(REDIS_OPERATION_LIST_LLEN, k)
	if err == nil {
		return reply.Int32(), nil
	}
	return 0, err
}
// PUBLISH delivers message i under key k by delegating to PushQueue, i.e. the
// message is appended to a list named k rather than sent with the Redis PUBLISH
// command. The error from PushQueue is returned unchanged.
func PUBLISH(k string, i interface{}) error {
	err := PushQueue(k, i)
	return err
}
// SETExpirationObj JSON-encodes i and stores it at key k.
//
// When expiration is greater than zero the value is written with "EX expiration"
// (seconds); otherwise a plain SET without expiry is issued. It returns nil when
// Redis answers "OK", the encode/Redis error when one occurs, and otherwise an
// error whose text is the unexpected reply.
func SETExpirationObj(k string, i interface{}, expiration int64) error {
	conn := getRedisConn()
	defer conn.Close()

	payload, err := gjson.Encode(i)
	if err != nil {
		return err
	}

	// Build the argument list once; the expiry clause is optional.
	args := []interface{}{k, payload}
	if expiration > 0 {
		args = append(args, "EX", expiration)
	}

	reply, err := conn.Do(REDIS_OPERATION_SET, args...)
	if err != nil {
		return err
	}
	if status := gconv.String(reply); status != "OK" {
		return errors.New(status)
	}
	return nil
}
func SETObj(k string, i interface{}) error {
redisConn := getRedisConn()
defer redisConn.Close()
iData, err := gjson.Encode(i)
if err != nil {
return err
}
result, err := redisConn.Do(REDIS_OPERATION_SET, k, iData)
if err != nil {
//logger.Errorln(err)
return err
}
if gconv.String(result) == "OK" {
return nil
}
return errors.New(gconv.String(result))
}
// GETObj reads the value stored at key k and JSON-decodes it into i.
// It returns the Redis error or the decode error, whichever occurs first.
func GETObj(k string, i interface{}) error {
	conn := getRedisConn()
	defer conn.Close()

	raw, err := conn.Do(REDIS_OPERATION_GET, k)
	if err != nil {
		return err
	}
	return gjson.DecodeTo(raw, &i)
}
// DelObj deletes key k (DEL) and returns the error from the Redis call, if any.
func DelObj(k string) error {
	conn := getRedisConn()
	defer conn.Close()

	_, err := conn.Do(REDIS_OPERATION_DELETE, k)
	return err
}
// Exists reports whether key k exists, i.e. whether the EXISTS reply equals 1.
// On a Redis error it prints a notice to stdout and returns (false, err).
// The reply is asserted to be an int64.
func Exists(k string) (bool, error) {
	conn := getRedisConn()
	defer conn.Close()

	reply, err := conn.Do(REDIS_OPERATION_EXISTS, k)
	if err != nil {
		fmt.Println("illegal exception")
		return false, err
	}
	return reply.(int64) == 1, nil
}
// getRedisConn returns a connection obtained from the default g.Redis() client.
// Every caller in this file closes it with a deferred Close.
func getRedisConn() *gredis.Conn {
	client := g.Redis()
	return client.GetConn()
}
// PublishSyncMsgWxMessage handles a sync response received from the WeChat backend
// (private messages, group messages, payment notifications and so on).
//
// The response is stamped with the account's UUID, user name and login state, the
// RedisPushSyncTypeWxMsg type and the configured target IP. When it carries at
// least one added message or contact and HttpSyncMsg is enabled, it is cached in
// the list "<UUID>_syncHttp". The result of that push is not inspected.
//
// It returns an error only when acc is nil.
func PublishSyncMsgWxMessage(acc *srv.WXAccount, response table.SyncMessageResponse) error {
	if acc == nil {
		return errors.New("PublishSyncMsgWxMessage acc == nil")
	}

	response.UUID = acc.GetUserInfo().UUID
	response.UserName = acc.GetUserInfo().GetUserName()
	response.LoginState = acc.GetLoginState()
	response.Type = table.RedisPushSyncTypeWxMsg
	response.TargetIp = srvconfig.GlobalSetting.TargetIp

	hasPayload := len(response.GetAddMsgs()) != 0 || len(response.GetContacts()) != 0
	if hasPayload && srvconfig.GlobalSetting.HttpSyncMsg {
		// Cache in Redis for HTTP polling.
		LPUSHObj(response.UUID+"_syncHttp", &response)
	}
	return nil
}
// PublishFavItem publishes a favourites item for the given account.
//
// A SyncMessageResponse of type RedisPushFavitem is filled with the account's UUID,
// user name, login state, the configured target IP and favItem. When NewsSynWxId is
// enabled the message is cached in "<UUID>_syncMsg" (best effort, result ignored)
// and published to "<UUID>_wx_sync_msg_topic"; otherwise it is published to the
// shared "wx_sync_msg_topic".
//
// It returns an error when acc is nil, otherwise the result of PUBLISH.
func PublishFavItem(acc *srv.WXAccount, favItem *baseinfo.FavItem) error {
	if acc == nil {
		// The message previously named PublishSyncMsgWxMessage (copy-paste slip).
		return errors.New("PublishFavItem acc == nil")
	}

	response := table.SyncMessageResponse{}
	response.UUID = acc.GetUserInfo().UUID
	response.UserName = acc.GetUserInfo().GetUserName()
	response.LoginState = acc.GetLoginState()
	response.TargetIp = srvconfig.GlobalSetting.TargetIp
	response.Type = table.RedisPushFavitem
	response.FavItem = favItem

	if srvconfig.GlobalSetting.NewsSynWxId {
		// Cache in Redis; the publish below carries the returned error.
		_, _ = LPUSHObj(response.UUID+"_syncMsg", &response)
		return PUBLISH(response.UUID+"_wx_sync_msg_topic", &response)
	}
	return PUBLISH("wx_sync_msg_topic", &response)
}
// PublishSyncMsgLoginState publishes the login state of the account named wxid.
//
// The message has type RedisPushSyncTypeLoginState and carries the configured
// target IP, wxid as user name and state as login state. When NewsSynWxId is
// enabled it is cached with LPUSHObj (result ignored) and published per UUID;
// otherwise it goes to the shared "wx_sync_msg_topic".
//
// NOTE(review): UUID is never assigned in this function, so the per-UUID keys are
// built from the zero value ("_syncMsg" and "_wx_sync_msg_topic"). Confirm whether
// that is intended.
func PublishSyncMsgLoginState(wxid string, state uint32) error {
	var msg table.SyncMessageResponse
	msg.TargetIp = srvconfig.GlobalSetting.TargetIp
	msg.Type = table.RedisPushSyncTypeLoginState
	msg.UserName = wxid
	msg.LoginState = state

	if !srvconfig.GlobalSetting.NewsSynWxId {
		return PUBLISH("wx_sync_msg_topic", &msg)
	}

	// Cache in Redis, then publish on the per-UUID topic.
	LPUSHObj(msg.UUID+"_syncMsg", &msg)
	return PUBLISH(msg.UUID+"_wx_sync_msg_topic", &msg)
}
// PublishWxInItOk announces that initialisation has finished for UUID.
//
// It rejects an empty UUID or a zero state. Otherwise it builds a message of type
// RedisPushWxInItOk and publishes it: with NewsSynWxId enabled the message is first
// pushed with LPUSHObj (result ignored) and then published, both on
// "<UUID>_wx_sync_msg_topic"; without it, it is published on "wx_sync_msg_topic".
func PublishWxInItOk(UUID string, state uint32) error {
	if UUID == "" || state == 0 {
		return errors.New("uuid || state == nil")
	}

	var msg table.SyncMessageResponse
	msg.TargetIp = srvconfig.GlobalSetting.TargetIp
	msg.Type = table.RedisPushWxInItOk
	msg.LoginState = state
	msg.UUID = UUID
	msg.UserName = "初始化完成!"

	topic := "wx_sync_msg_topic"
	if srvconfig.GlobalSetting.NewsSynWxId {
		// Per-UUID topic: cache first, then publish to the same key.
		topic = msg.UUID + "_wx_sync_msg_topic"
		_, _ = LPUSHObj(topic, &msg)
	}
	return PUBLISH(topic, &msg)
}
// PublishSyncMsgCheckLogin publishes the result of a login QR-code check for UUID.
//
// It rejects an empty UUID or a nil result. Otherwise it builds a
// SubMessageCheckLoginQrCode of type RedisPushSyncTypeCheckLogin and publishes it:
// with NewsSynWxId enabled the message is first pushed with LPUSHObj (result
// ignored) and then published, both on "<UUID>_wx_sync_msg_topic"; without it, it
// is published on "wx_sync_msg_topic".
func PublishSyncMsgCheckLogin(UUID string, result *baseinfo.CheckLoginQrCodeResult) error {
	if UUID == "" || result == nil {
		return errors.New("uuid || result == nil")
	}

	var msg table.SubMessageCheckLoginQrCode
	msg.TargetIp = srvconfig.GlobalSetting.TargetIp
	msg.Type = table.RedisPushSyncTypeCheckLogin
	msg.CheckLoginResult = result
	msg.UUID = UUID

	topic := "wx_sync_msg_topic"
	if srvconfig.GlobalSetting.NewsSynWxId {
		// Per-UUID topic: cache first, then publish to the same key.
		topic = msg.UUID + "_wx_sync_msg_topic"
		_, _ = LPUSHObj(topic, &msg)
	}
	return PUBLISH(topic, &msg)
}
// GETSyncMsg fetches up to 10 cached messages for the account identified by uuid.
//
// It reads the length of "<uuid>_syncMsg", pops at most min(length, 10) entries
// from the head of "<uuid>_wx_sync_msg_topic" and decodes each into a
// SyncMessageResponse; entries that fail to pop or decode are skipped.
//
// It returns a continue flag, the decoded messages and an error. The error is only
// set when reading the length fails; an empty list yields (0, nil, nil).
//
// The loop previously ran with "i <= syncMsgLen" and therefore popped one entry
// more than the computed batch size (up to 11). It now pops exactly the batch size.
//
// NOTE(review): the length is measured on "<uuid>_syncMsg" but entries are popped
// from "<uuid>_wx_sync_msg_topic". Left unchanged because the intended key cannot
// be determined from this file.
// NOTE(review): the continue flag is 1 when length%10 > 0, which is not the same as
// "more entries remain" (e.g. length 20 gives 0, length 5 gives 1). Left unchanged;
// confirm the intended meaning with callers.
func GETSyncMsg(uuid string) (int, []*table.SyncMessageResponse, error) {
	const batchSize = 10

	_len, err := LLEN(uuid + "_syncMsg")
	if err != nil {
		return 0, nil, err
	}
	if _len == 0 {
		return 0, nil, nil
	}

	syncMsgLen := _len
	if syncMsgLen > batchSize {
		syncMsgLen = batchSize
	}

	opValues := make([]*table.SyncMessageResponse, 0, syncMsgLen)
	for i := int32(0); i < syncMsgLen; i++ {
		opVal := &table.SyncMessageResponse{}
		if err = LPOPObj(uuid+"_wx_sync_msg_topic", opVal); err != nil {
			// Best effort: skip entries that cannot be popped or decoded.
			continue
		}
		opValues = append(opValues, opVal)
	}

	ContinueFlag := 0
	if (_len % batchSize) > 0 {
		ContinueFlag = 1
	}
	return ContinueFlag, opValues, nil
}
// PublishLicenseKey queues a license-key status update (mainly its start and
// expiry times) on the list "uuid_syncLicenseKey" for later pick-up by
// HttpSyncLicenseKey.
//
// Serialisation against readers is provided by LPUSH, which takes the lock
// "uuid_syncLicenseKey_lock" (list key + "_lock") itself. This function previously
// took that very same lock before calling LPUSHObj. Because the lock is not
// re-entrant, the inner acquisition could only succeed after the outer lock's 5 s
// expiry, and the lock key was then deleted twice. The outer lock is removed.
//
// It returns the error reported by LPUSHObj instead of discarding it.
func PublishLicenseKey(licenseKey *table.LicenseKey) error {
	if _, err := LPUSHObj("uuid_syncLicenseKey", licenseKey); err != nil {
		return err
	}
	return nil
}
// HttpSyncLicenseKey drains the license-key status queue for HTTP polling.
//
// While holding "uuid_syncLicenseKey_lock" it reads the length of
// "uuid_syncLicenseKey" and pops that many entries from the tail, decoding each
// into a LicenseKey. Entries that fail to pop or decode are skipped.
//
// It returns a non-nil (possibly empty) slice on success. It returns (nil, err)
// when the lock cannot be acquired or the length cannot be read. The lock is now
// only released when this call actually acquired it; previously the result of the
// acquisition was ignored and the lock key was deleted regardless of its holder.
func HttpSyncLicenseKey() ([]*table.LicenseKey, error) {
	const (
		listKey       = "uuid_syncLicenseKey"
		lockKey       = "uuid_syncLicenseKey_lock"
		maxRetries    = 200                   // lock acquisition attempts
		retryInterval = 50 * time.Millisecond // pause between attempts
		lockTTL       = 5 * time.Second       // expiry of the lock key
	)

	acquired, err := acquireLockWithRetry(lockKey, lockTTL, retryInterval, maxRetries)
	if err != nil {
		return nil, err
	}
	if !acquired {
		return nil, fmt.Errorf("HttpSyncLicenseKey: lock %s not acquired", lockKey)
	}
	defer ReleaseLock(lockKey)

	syncMsgLen, err := LLEN(listKey)
	if err != nil {
		return nil, err
	}

	// Kept non-nil so that an empty result is an empty slice, as before.
	msgInfos := []*table.LicenseKey{}
	for i := int32(0); i < syncMsgLen; i++ {
		msg := &table.LicenseKey{}
		if err = RPOPObj(listKey, msg); err != nil {
			// Best effort: skip entries that cannot be popped or decoded.
			continue
		}
		msgInfos = append(msgInfos, msg)
	}
	return msgInfos, nil
}
// HttpSyncMsy returns cached sync messages of the account uuid for HTTP polling.
//
// While holding "<uuid>_syncHttp_lock" it reads the length of "<uuid>_syncHttp"
// and pops entries from the tail, decoding each into a SyncMessageResponse.
// count limits how many entries are popped; count == 0 means all of them, and a
// count larger than the list length is clamped to the length. Entries that fail to
// pop or decode are skipped.
//
// It returns a non-nil (possibly empty) slice on success. It returns (nil, err)
// when the lock cannot be acquired or the length cannot be read. The lock is now
// only released when this call actually acquired it; previously the result of the
// acquisition was ignored and the lock key was deleted regardless of its holder.
func HttpSyncMsy(uuid string, count int) ([]*table.SyncMessageResponse, error) {
	const (
		maxRetries    = 200                   // lock acquisition attempts
		retryInterval = 50 * time.Millisecond // pause between attempts
		lockTTL       = 5 * time.Second       // expiry of the lock key
	)

	listKey := uuid + "_syncHttp"
	lockKey := listKey + "_lock"

	acquired, err := acquireLockWithRetry(lockKey, lockTTL, retryInterval, maxRetries)
	if err != nil {
		return nil, err
	}
	if !acquired {
		return nil, fmt.Errorf("HttpSyncMsy: lock %s not acquired", lockKey)
	}
	defer ReleaseLock(lockKey)

	_len, err := LLEN(listKey)
	if err != nil {
		return nil, err
	}

	// count == 0 requests everything; never pop more than the list holds.
	syncMsgLen := int32(count)
	if syncMsgLen == 0 || syncMsgLen > _len {
		syncMsgLen = _len
	}

	// Kept non-nil so that an empty result is an empty slice, as before.
	msgInfos := []*table.SyncMessageResponse{}
	for i := int32(0); i < syncMsgLen; i++ {
		msg := &table.SyncMessageResponse{}
		if err = RPOPObj(listKey, msg); err != nil {
			// Best effort: skip entries that cannot be popped or decoded.
			continue
		}
		msgInfos = append(msgInfos, msg)
	}
	return msgInfos, nil
}
// PushQueue appends i, converted to a string, to the tail of the list queueName
// and then trims the list so that only its last 200 entries remain.
//
// A nil i is a no-op that returns nil. A failed RPUSH is returned with a
// "PushQueue redis error" prefix; otherwise the result of the LTRIM is returned.
func PushQueue(queueName string, i interface{}) (err error) {
	conn := getRedisConn()
	defer conn.Close()

	const keep = 200 // at most 200 entries are retained

	if i == nil {
		return nil
	}
	if _, err = conn.Do("rpush", queueName, gconv.String(i)); err != nil {
		return fmt.Errorf("PushQueue redis error: %s", err)
	}

	// Negative indices count from the tail: keep the last `keep` entries.
	_, err = conn.Do("LTRIM", queueName, -keep, -1)
	return err
}
// ClearSyncMsgCache deletes every Redis key whose name ends with nameKey
// (for example "_syncMsg").
//
// Keys are discovered incrementally with SCAN ... MATCH "*"+nameKey and deleted
// one by one. Iteration stops when SCAN hands back cursor 0. The first SCAN,
// parse or DEL failure aborts the run and is returned with context.
func ClearSyncMsgCache(nameKey string) error {
	conn := getRedisConn()
	defer conn.Close()

	pattern := "*" + nameKey
	cursor := 0
	for {
		reply, err := redis.Values(conn.Do("SCAN", cursor, "MATCH", pattern))
		if err != nil {
			return fmt.Errorf("error during SCAN: %v", err)
		}

		// The reply holds the next cursor followed by this batch of keys.
		var batch []string
		if _, err = redis.Scan(reply, &cursor, &batch); err != nil {
			return fmt.Errorf("error parsing SCAN result: %v", err)
		}

		for _, key := range batch {
			if _, err = conn.Do(REDIS_OPERATION_DELETE, key); err != nil {
				return fmt.Errorf("error deleting key %s: %v", key, err)
			}
		}

		// Cursor 0 means the whole keyspace has been visited.
		if cursor == 0 {
			return nil
		}
	}
}
// Settings of the in-memory message-ID cache.
const (
// maxMessagesPerUser is the cap AddOrUpdateMsgId enforces on the number of message IDs kept per user.
maxMessagesPerUser = 125
// saveInterval is the period of the ticker that drives saveToRedis.
saveInterval = 5 * time.Second
)
// userCache holds the set of message IDs kept in memory for one user.
type userCache struct {
// mutex is locked by AddOrUpdateMsgId and read-locked by saveToRedis around accesses to msgs.
mutex sync.RWMutex
// msgs is the set of message IDs (the map values carry no data).
msgs map[string]struct{}
}
// Package-level state of the in-memory message-ID cache.
var (
msgCache = make(map[string]*userCache) // per-user cache (with its own lock), keyed by user name
globalMutex sync.RWMutex // guards access to msgCache
)
// AddOrUpdateMsgId records newMsgId in the in-memory message-ID set of userName.
// The set is persisted to Redis separately by saveToRedis.
//
// It returns (false, nil) when the ID is already present and (true, nil) when it
// was newly added. The returned error is always nil.
//
// After an insert the set is cut back to maxMessagesPerUser entries. Go map
// iteration order is unspecified, so the evicted entries are arbitrary, not the
// oldest. The ID that was just inserted is never evicted. Previously it could be
// removed by the very call that added it, so an immediate repeat of the same ID
// was reported as new.
func AddOrUpdateMsgId(userName string, newMsgId string) (bool, error) {
	cache := getUserCache(userName)

	cache.mutex.Lock()
	defer cache.mutex.Unlock()

	if _, seen := cache.msgs[newMsgId]; seen {
		return false, nil
	}
	cache.msgs[newMsgId] = struct{}{}

	// Evict arbitrary entries, sparing the new one, until the cap is respected.
	for msgID := range cache.msgs {
		if len(cache.msgs) <= maxMessagesPerUser {
			break
		}
		if msgID == newMsgId {
			continue
		}
		delete(cache.msgs, msgID)
	}
	return true, nil
}
// getUserCache returns the cache entry of userName, creating it on first use.
//
// The lookup first runs under the read lock of globalMutex. On a miss the write
// lock is taken, the map is checked again, and only then is a new entry created,
// filled from Redis via loadFromRedis and stored in msgCache. The Redis load
// happens while the write lock is held.
func getUserCache(userName string) *userCache {
	globalMutex.RLock()
	cached, found := msgCache[userName]
	globalMutex.RUnlock()
	if found {
		return cached
	}

	globalMutex.Lock()
	defer globalMutex.Unlock()

	// Another goroutine may have created the entry between the two locks.
	if cached, found = msgCache[userName]; found {
		return cached
	}

	fresh := &userCache{msgs: make(map[string]struct{})}
	loadFromRedis(userName, fresh.msgs)
	msgCache[userName] = fresh
	return fresh
}
// loadFromRedis adds every member of the Redis set "user:<userName>:msg_ids" to
// userMsgs. If the SMEMBERS call or its conversion fails, userMsgs is left
// untouched and the error is dropped.
func loadFromRedis(userName string, userMsgs map[string]struct{}) {
	conn := getRedisConn()
	defer conn.Close()

	setKey := "user:" + userName + ":msg_ids"
	members, err := redis.Strings(conn.Do("SMEMBERS", setKey))
	if err == nil {
		for idx := range members {
			userMsgs[members[idx]] = struct{}{}
		}
	}
}
// saveToRedis writes every user's in-memory message-ID set to the Redis set
// "user:<userName>:msg_ids", replacing the previous content (DEL followed by SADD).
//
// msgCache is traversed under the read lock of globalMutex, and each user's set is
// read under that user's read lock. Failures are printed to stdout and do not stop
// the remaining users from being written.
//
// Changes compared with the previous implementation:
//   - SADD is skipped when a user has no IDs. SADD needs at least one member, so
//     the old code produced a Redis argument error for empty sets;
//   - a failed DEL is reported instead of ignored, and SADD is not attempted for
//     that user.
func saveToRedis() {
	globalMutex.RLock()
	defer globalMutex.RUnlock()

	redisConn := getRedisConn()
	defer redisConn.Close()

	for userName, cache := range msgCache {
		cache.mutex.RLock()

		key := fmt.Sprintf("user:%s:msg_ids", userName)
		// SADD arguments: the key followed by every member.
		args := make([]interface{}, 0, len(cache.msgs)+1)
		args = append(args, key)
		for msgID := range cache.msgs {
			args = append(args, msgID)
		}

		if _, err := redisConn.Do("DEL", key); err != nil {
			fmt.Println("Error writing to Redis:", err)
		} else if len(args) > 1 {
			if _, err := redisConn.Do("SADD", args...); err != nil {
				fmt.Println("Error writing to Redis:", err)
			}
		}

		cache.mutex.RUnlock()
	}
}
func scheduleSaveToRedis() {
ticker := time.NewTicker(saveInterval)
for range ticker.C {
saveToRedis()
}
}