Files

281 lines
7.2 KiB
Go
Raw Permalink Normal View History

2026-02-17 13:06:23 +08:00
package api
import (
"fmt"
"io"
"log"
// "log"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"
"xiawan/wx/api/middleware"
"xiawan/wx/api/req"
"xiawan/wx/api/router"
"xiawan/wx/api/service"
"xiawan/wx/clientsdk/baseinfo"
"xiawan/wx/db"
"xiawan/wx/db/table"
"xiawan/wx/srv/srvconfig"
"github.com/gin-gonic/gin"
)
func TLog() {
dir, _ := filepath.Abs(filepath.Dir(""))
logFileName := time.Now().Format("20060102") + ".log"
logFileAllPath := dir + "/log/" + logFileName
_, err := os.Stat(logFileAllPath)
var f *os.File
if err != nil {
f, _ = os.Create(logFileAllPath)
} else {
f, _ = os.OpenFile(logFileAllPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
}
gin.DefaultWriter = io.MultiWriter(f, os.Stdout)
gin.DefaultErrorWriter = io.MultiWriter(f, os.Stderr)
log.SetOutput(gin.DefaultWriter)
if srvconfig.GlobalSetting.Debug {
gin.SetMode(gin.DebugMode)
} else {
gin.SetMode(gin.ReleaseMode)
}
}
func WXServerGinHttpApiStart() error {
TLog() // 初始化日志
service.InitWXServerRouter() // 初始化路由
go InitAnewLogin() // 初始化重新上线
// 设置中间件
app := router.SetUpRouter(func(engine *gin.Engine) {
engine.Use(middleware.Cors)
engine.Use(gin.Recovery())
}, false)
// 启动 http 服务
go func() {
_ = http.ListenAndServe(":"+srvconfig.GlobalSetting.Port, nil)
}()
// 启动 gin 服务
err := app.Run(srvconfig.GlobalSetting.Host + ":" + srvconfig.GlobalSetting.Port)
if err != nil {
return err
}
return nil
}
const concurrencyLimit = 100 // 并发限制
// InitAnewLogin 初始化重新上线,后期考虑用分页存redis多线程实现
func InitAnewLogin() {
// 清理缓存
db.DelObj("wx_sync_msg_topic")
db.ClearSyncMsgCache("_syncMsg")
db.ClearSyncMsgCache("_syncHttp")
db.ClearSyncMsgCache("_wx_sync_msg_topic")
db.DelObj("wx_sync_msg_topic")
list := db.QueryListUserInfo()
sem := make(chan struct{}, concurrencyLimit) // 创建带缓冲通道,用于限制并发数
var wg sync.WaitGroup
for _, v := range list {
// 只为已登录的用户初始化连接,避免未登录用户的重连尝试
if v.State == 0 || v.State == 4 { // MMLoginStateNoLogin 或 MMLoginStateLogout
fmt.Printf("跳过未登录用户的连接初始化: %s (状态: %d)\n", v.UUID, v.State)
continue
}
// 初始化代理
key := fmt.Sprintf("%s%s", "wechat:Proxy:", v.UUID)
exists, _ := db.Exists(key)
newModel := &req.GetLoginQrCodeModel{
Proxy: "",
Check: false,
}
if exists {
//获取代理
db.GETObj(key, &newModel)
}
wg.Add(1)
go func(_v table.UserInfoEntity) {
defer wg.Done()
// 获取信号量,确保并发数不超过限制
sem <- struct{}{}
defer func() { <-sem }() // 执行完毕后释放信号量
// 捕获 panic防止程序崩溃
defer func() {
if r := recover(); r != nil {
fmt.Printf("初始化上线-Recovered from panic: %v\n", r)
// 这里可以记录日志或者执行其他的恢复操作
}
}()
service.InitLoginStatusService(_v.UUID, false, true, *newModel)
}(v)
}
// 等待所有协程完成
wg.Wait()
fmt.Println("初始化上线完成")
// 回调上线
go func() {
time.Sleep(3 * time.Second) // 等待连接稳定
connectMgr := service.WXServer.GetWXConnectMgr()
connectInfo := connectMgr.GetConnectInfo()
if connections, ok := connectInfo["connections"].([]map[string]interface{}); ok {
for _, conn := range connections {
if userInfoRaw, exists := conn["userInfo"]; exists {
var uuid string
switch v := userInfoRaw.(type) {
case *baseinfo.UserInfo:
uuid = v.UUID
case map[string]interface{}:
if uuidRaw, ok := v["UUID"]; ok {
if uuidStr, ok := uuidRaw.(string); ok {
uuid = uuidStr
}
}
}
if uuid != "" {
wxConn := connectMgr.GetWXConnectByUserInfoUUID(uuid)
if wxConn != nil {
wxCache := wxConn.GetWXCache()
// 设置消息同步完成标志,允许接收实时消息
wxCache.SetInitNewSyncFinished(true)
fmt.Printf("已激活消息接收 [UUID: %s]\n", uuid)
}
}
}
}
}
// 然后初始化回调配置
db.InitMessageCallbacks()
}()
2026-02-17 13:06:23 +08:00
// 启动定期清理未登录连接的协程
go func() {
ticker := time.NewTicker(5 * time.Minute) // 每5分钟清理一次
defer ticker.Stop()
for range ticker.C {
cleanupUnloggedConnections()
}
}()
}
// cleanupUnloggedConnections 清理未登录状态的连接
func cleanupUnloggedConnections() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("cleanupUnloggedConnections发生panic: %v\n", r)
}
}()
// fmt.Println("开始清理未登录连接...")
connectMgr := service.WXServer.GetWXConnectMgr()
connectInfo := connectMgr.GetConnectInfo()
// fmt.Printf("连接信息类型: %T\n", connectInfo)
// 安全获取connections
connectionsRaw, exists := connectInfo["connections"]
if !exists {
fmt.Println("警告没有找到connections字段")
return
}
// fmt.Printf("connections类型: %T\n", connectionsRaw)
connections, ok := connectionsRaw.([]map[string]interface{})
if !ok {
fmt.Printf("错误connections类型断言失败实际类型: %T\n", connectionsRaw)
return
}
// fmt.Printf("找到 %d 个连接\n", len(connections))
for _, conn := range connections {
// fmt.Printf("处理连接 %d\n", i)
// 修复类型断言问题loginState 实际是 uint32 类型
loginStateRaw := conn["loginState"]
// fmt.Printf("loginState原始值: %v, 类型: %T\n", loginStateRaw, loginStateRaw)
var loginState int
// 安全的类型转换
switch v := loginStateRaw.(type) {
case uint32:
loginState = int(v)
case int:
loginState = v
default:
// fmt.Printf("警告:未知的 loginState 类型: %T, 跳过此连接\n", v)
continue
}
// 修复userInfo类型断言问题
userInfoRaw := conn["userInfo"]
// fmt.Printf("userInfo原始值类型: %T\n", userInfoRaw)
var uuid string
// 安全处理不同类型的userInfo
switch v := userInfoRaw.(type) {
case *baseinfo.UserInfo:
uuid = v.UUID
// fmt.Printf("从*baseinfo.UserInfo获取UUID: %s\n", uuid)
case map[string]interface{}:
if uuidRaw, exists := v["UUID"]; exists {
if uuidStr, ok := uuidRaw.(string); ok {
uuid = uuidStr
// fmt.Printf("从map[string]interface{}获取UUID: %s\n", uuid)
} else {
// fmt.Printf("警告UUID类型不是string类型: %T, 跳过此连接\n", uuidRaw)
continue
}
} else {
// fmt.Println("警告userInfo中没有找到UUID字段跳过此连接")
continue
}
default:
// fmt.Printf("警告:未知的 userInfo 类型: %T, 跳过此连接\n", v)
continue
}
// fmt.Printf("连接 %d: UUID=%s, loginState=%d\n", i, uuid, loginState)
// 如果是未登录或已登出状态,清理连接
if loginState == 0 || loginState == 4 { // MMLoginStateNoLogin 或 MMLoginStateLogout
wxConn := connectMgr.GetWXConnectByUserInfoUUID(uuid)
if wxConn != nil {
// fmt.Printf("清理未登录用户的连接: %s (状态: %d)\n", uuid, loginState)
wxConn.Stop()
connectMgr.Remove(wxConn)
} else {
// fmt.Printf("未找到用户连接对象: %s\n", uuid)
}
}
}
// fmt.Println("清理未登录连接完成")
}
// socks5://ztb44:1454@27.128.157.125:4015