package wxcore import ( "fmt" "xiawan/wx/srv/srvconfig" "xiawan/wx/srv/wxface" ) // WXMsgHandler 微信响应管理器 type WXMsgHandler struct { wxRouterMap map[uint32]wxface.IWXRouter //存放每个MsgId 所对应的处理方法的map属性 wxWorkerPoolSize uint32 //业务工作Worker池的数量 wxTaskQueue []chan wxface.IWXResponse //Worker负责取任务的消息队列 } // NewWXMsgHandler 新建微信消息处理器 func NewWXMsgHandler() *WXMsgHandler { return &WXMsgHandler{ wxRouterMap: make(map[uint32]wxface.IWXRouter), // 一个worker对应一个queue wxWorkerPoolSize: srvconfig.GlobalSetting.WorkerPoolSize * 2, wxTaskQueue: make([]chan wxface.IWXResponse, srvconfig.GlobalSetting.WorkerPoolSize*2), } } // AddRouter 增加微信消息路由 func (wxmh *WXMsgHandler) AddRouter(respID uint32, wxRouter wxface.IWXRouter) { //1 判断当前msg绑定的API处理方法是否已经存在 if _, ok := wxmh.wxRouterMap[respID]; ok { return } //2 添加msg与api的绑定关系 wxmh.wxRouterMap[respID] = wxRouter } // GetRouterByRespID 根据响应ID获取对应的路由 func (wxmh *WXMsgHandler) GetRouterByRespID(urlID uint32) wxface.IWXRouter { handler, ok := wxmh.wxRouterMap[urlID] if !ok { return nil } return handler } // doMsgHandler 马上以非阻塞方式处理消息 func (wxmh *WXMsgHandler) doMsgHandler(response wxface.IWXResponse) { //xiaoyue处理消息 // fmt.Println("xiaoyue------doMsgHandler") // defer TryE(response.GetWXConncet().GetWXAccount().GetUserInfo().GetUserName()) handler, ok := wxmh.wxRouterMap[response.GetPackHeader().URLID] // fmt.Println("[URLID]", response.GetPackHeader().URLID) if !ok { return } //执行对应处理方法 handler.PreHandle(response) handler.Handle(response) handler.PostHandle(response) } // startOneWorker 启动一个Worker工作流程 func (wxmh *WXMsgHandler) startOneWorker(workerID int, taskQueue chan wxface.IWXResponse) { // fmt.Println("xiaoyue------startOneWorker") defer func() { if r := recover(); r != nil { fmt.Printf("startOneWorker recovered from panic: %v\n", r) // 可以在这里记录日志或执行其他紧急恢复处理措施 } }() //不断的等待队列中的消息 for { select { //有消息则取出队列的Request,并执行绑定的业务方法 case request := <-taskQueue: wxmh.doMsgHandler(request) } } } // StartWorkerPool 启动worker工作池 func (wxmh *WXMsgHandler) StartWorkerPool() { defer TryE("") // fmt.Println("xiaoyue------StartWorkerPool") //遍历需要启动worker的数量,依此启动 for i := 0; i < int(wxmh.wxWorkerPoolSize); i++ { //一个worker被启动 //给当前worker对应的任务队列开辟空间 wxmh.wxTaskQueue[i] = make(chan wxface.IWXResponse, srvconfig.GlobalSetting.MaxWorkerTaskLen*2) //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 go wxmh.startOneWorker(i, wxmh.wxTaskQueue[i]) } } // SendWXRespToTaskQueue 将消息交给TaskQueue,由worker进行处理 func (wxmh *WXMsgHandler) SendWXRespToTaskQueue(response wxface.IWXResponse) { //得到需要处理此条连接的workerID workerID := response.GetWXConncet().GetWXConnID() % wxmh.wxWorkerPoolSize //将请求消息发送给任务队列 wxmh.wxTaskQueue[workerID] <- response }