Files
2026-02-17 13:06:23 +08:00

102 lines
3.3 KiB
Go

package wxcore
import (
"fmt"
"xiawan/wx/srv/srvconfig"
"xiawan/wx/srv/wxface"
)
// WXMsgHandler 微信响应管理器
type WXMsgHandler struct {
wxRouterMap map[uint32]wxface.IWXRouter //存放每个MsgId 所对应的处理方法的map属性
wxWorkerPoolSize uint32 //业务工作Worker池的数量
wxTaskQueue []chan wxface.IWXResponse //Worker负责取任务的消息队列
}
// NewWXMsgHandler 新建微信消息处理器
func NewWXMsgHandler() *WXMsgHandler {
return &WXMsgHandler{
wxRouterMap: make(map[uint32]wxface.IWXRouter),
// 一个worker对应一个queue
wxWorkerPoolSize: srvconfig.GlobalSetting.WorkerPoolSize * 2,
wxTaskQueue: make([]chan wxface.IWXResponse, srvconfig.GlobalSetting.WorkerPoolSize*2),
}
}
// AddRouter 增加微信消息路由
func (wxmh *WXMsgHandler) AddRouter(respID uint32, wxRouter wxface.IWXRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在
if _, ok := wxmh.wxRouterMap[respID]; ok {
return
}
//2 添加msg与api的绑定关系
wxmh.wxRouterMap[respID] = wxRouter
}
// GetRouterByRespID 根据响应ID获取对应的路由
func (wxmh *WXMsgHandler) GetRouterByRespID(urlID uint32) wxface.IWXRouter {
handler, ok := wxmh.wxRouterMap[urlID]
if !ok {
return nil
}
return handler
}
// doMsgHandler 马上以非阻塞方式处理消息
func (wxmh *WXMsgHandler) doMsgHandler(response wxface.IWXResponse) {
//xiaoyue处理消息
// fmt.Println("xiaoyue------doMsgHandler")
// defer TryE(response.GetWXConncet().GetWXAccount().GetUserInfo().GetUserName())
handler, ok := wxmh.wxRouterMap[response.GetPackHeader().URLID]
// fmt.Println("[URLID]", response.GetPackHeader().URLID)
if !ok {
return
}
//执行对应处理方法
handler.PreHandle(response)
handler.Handle(response)
handler.PostHandle(response)
}
// startOneWorker 启动一个Worker工作流程
func (wxmh *WXMsgHandler) startOneWorker(workerID int, taskQueue chan wxface.IWXResponse) {
// fmt.Println("xiaoyue------startOneWorker")
defer func() {
if r := recover(); r != nil {
fmt.Printf("startOneWorker recovered from panic: %v\n", r)
// 可以在这里记录日志或执行其他紧急恢复处理措施
}
}()
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
wxmh.doMsgHandler(request)
}
}
}
// StartWorkerPool 启动worker工作池
func (wxmh *WXMsgHandler) StartWorkerPool() {
defer TryE("")
// fmt.Println("xiaoyue------StartWorkerPool")
//遍历需要启动worker的数量,依此启动
for i := 0; i < int(wxmh.wxWorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
wxmh.wxTaskQueue[i] = make(chan wxface.IWXResponse, srvconfig.GlobalSetting.MaxWorkerTaskLen*2)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go wxmh.startOneWorker(i, wxmh.wxTaskQueue[i])
}
}
// SendWXRespToTaskQueue 将消息交给TaskQueue,由worker进行处理
func (wxmh *WXMsgHandler) SendWXRespToTaskQueue(response wxface.IWXResponse) {
//得到需要处理此条连接的workerID
workerID := response.GetWXConncet().GetWXConnID() % wxmh.wxWorkerPoolSize
//将请求消息发送给任务队列
wxmh.wxTaskQueue[workerID] <- response
}