package wxcore import ( "errors" "fmt" "math/rand" "sync" "sync/atomic" "time" "github.com/lunny/log" "xiawan/wx/clientsdk" "xiawan/wx/clientsdk/baseinfo" "xiawan/wx/clientsdk/mmtls" "xiawan/wx/db" "xiawan/wx/srv" "xiawan/wx/srv/srvconfig" "xiawan/wx/srv/websrv" "xiawan/wx/srv/wxface" ) // WXConnect 微信链接 type WXConnect struct { wxServer wxface.IWXServer // 微信链接ID wxConnID uint32 // 微信账号信息 wxAccount *srv.WXAccount // 发送请求缓存队列 longReqQueue chan wxface.IWXLongRequest // 请求调用器 wxReqInvoker wxface.IWXReqInvoker // 缓存器 wxCache wxface.IWXCache // 任务管理器 wxTaskMgr wxface.IWXTaskMgr // 同步请求管理器 wxSyncMgr wxface.IWXSyncMgr // 文件助手消息管理器 wxFileHelperMgr wxface.IWXFileHelperMgr //用户消息管理器 wxUserMsgMgr wxface.IWXUserMsgMgr // web任务管理器 webTaskMgr *websrv.WebTaskMgr // 心跳定时器 heartBeatTimer *time.Timer // 二次登录定时器 autoAuthTimer *time.Timer // 断开链接 ExitFlagChan chan bool // 是否连接着 (原子操作) connected int32 // 首次登录初始化只执行一次 // onceInit sync.Once // 出错次数 // errCount int // 开始的时间戳 StartDateTime int64 // 是否开启长连接监控 IsOpenShortLink bool // 心跳时间 HeartBeatTime int64 // 扫码时间秒 ScanCodeTime int64 // 上次尝试重连长连接的时间 lastLongRetryTime int64 // 长连接重试计数器 longRetryCount int // 互斥锁 mu sync.RWMutex // 重启标志 restarting int32 } // NewWXConnect 新的微信连接 func NewWXConnect(wxServer wxface.IWXServer, wxAccount *srv.WXAccount) wxface.IWXConnect { // 从根上下文创建一个上下文 wxconn := &WXConnect{ wxServer: wxServer, wxAccount: wxAccount, longReqQueue: make(chan wxface.IWXLongRequest, 5000), ExitFlagChan: make(chan bool, 1), connected: 0, HeartBeatTime: 0, // 默认 0 ,没有开始心跳 StartDateTime: time.Now().Unix(), IsOpenShortLink: true, ScanCodeTime: 0, lastLongRetryTime: 0, longRetryCount: 0, restarting: 0, } wxconn.wxReqInvoker = NewWXReqInvoker(wxconn) wxconn.wxTaskMgr = NewWXTaskMgr(wxconn) wxconn.wxCache = NewWXCache(wxconn) wxconn.wxSyncMgr = NewWXSyncMgr(wxconn) //文件助手消息管理器 wxconn.wxFileHelperMgr = NewWXFileHelperMgr(wxconn) //用户消息管理器 wxconn.wxUserMsgMgr = NewWXUSerMsgMgr(wxconn) // 上报任务 wxconn.webTaskMgr = websrv.NewWebTaskMgr() return wxconn } // startLongWriter 开启长链接发送数据 func (wxconn *WXConnect) startLongWriter() { StartDateTime := wxconn.StartDateTime for { // 判断链接断开立马暂停 if !wxconn.IsConnected() { return } // 判断是否重复启动 for 循环 if StartDateTime != wxconn.StartDateTime { // 结束 return } select { case longReq := <-wxconn.longReqQueue: // 心跳和登录都会走这里 wxconn.ScanCodeTime = time.Now().Unix() mmInfo := wxconn.wxAccount.GetUserInfo().MMInfo err := mmtls.MMTCPSendReq(mmInfo, longReq.GetOpcode(), longReq.GetData()) if err != nil { // 断开链接 fmt.Printf("[%s],[%s] 断开链接 - %s \n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, err.Error()) // 判断是心跳出错 - 发送二次登录 if longReq.GetOpcode() == mmtls.MMLongOperationHeartBeat { // 等 200 毫秒 time.Sleep(200 * time.Millisecond) if StartDateTime == wxconn.StartDateTime { wxconn.restartLong() } } else { wxconn.Stop() // 断开链接 } } continue case <-wxconn.heartBeatTimer.C: // 发送心跳包 if wxconn.wxAccount.GetLoginState() == baseinfo.MMLoginStateOnLine { _ = wxconn.wxReqInvoker.SendHeartBeatRequest() } // 重置定时器,继续下一次心跳 wxconn.heartBeatTimer.Reset(time.Second * 175) continue case <-wxconn.autoAuthTimer.C: // 进行二次登录 if wxconn.wxAccount.GetLoginState() == baseinfo.MMLoginStateOnLine { _ = wxconn.wxReqInvoker.SendAutoAuthRequest() } continue case <-wxconn.ExitFlagChan: return } } } // 重新链接 func (wxconn *WXConnect) restartLong() { defer TryE("(wxconn *WXConnect) restartLong()") // 防止重复重启 if !atomic.CompareAndSwapInt32(&wxconn.restarting, 0, 1) { log.Printf("[%s] restartLong already in progress, skipping\n", wxconn.GetWXAccount().GetUserInfo().GetUserName()) return } defer atomic.StoreInt32(&wxconn.restarting, 0) wxconn.mu.Lock() // 断开链接 wxconn.setConnected(false) // 安全发送退出信号 select { case wxconn.ExitFlagChan <- true: default: // 通道已满或已关闭,忽略 } // 关闭长链接 userInfo := wxconn.wxAccount.GetUserInfo() if userInfo.MMInfo != nil && userInfo.MMInfo.Conn != nil { userInfo.MMInfo.Conn.Close() userInfo.MMInfo = nil } wxconn.mu.Unlock() // 异步重启以避免阻塞 go func() { defer TryE("restartLong async restart") // 等待 30 秒 time.Sleep(30 * time.Second) // 检查是否仍需要重启 if wxconn.IsConnected() { log.Printf("[%s] Connection already restored, skipping restart\n", wxconn.GetWXAccount().GetUserInfo().GetUserName()) return } //重新开始 err := wxconn.startLongLink() if err != nil { log.Printf("[%s] restartLong - startLongLink error: %v\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), err) return } fmt.Printf("[%s] 开始长连接状态!startLongLink\n", userInfo.UUID) wxconn.SendHeartBeatWaitingSeconds(1) // 等待30-60分钟 minutes := uint32(rand.Int31n(30)) + 30 // minutes := uint32(1) wxconn.SendAutoAuthWaitingMinutes(minutes) log.Printf("[%s] Long connection restarted successfully\n", wxconn.GetWXAccount().GetUserInfo().GetUserName()) }() } // 监听长连接失败,实际上没掉线 func (wxconn *WXConnect) StartShortReader() { StartDateTime := wxconn.StartDateTime // 延时 20 秒 time.Sleep(20 * time.Second) // 20 秒之后大概率已经建立了长连接,如果建立长连接,则重新启动 // 判断是否关闭 if !wxconn.IsOpenShortLink { return } forCount := 0 // 从配置获取最大重试次数和重试间隔 maxLongRetries := srvconfig.GlobalSetting.ProxyConfig.MaxLongRetryTimes minRetryInterval := int64(srvconfig.GlobalSetting.ProxyConfig.LongRetryInterval) for { // 判断是否重复启动 for 循环 if StartDateTime != wxconn.StartDateTime { // 结束 return } // 已关闭 if !wxconn.IsOpenShortLink { return } TimeSec := time.Now().Unix() if TimeSec-wxconn.ScanCodeTime < 10 { if wxconn.ScanCodeTime != 0 { // 等待 3 秒 time.Sleep(3 * time.Second) // 跳过 continue } } // 判断是否在线 if wxconn.wxAccount.GetLoginState() == baseinfo.MMLoginStateOnLine { // 每10次短连接心跳前先尝试长连接 if forCount%10 == 0 && !wxconn.IsConnected() { // 更新重试时间 wxconn.lastLongRetryTime = time.Now().Unix() // 尝试发送长连接心跳,恢复长连接 log.Printf("[%s],[%s] 定期尝试恢复长连接心跳\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) // 使用defer+recover防止恢复过程中的panic影响短连接心跳 func() { defer func() { if r := recover(); r != nil { log.Printf("[%s],[%s] 恢复长连接心跳时发生错误: %v\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, r) } }() wxconn.restartLong() }() // 给长连接一些时间建立 time.Sleep(2 * time.Second) // 恢复后检查连接状态并记录日志 if wxconn.IsConnected() { log.Printf("[%s],[%s] 长连接心跳恢复成功\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) wxconn.longRetryCount = 0 // 连接成功后重置计数器 // 长连接恢复后,立即触发一次消息同步 wxconn.wxSyncMgr.SendNewSyncRequest() time.Sleep(500 * time.Millisecond) // 给同步请求一些处理时间 // 如果长连接成功,停止短连接心跳 return } else { log.Printf("[%s],[%s] 长连接心跳恢复失败,继续使用短连接同步\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) } } // 尝试恢复长连接心跳 - 代理可能已经恢复 if !wxconn.IsConnected() { // 检查是否应该尝试恢复长连接 // 距离上次尝试时间超过最小间隔,并且重试次数在允许范围内 currentTime := time.Now().Unix() shouldRetry := (currentTime-wxconn.lastLongRetryTime >= minRetryInterval) && (wxconn.longRetryCount < maxLongRetries) // 如果重试计数器已达上限,但已经过去了更长时间(比如10倍间隔),重置计数器 if wxconn.longRetryCount >= maxLongRetries && (currentTime-wxconn.lastLongRetryTime >= minRetryInterval*10) { log.Printf("[%s],[%s] 重置长连接重试计数器(已过去足够长时间)\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) wxconn.longRetryCount = 0 shouldRetry = true } if shouldRetry { // 更新重试时间和计数器 wxconn.lastLongRetryTime = currentTime wxconn.longRetryCount++ // 尝试发送长连接心跳,恢复长连接 log.Printf("[%s],[%s] 尝试恢复长连接心跳 (尝试 %d/%d)\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, wxconn.longRetryCount, maxLongRetries) // 使用defer+recover防止恢复过程中的panic影响短连接心跳 func() { defer func() { if r := recover(); r != nil { log.Printf("[%s],[%s] 恢复长连接心跳时发生错误: %v\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, r) } }() wxconn.restartLong() }() // 给长连接一些时间建立 time.Sleep(1 * time.Second) // 恢复后检查连接状态并记录日志 if wxconn.IsConnected() { log.Printf("[%s],[%s] 长连接心跳恢复成功,重置重试计数器\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) wxconn.longRetryCount = 0 // 连接成功后重置计数器 // 长连接恢复后,立即触发一次消息同步 wxconn.wxSyncMgr.SendNewSyncRequest() time.Sleep(500 * time.Millisecond) // 给同步请求一些处理时间 // 同时触发收藏同步 currentTaskMgr := wxconn.GetWXTaskMgr() taskMgr, _ := currentTaskMgr.(*WXTaskMgr) currentSnsTransTask := taskMgr.GetSnsTransTask() if currentSnsTransTask.IsSyncTrans() { wxconn.wxSyncMgr.SendFavSyncRequest() } // 如果长连接成功,停止短连接心跳 return } else { log.Printf("[%s],[%s] 长连接心跳恢复失败,将继续使用短连接同步\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName) } } else if wxconn.longRetryCount >= maxLongRetries { // 只有在第一次达到最大重试次数时记录日志 if wxconn.longRetryCount == maxLongRetries { log.Printf("[%s],[%s] 已达到最大重试次数(%d),暂停长连接恢复尝试(将在%d秒后重置)\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, maxLongRetries, minRetryInterval*10) wxconn.longRetryCount++ // 增加一次以避免重复打印 } } } // 发送同步消息 if forCount%20 == 0 { isConn := "true" if !wxconn.IsConnected() { isConn = "false" } log.Printf("[%s],[%s] 短链接同步消息 StartShortReader: isConnected = %s \n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, isConn) // 同步收藏转发 // 判断是否开启收藏转发 if wxconn.IsConnected() { currentTaskMgr := wxconn.GetWXTaskMgr() taskMgr, _ := currentTaskMgr.(*WXTaskMgr) currentSnsTransTask := taskMgr.GetSnsTransTask() if currentSnsTransTask.IsSyncTrans() { wxconn.wxSyncMgr.SendFavSyncRequest() // 延时一秒 time.Sleep(1 * time.Second) } } } // 即使长连接没有恢复,也继续使用短链接同步消息 wxconn.wxSyncMgr.SendNewSyncRequest() time.Sleep(3 * time.Second) forCount++ if forCount >= 100 { forCount = 0 } continue } forCount++ if forCount >= 100 { forCount = 0 } time.Sleep(3 * time.Second) continue } } // 关闭重启任务 func (wxconn *WXConnect) StopShortReader() { wxconn.IsOpenShortLink = false // 保存状态变更到数据库 currentUserInfo := wxconn.GetWXAccount().GetUserInfo() if currentUserInfo != nil { db.SaveShortLinkStatusToDB(currentUserInfo.UUID, false) } } // startLongReader 开启长连接接受数据 func (wxconn *WXConnect) startLongReader() { StartDateTime := wxconn.StartDateTime // 重试机制相关变量 retryCount := 0 maxRetries := 5 baseDelay := time.Second for { // 判断链接断开立马暂停 if !wxconn.IsConnected() { break } // 判断是否重复启动 for 循环 if StartDateTime != wxconn.StartDateTime { break } // 接收数据 mmInfo := wxconn.wxAccount.GetUserInfo().MMInfo recvInfo, err := mmtls.MMTCPRecvData(mmInfo) if err != nil { errorMsg := err.Error() log.Printf("[%s],[%s] 长连接出错 - %s \n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), wxconn.GetWXAccount().GetUserInfo().NickName, errorMsg) if !wxconn.IsConnected() { return } // 等待200毫秒 time.Sleep(200 * time.Millisecond) // 判断是否重复启动 for 循环(StartDateTime 标识不对说明已经被重启了) if StartDateTime != wxconn.StartDateTime { // 结束 return } wxconn.restartLong() return } // 系统推送-需要同步消息 if recvInfo.HeaderInfo.Operation < 1000000000 { //fmt.Println(recvInfo.HeaderInfo.Operation) loginState := wxconn.GetWXAccount().GetLoginState() if loginState != baseinfo.MMLoginStateOnLine { continue } // 同步收藏 if recvInfo.HeaderInfo.Operation == baseinfo.MMLongOperatorTypeFavSync { wxconn.wxSyncMgr.SendFavSyncRequest() continue } // TODO 发送同步请求 // syncKey := wxconn.wxAccount.GetUserInfo().SyncKey // if syncKey == nil || len(syncKey) == 0 { // wxconn.onceInit.Do(func() { // wxconn.wxSyncMgr.SendSyncInitRequest() // }) // } else { wxconn.wxSyncMgr.SendNewSyncRequest() // } continue } // 解包响应数据 packHeader, err := clientsdk.DecodePackHeader(recvInfo.RespData, nil) if err != nil { // 判断是否重复启动 for 循环 if StartDateTime != wxconn.StartDateTime { // 结束 return } // TODO 接受消息出错,断开链接-token登陆 log.Printf("[%s] startLongReader - DecodePackHeader error: %v, operation: %v\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), err, recvInfo.HeaderInfo.Operation) /* * 注意: 这里新号首次登录时(同省IP首次可能会多次掉线, 同市掉线少, 家里的内网穿透 socks5 代理IP基本不会掉线) * 另外, 24小时内还可能会接收消息出错(导致掉线), 三天后基本稳定 */ // 网络异常超时或解包失败 - 修复空指针问题 if packHeader == nil { retryCount++ log.Printf("[%s] packHeader is nil (retry %d/%d), restarting connection\n", wxconn.GetWXAccount().GetUserInfo().GetUserName(), retryCount, maxRetries) if retryCount >= maxRetries { wxconn.restartLong() return } // 指数退避 delay := baseDelay * time.Duration(1<