269 lines
7.4 KiB
Go
269 lines
7.4 KiB
Go
package wxtask
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/http"
|
||
"regexp"
|
||
"strconv"
|
||
"sync"
|
||
"xiawan/wx/protobuf/wechat"
|
||
"xiawan/wx/srv/wxface"
|
||
|
||
"github.com/gorilla/websocket"
|
||
)
|
||
|
||
// 全局写锁映射,保护每个连接的写操作
|
||
var connWriteMutexes sync.Map
|
||
|
||
// decodeUnicodeEscapes 解码Unicode转义字符
|
||
func decodeUnicodeEscapes(s string) string {
|
||
// 处理JSON中的Unicode转义字符,如\u003c
|
||
re := regexp.MustCompile(`\\u([0-9a-fA-F]{4})`)
|
||
result := re.ReplaceAllStringFunc(s, func(match string) string {
|
||
// 提取十六进制数字部分
|
||
hexStr := match[2:] // 去掉\u前缀
|
||
// 将十六进制转换为整数
|
||
if codePoint, err := strconv.ParseInt(hexStr, 16, 32); err == nil {
|
||
// 转换为Unicode字符
|
||
return string(rune(codePoint))
|
||
}
|
||
return match // 如果转换失败,返回原字符串
|
||
})
|
||
|
||
return result
|
||
}
|
||
|
||
// 长链接同步消息
|
||
type WXSocketMsgTask struct {
|
||
WxConn wxface.IWXConnect
|
||
// 待消费消息列表
|
||
MsgItemChan chan *wechat.AddMsg
|
||
// 可不可以消费下一条
|
||
canDoNext chan bool
|
||
// 结束标志
|
||
EndChan chan bool
|
||
// 是否正在运行
|
||
isRunning bool
|
||
// 是否启用WebSocket
|
||
wsEnabled bool
|
||
// 当前在消费的消息
|
||
CurrentMsgItem *wechat.AddMsg
|
||
// 存储所有连接的客户端
|
||
Clients map[string]*websocket.Conn
|
||
Upgrader websocket.Upgrader
|
||
|
||
// 定义一个互斥锁; 用于保护 Clients 的线程安全
|
||
//mu sync.Mutex
|
||
// 定义一个读写互斥锁; 用于保护 Clients 的线程安全
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewWXSocketMsgTask 新建抢红包管理器
|
||
func NewWXSocketMsgTask(wxConn wxface.IWXConnect) *WXSocketMsgTask {
|
||
return &WXSocketMsgTask{
|
||
WxConn: wxConn,
|
||
MsgItemChan: make(chan *wechat.AddMsg, 1000),
|
||
canDoNext: make(chan bool, 1),
|
||
EndChan: make(chan bool, 1),
|
||
isRunning: false,
|
||
wsEnabled: true, // 默认启用WebSocket功能,避免连接后延迟
|
||
Clients: make(map[string]*websocket.Conn),
|
||
Upgrader: websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
// 允许所有CORS跨域请求
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
},
|
||
|
||
mu: sync.RWMutex{},
|
||
}
|
||
}
|
||
|
||
func (wsmsg *WXSocketMsgTask) grapMsg() {
|
||
for {
|
||
select {
|
||
case wsmsg.CurrentMsgItem = <-wsmsg.MsgItemChan:
|
||
key := wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID
|
||
|
||
// 使用消息包装器包含UUID信息
|
||
msgWrapper := NewMessageWrapper(key, wsmsg.CurrentMsgItem, "message")
|
||
jsonBytes, err := json.Marshal(msgWrapper)
|
||
if err != nil {
|
||
fmt.Println("消息序列化错误:", err)
|
||
continue // 继续处理下一条消息,而不是返回
|
||
}
|
||
|
||
conn := wsmsg.GetWebSocket(key)
|
||
if conn == nil {
|
||
fmt.Println("WebSocket连接不存在:", key)
|
||
continue // 继续处理下一条消息,而不是返回
|
||
}
|
||
// 对JSON内容进行Unicode解码处理
|
||
decodedJSON := decodeUnicodeEscapes(string(jsonBytes))
|
||
err = PushMessageToClient(decodedJSON, conn)
|
||
if err != nil {
|
||
fmt.Println("消息推送失败:", err)
|
||
wsmsg.DeleteWebSocket(key)
|
||
continue // 继续处理下一条消息,而不是返回
|
||
}
|
||
// 消息推送成功后继续处理下一条
|
||
case <-wsmsg.EndChan:
|
||
wsmsg.DeleteWebSocket(wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// Start 开始
|
||
func (wsmsg *WXSocketMsgTask) Start() {
|
||
if wsmsg.wsEnabled {
|
||
if wsmsg.isRunning {
|
||
return
|
||
}
|
||
wsmsg.isRunning = true
|
||
go wsmsg.grapMsg() // 直接启动消息处理循环
|
||
}
|
||
}
|
||
|
||
// Stop 结束协程
|
||
func (wsmsg *WXSocketMsgTask) Stop() {
|
||
if !wsmsg.isRunning {
|
||
return
|
||
}
|
||
wsmsg.isRunning = false
|
||
wsmsg.EndChan <- true
|
||
}
|
||
|
||
// GrapNext 抢下一个
|
||
func (wsmsg *WXSocketMsgTask) GrapNext() {
|
||
wsmsg.canDoNext <- true
|
||
}
|
||
|
||
// AddMsgItem 添加消息项
|
||
func (wsmsg *WXSocketMsgTask) AddMsgItem(msgItem *wechat.AddMsg) {
|
||
if !wsmsg.wsEnabled {
|
||
return
|
||
}
|
||
if !wsmsg.isRunning {
|
||
wsmsg.Start()
|
||
}
|
||
key := wsmsg.WxConn.GetWXAccount().GetUserInfo().UUID
|
||
conn := wsmsg.GetWebSocket(key)
|
||
|
||
// 打印日志
|
||
// fmt.Println("------------收到消息-----------:\n",
|
||
// "NickName: ",wsmsg.WxConn.GetWXAccount().GetUserInfo().NickName,"\n",
|
||
// "Key: ",key,"\n",
|
||
// "Content: ",msgItem.Content.GetStr(),
|
||
// "\n",
|
||
// "----------------------------------")
|
||
|
||
if conn == nil {
|
||
// 只有 ws 长链接存在才会添加 收到的消息记录
|
||
// fmt.Println("WebSocket连接不存在,消息将不会发送:", key)
|
||
return
|
||
}
|
||
wsmsg.MsgItemChan <- msgItem
|
||
}
|
||
|
||
// GetCurrentMsgItem 获取当前正在消费的消息项
|
||
func (wsmsg *WXSocketMsgTask) GetCurrentMsgItem() *wechat.AddMsg {
|
||
return wsmsg.CurrentMsgItem
|
||
}
|
||
|
||
func PushMessageToClient(msg string, conn *websocket.Conn) error {
|
||
// 获取或创建当前连接的写锁
|
||
val, _ := connWriteMutexes.LoadOrStore(conn, &sync.Mutex{})
|
||
mutex := val.(*sync.Mutex)
|
||
|
||
// 使用互斥锁保护写操作
|
||
mutex.Lock()
|
||
defer mutex.Unlock()
|
||
|
||
// 使用 WebSocket 连接向客户端发送消息
|
||
fmt.Println("发送消息:", msg)
|
||
err := conn.WriteMessage(websocket.TextMessage, []byte(msg))
|
||
if err != nil {
|
||
// 如果发送错误,清理锁
|
||
connWriteMutexes.Delete(conn)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// PushTypedMessageToClient 安全地发送特定类型的WebSocket消息
|
||
func PushTypedMessageToClient(messageType int, data []byte, conn *websocket.Conn) error {
|
||
// 获取或创建当前连接的写锁
|
||
val, _ := connWriteMutexes.LoadOrStore(conn, &sync.Mutex{})
|
||
mutex := val.(*sync.Mutex)
|
||
|
||
// 使用互斥锁保护写操作
|
||
mutex.Lock()
|
||
defer mutex.Unlock()
|
||
|
||
// 使用 WebSocket 连接向客户端发送消息
|
||
fmt.Printf("发送类型[%d]消息\n", messageType)
|
||
err := conn.WriteMessage(messageType, data)
|
||
if err != nil {
|
||
// 如果发送错误,清理锁
|
||
connWriteMutexes.Delete(conn)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// GetWebSocket 获取 Clients 缓存中的 ws 长链接
|
||
func (wsmsg *WXSocketMsgTask) GetWebSocket(key string) (conn *websocket.Conn) {
|
||
wsmsg.mu.RLock() // 读取 Clients 时使用读锁
|
||
defer wsmsg.mu.RUnlock() // 解锁读锁
|
||
conn = wsmsg.Clients[key] // 读取 ws 长链接; 若 key 不存在则返回 nil
|
||
return conn
|
||
}
|
||
|
||
// UpdateWebSocket 更新 Clients 缓存中的 ws 长链接
|
||
func (wsmsg *WXSocketMsgTask) UpdateWebSocket(key string, conn *websocket.Conn) {
|
||
wsmsg.mu.Lock() // 写入或删除 Clients 时使用写锁
|
||
defer wsmsg.mu.Unlock() // 解锁
|
||
wsmsg.Clients[key] = conn // 更新 ws 长链接
|
||
wsmsg.wsEnabled = true // 当连接建立时启用WebSocket功能
|
||
fmt.Println("WebSocket连接已更新:", key)
|
||
}
|
||
|
||
// DeleteWebSocket 删除 Clients 缓存中的 ws 长链接
|
||
func (wsmsg *WXSocketMsgTask) DeleteWebSocket(key string) {
|
||
wsmsg.mu.Lock() // 写入或删除 Clients 时使用写锁
|
||
defer wsmsg.mu.Unlock() // 解锁
|
||
if conn, ok := wsmsg.Clients[key]; ok {
|
||
// 清理连接的写锁
|
||
connWriteMutexes.Delete(conn)
|
||
delete(wsmsg.Clients, key) // 删除 ws 长链接
|
||
fmt.Println("WebSocket连接已删除:", key)
|
||
|
||
// 如果没有连接,禁用WebSocket功能并停止任务
|
||
if len(wsmsg.Clients) == 0 {
|
||
wsmsg.wsEnabled = false
|
||
if wsmsg.isRunning {
|
||
wsmsg.Stop()
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 获取WebSocket启用状态
|
||
func (wsmsg *WXSocketMsgTask) IsWebSocketEnabled() bool {
|
||
return wsmsg.wsEnabled
|
||
}
|
||
|
||
// 设置WebSocket启用状态
|
||
func (wsmsg *WXSocketMsgTask) SetWebSocketEnabled(enabled bool) {
|
||
wsmsg.wsEnabled = enabled
|
||
// 如果禁用而且正在运行,停止任务
|
||
if !enabled && wsmsg.isRunning {
|
||
wsmsg.Stop()
|
||
} else if enabled && !wsmsg.isRunning && len(wsmsg.Clients) > 0 {
|
||
// 如果启用而且有客户端连接但未运行,启动任务
|
||
wsmsg.Start()
|
||
}
|
||
}
|