package wxtask import ( "encoding/json" "fmt" "net/http" "regexp" "strconv" "sync" "xiawan/wx/protobuf/wechat" "xiawan/wx/srv/wxface" "github.com/gorilla/websocket" ) // 全局写锁映射,保护每个连接的写操作 var connWriteMutexes sync.Map // decodeUnicodeEscapes 解码Unicode转义字符 func decodeUnicodeEscapes(s string) string { // 处理JSON中的Unicode转义字符,如\u003c re := regexp.MustCompile(`\\u([0-9a-fA-F]{4})`) result := re.ReplaceAllStringFunc(s, func(match string) string { // 提取十六进制数字部分 hexStr := match[2:] // 去掉\u前缀 // 将十六进制转换为整数 if codePoint, err := strconv.ParseInt(hexStr, 16, 32); err == nil { // 转换为Unicode字符 return string(rune(codePoint)) } return match // 如果转换失败,返回原字符串 }) return result } // 长链接同步消息 type WXSocketMsgTask struct { WxConn wxface.IWXConnect // 待消费消息列表 MsgItemChan chan *wechat.AddMsg // 可不可以消费下一条 canDoNext chan bool // 结束标志 EndChan chan bool // 是否正在运行 isRunning bool // 是否启用WebSocket wsEnabled bool // 当前在消费的消息 CurrentMsgItem *wechat.AddMsg // 存储所有连接的客户端 Clients map[string]*websocket.Conn Upgrader websocket.Upgrader // 定义一个互斥锁; 用于保护 Clients 的线程安全 //mu sync.Mutex // 定义一个读写互斥锁; 用于保护 Clients 的线程安全 mu sync.RWMutex } // NewWXSocketMsgTask 新建抢红包管理器 func NewWXSocketMsgTask(wxConn wxface.IWXConnect) *WXSocketMsgTask { return &WXSocketMsgTask{ WxConn: wxConn, MsgItemChan: make(chan *wechat.AddMsg, 1000), canDoNext: make(chan bool, 1), EndChan: make(chan bool, 1), isRunning: false, wsEnabled: true, // 默认启用WebSocket功能,避免连接后延迟 Clients: make(map[string]*websocket.Conn), Upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // 允许所有CORS跨域请求 CheckOrigin: func(r *http.Request) bool { return true }, }, mu: sync.RWMutex{}, } } func (wsmsg *WXSocketMsgTask) grapMsg() { for { select { case wsmsg.CurrentMsgItem = <-wsmsg.MsgItemChan: key := wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID // 使用消息包装器包含UUID信息 msgWrapper := NewMessageWrapper(key, wsmsg.CurrentMsgItem, "message") jsonBytes, err := json.Marshal(msgWrapper) if err != nil { fmt.Println("消息序列化错误:", err) continue // 继续处理下一条消息,而不是返回 } conn := wsmsg.GetWebSocket(key) if conn == nil { fmt.Println("WebSocket连接不存在:", key) continue // 继续处理下一条消息,而不是返回 } // 对JSON内容进行Unicode解码处理 decodedJSON := decodeUnicodeEscapes(string(jsonBytes)) err = PushMessageToClient(decodedJSON, conn) if err != nil { fmt.Println("消息推送失败:", err) wsmsg.DeleteWebSocket(key) continue // 继续处理下一条消息,而不是返回 } // 消息推送成功后继续处理下一条 case <-wsmsg.EndChan: wsmsg.DeleteWebSocket(wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID) return } } } // Start 开始 func (wsmsg *WXSocketMsgTask) Start() { if wsmsg.wsEnabled { if wsmsg.isRunning { return } wsmsg.isRunning = true go wsmsg.grapMsg() // 直接启动消息处理循环 } } // Stop 结束协程 func (wsmsg *WXSocketMsgTask) Stop() { if !wsmsg.isRunning { return } wsmsg.isRunning = false wsmsg.EndChan <- true } // GrapNext 抢下一个 func (wsmsg *WXSocketMsgTask) GrapNext() { wsmsg.canDoNext <- true } // AddMsgItem 添加消息项 func (wsmsg *WXSocketMsgTask) AddMsgItem(msgItem *wechat.AddMsg) { if !wsmsg.wsEnabled { return } if !wsmsg.isRunning { wsmsg.Start() } key := wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID conn := wsmsg.GetWebSocket(key) // 打印日志 // fmt.Println("------------收到消息-----------:\n", // "NickName: ",wsmsg.WxConn.GetWXAccount().GetUserInfo().NickName,"\n", // "Key: ",key,"\n", // "Content: ",msgItem.Content.GetStr(), // "\n", // "----------------------------------") if conn == nil { // 只有 ws 长链接存在才会添加 收到的消息记录 // fmt.Println("WebSocket连接不存在,消息将不会发送:", key) return } wsmsg.MsgItemChan <- msgItem } // GetCurrentMsgItem 获取当前正在消费的消息项 func (wsmsg *WXSocketMsgTask) GetCurrentMsgItem() *wechat.AddMsg { return wsmsg.CurrentMsgItem } func PushMessageToClient(msg string, conn *websocket.Conn) error { // 获取或创建当前连接的写锁 val, _ := connWriteMutexes.LoadOrStore(conn, &sync.Mutex{}) mutex := val.(*sync.Mutex) // 使用互斥锁保护写操作 mutex.Lock() defer mutex.Unlock() // 使用 WebSocket 连接向客户端发送消息 fmt.Println("发送消息:", msg) err := conn.WriteMessage(websocket.TextMessage, []byte(msg)) if err != nil { // 如果发送错误,清理锁 connWriteMutexes.Delete(conn) } return err } // PushTypedMessageToClient 安全地发送特定类型的WebSocket消息 func PushTypedMessageToClient(messageType int, data []byte, conn *websocket.Conn) error { // 获取或创建当前连接的写锁 val, _ := connWriteMutexes.LoadOrStore(conn, &sync.Mutex{}) mutex := val.(*sync.Mutex) // 使用互斥锁保护写操作 mutex.Lock() defer mutex.Unlock() // 使用 WebSocket 连接向客户端发送消息 fmt.Printf("发送类型[%d]消息\n", messageType) err := conn.WriteMessage(messageType, data) if err != nil { // 如果发送错误,清理锁 connWriteMutexes.Delete(conn) } return err } // GetWebSocket 获取 Clients 缓存中的 ws 长链接 func (wsmsg *WXSocketMsgTask) GetWebSocket(key string) (conn *websocket.Conn) { wsmsg.mu.RLock() // 读取 Clients 时使用读锁 defer wsmsg.mu.RUnlock() // 解锁读锁 conn = wsmsg.Clients[key] // 读取 ws 长链接; 若 key 不存在则返回 nil return conn } // UpdateWebSocket 更新 Clients 缓存中的 ws 长链接 func (wsmsg *WXSocketMsgTask) UpdateWebSocket(key string, conn *websocket.Conn) { wsmsg.mu.Lock() // 写入或删除 Clients 时使用写锁 defer wsmsg.mu.Unlock() // 解锁 wsmsg.Clients[key] = conn // 更新 ws 长链接 wsmsg.wsEnabled = true // 当连接建立时启用WebSocket功能 fmt.Println("WebSocket连接已更新:", key) } // DeleteWebSocket 删除 Clients 缓存中的 ws 长链接 func (wsmsg *WXSocketMsgTask) DeleteWebSocket(key string) { wsmsg.mu.Lock() // 写入或删除 Clients 时使用写锁 defer wsmsg.mu.Unlock() // 解锁 if conn, ok := wsmsg.Clients[key]; ok { // 清理连接的写锁 connWriteMutexes.Delete(conn) delete(wsmsg.Clients, key) // 删除 ws 长链接 fmt.Println("WebSocket连接已删除:", key) // 如果没有连接,禁用WebSocket功能并停止任务 if len(wsmsg.Clients) == 0 { wsmsg.wsEnabled = false if wsmsg.isRunning { wsmsg.Stop() } } } } // 获取WebSocket启用状态 func (wsmsg *WXSocketMsgTask) IsWebSocketEnabled() bool { return wsmsg.wsEnabled } // 设置WebSocket启用状态 func (wsmsg *WXSocketMsgTask) SetWebSocketEnabled(enabled bool) { wsmsg.wsEnabled = enabled // 如果禁用而且正在运行,停止任务 if !enabled && wsmsg.isRunning { wsmsg.Stop() } else if enabled && !wsmsg.isRunning && len(wsmsg.Clients) > 0 { // 如果启用而且有客户端连接但未运行,启动任务 wsmsg.Start() } }