升级至8069版本:版本号更新/代理配置系统/红包计时埋点/长连接重构/回调修复

This commit is contained in:
2026-02-26 10:44:13 +08:00
parent 7cbd3d061d
commit 40a74d2ea7
38 changed files with 3639 additions and 235 deletions
+236 -35
View File
@@ -2,64 +2,187 @@ package mmtls
import (
"bytes"
"io/ioutil"
"context"
"fmt"
"io"
"net"
"net/http"
"net/http/httptrace"
"strings"
"sync"
"time"
"xiawan/wx/clientsdk/baseutils"
)
var mmHTTPClientCache sync.Map
func getMMHTTPClient(mmInfo *MMInfo, timeout time.Duration) *http.Client {
key := fmt.Sprintf("%s|%d|%p|%t", mmInfo.ShortHost, int64(timeout/time.Millisecond), mmInfo.Dialer, mmInfo.AllowDirectOnProxyFail)
if val, ok := mmHTTPClientCache.Load(key); ok {
return val.(*http.Client)
}
dialer := &net.Dialer{Timeout: timeout, KeepAlive: 30 * time.Second}
transport := &http.Transport{
DialContext: dialer.DialContext,
ResponseHeaderTimeout: timeout,
MaxIdleConns: 128,
MaxIdleConnsPerHost: 128,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
}
if mmInfo.Dialer != nil {
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
type result struct {
conn net.Conn
err error
}
resultChan := make(chan result, 1)
go func() {
conn, err := mmInfo.Dialer.Dial(network, addr)
resultChan <- result{conn: conn, err: err}
}()
select {
case res := <-resultChan:
if res.err != nil {
baseutils.PrintLog(fmt.Sprintf("Proxy dial failed: %s", res.err.Error()))
if mmInfo.AllowDirectOnProxyFail {
baseutils.PrintLog("Proxy failed, falling back to direct connection")
conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, fmt.Errorf("proxy and direct connection both failed: proxy=%v, direct=%v", res.err, err)
}
return conn, nil
}
return nil, fmt.Errorf("proxy connection failed and direct fallback is disabled: %w", res.err)
}
return res.conn, nil
case <-time.After(timeout):
baseutils.PrintLog("Proxy dial timeout")
if mmInfo.AllowDirectOnProxyFail {
baseutils.PrintLog("Proxy timeout, falling back to direct connection")
conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, fmt.Errorf("proxy timeout and direct connection failed: %w", err)
}
return conn, nil
}
return nil, fmt.Errorf("proxy connection timeout and direct fallback is disabled")
}
}
}
client := &http.Client{
Transport: transport,
Timeout: timeout,
}
actual, _ := mmHTTPClientCache.LoadOrStore(key, client)
return actual.(*http.Client)
}
// MMHTTPPost mmtls方式发送数据包
func MMHTTPPost(mmInfo *MMInfo, data []byte) ([]byte, error) {
func MMHTTPPost(mmInfo *MMInfo, data []byte, tag string) ([]byte, error) {
startMs := time.Now().UnixMilli()
var connReused bool
var connWasIdle bool
var connIdleMs int64
var remoteAddr string
var connectStartMs int64
var connectDoneMs int64
var gotConnMs int64
var wroteRequestMs int64
var firstByteMs int64
requestURL := "http://" + mmInfo.ShortHost + mmInfo.ShortURL
request, err := http.NewRequest("POST", requestURL, bytes.NewReader(data))
req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data))
if err != nil {
return nil, err
}
request.Header.Add("UserAgent", "MicroMessenger Client")
request.Header.Add("Accept", "*/*")
request.Header.Add("Cache-Control", "no-cache")
request.Header.Add("Connection", "close")
request.Header.Add("content-type", "application/octet-stream")
request.Header.Add("Upgrade", "mmtls")
request.Header.Add("Host", mmInfo.ShortHost)
//fmt.Println(mmInfo.ShortHost)//szshort.weixin.qq.com
req.Header.Add("UserAgent", "MicroMessenger Client")
req.Header.Add("Accept", "*/*")
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Connection", "Keep-Alive")
req.Header.Add("content-type", "application/octet-stream")
req.Header.Add("Upgrade", "mmtls")
req.Header.Add("Host", mmInfo.ShortHost)
// fmt.Println(mmInfo.ShortHost) //szshort.weixin.qq.com
// 发送请求
httpTransport := &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(network, addr, time.Second*15) //设置建立连接超时
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(time.Second * 15)) //设置发送接受数据超时
return conn, nil
},
ResponseHeaderTimeout: time.Second * 15,
MaxIdleConnsPerHost: -1, //禁用连接池缓存
DisableKeepAlives: true, //禁用客户端连接缓存到连接池
// 获取超时配置
timeout := time.Duration(mmInfo.ShortConnTimeout) * time.Second
if timeout <= 0 {
timeout = 15 * time.Second
}
// 如果有代理
if mmInfo.Dialer != nil {
httpTransport.Dial = mmInfo.Dialer.Dial
client := getMMHTTPClient(mmInfo, timeout)
if strings.Contains(tag, "openwxhb") || strings.Contains(tag, "openunionhb") || strings.Contains(tag, "receivewxhb") || strings.Contains(tag, "unionhb") {
trace := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
gotConnMs = time.Now().UnixMilli()
connReused = info.Reused
connWasIdle = info.WasIdle
connIdleMs = int64(info.IdleTime / time.Millisecond)
if info.Conn != nil && info.Conn.RemoteAddr() != nil {
remoteAddr = info.Conn.RemoteAddr().String()
}
},
ConnectStart: func(network, addr string) {
connectStartMs = time.Now().UnixMilli()
},
ConnectDone: func(network, addr string, err error) {
connectDoneMs = time.Now().UnixMilli()
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
wroteRequestMs = time.Now().UnixMilli()
},
GotFirstResponseByte: func() {
firstByteMs = time.Now().UnixMilli()
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
}
client := &http.Client{Transport: httpTransport}
resp, err := client.Do(request)
resp, err := client.Do(req)
if err != nil {
baseutils.PrintLog(err.Error())
return nil, err
}
if strings.Contains(tag, "openwxhb") || strings.Contains(tag, "openunionhb") || strings.Contains(tag, "receivewxhb") || strings.Contains(tag, "unionhb") {
fmt.Printf("MMHTTPPost resp tag=%s status=%s connHeader=%s close=%t\n", tag, resp.Status, resp.Header.Get("Connection"), resp.Close)
}
// 接收响应
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
endMs := time.Now().UnixMilli()
if strings.Contains(tag, "openwxhb") || strings.Contains(tag, "openunionhb") || strings.Contains(tag, "receivewxhb") || strings.Contains(tag, "unionhb") {
connectMs := int64(0)
if connectStartMs > 0 && connectDoneMs > 0 {
connectMs = connectDoneMs - connectStartMs
}
ttfbMs := int64(0)
if firstByteMs > 0 {
ttfbMs = firstByteMs - startMs
}
writeMs := int64(0)
if wroteRequestMs > 0 {
writeMs = wroteRequestMs - startMs
}
gotConnDelayMs := int64(0)
if gotConnMs > 0 {
gotConnDelayMs = gotConnMs - startMs
}
fmt.Printf("MMHTTPPost trace tag=%s reused=%t wasIdle=%t idle=%dms gotConn=%dms connect=%dms wrote=%dms ttfb=%dms total=%dms\n",
tag, connReused, connWasIdle, connIdleMs, gotConnDelayMs, connectMs, writeMs, ttfbMs, endMs-startMs)
if remoteAddr != "" {
fmt.Printf("MMHTTPPost peer tag=%s remote=%s\n", tag, remoteAddr)
}
}
// 返回响应数据
return body, nil
@@ -67,6 +190,8 @@ func MMHTTPPost(mmInfo *MMInfo, data []byte) ([]byte, error) {
// MMHTTPPostData mmtls短链接方式发送请求数据包
func MMHTTPPostData(mmInfo *MMInfo, url string, data []byte) ([]byte, error) {
startMs := time.Now().UnixMilli()
createStartMs := startMs
// 创建HttpHandler
httpHandler := &HTTPHandler{}
httpHandler.URL = url
@@ -75,24 +200,36 @@ func MMHTTPPostData(mmInfo *MMInfo, url string, data []byte) ([]byte, error) {
// 创建发送请求项列表
sendItems, err := CreateSendPackItems(mmInfo, httpHandler)
createEndMs := time.Now().UnixMilli()
if err != nil {
return []byte{}, err
}
// MMTLS-加密要发送的数据
packStartMs := time.Now().UnixMilli()
packData, err := MMHTTPPackData(mmInfo, sendItems)
packEndMs := time.Now().UnixMilli()
if err != nil {
return []byte{}, err
}
// 发送数据
respData, err := MMHTTPPost(mmInfo, packData)
postStartMs := time.Now().UnixMilli()
respData, err := MMHTTPPost(mmInfo, packData, url)
postEndMs := time.Now().UnixMilli()
// 解包响应数据
decodeStartMs := time.Now().UnixMilli()
decodeData, err := MMDecodeResponseData(mmInfo, sendItems, respData)
decodeEndMs := time.Now().UnixMilli()
if err != nil {
baseutils.PrintLog(err.Error())
return nil, err
}
if strings.Contains(url, "openwxhb") || strings.Contains(url, "openunionhb") || strings.Contains(url, "receivewxhb") || strings.Contains(url, "unionhb") {
totalMs := decodeEndMs - startMs
fmt.Printf("MMHTTPPostData url=%s create=%dms pack=%dms post=%dms decode=%dms total=%dms\n",
url, createEndMs-createStartMs, packEndMs-packStartMs, postEndMs-postStartMs, decodeEndMs-decodeStartMs, totalMs)
}
return decodeData, nil
}
@@ -111,13 +248,77 @@ func HTTPPost(mmInfo *MMInfo, cgi string, data []byte) ([]byte, error) {
request.Header.Add("Cache-Control", "no-cache")
request.Header.Add("Connection", "Keep-Alive")
request.Header.Add("content-type", "application/octet-stream")
// 获取超时配置
timeout := time.Duration(mmInfo.ShortConnTimeout) * time.Second
if timeout <= 0 {
timeout = 15 * time.Second
}
// 发送请求
httpTransport := &http.Transport{}
httpTransport := &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(timeout))
return conn, nil
},
ResponseHeaderTimeout: timeout,
}
// 如果有代理
if mmInfo.Dialer != nil {
httpTransport.Dial = mmInfo.Dialer.Dial
httpTransport.Dial = func(network, addr string) (net.Conn, error) {
type result struct {
conn net.Conn
err error
}
resultChan := make(chan result, 1)
go func() {
conn, err := mmInfo.Dialer.Dial(network, addr)
resultChan <- result{conn: conn, err: err}
}()
select {
case res := <-resultChan:
if res.err != nil {
if mmInfo.AllowDirectOnProxyFail {
baseutils.PrintLog("HTTPPost: Proxy failed, falling back to direct connection")
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, fmt.Errorf("proxy and direct connection both failed: %w", err)
}
conn.SetDeadline(time.Now().Add(timeout))
return conn, nil
}
return nil, fmt.Errorf("proxy connection failed: %w", res.err)
}
if res.conn != nil {
res.conn.SetDeadline(time.Now().Add(timeout))
}
return res.conn, nil
case <-time.After(timeout):
if mmInfo.AllowDirectOnProxyFail {
baseutils.PrintLog("HTTPPost: Proxy timeout, falling back to direct connection")
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, fmt.Errorf("proxy timeout and direct connection failed: %w", err)
}
conn.SetDeadline(time.Now().Add(timeout))
return conn, nil
}
return nil, fmt.Errorf("proxy connection timeout")
}
}
}
client := &http.Client{
Transport: httpTransport,
Timeout: timeout,
}
client := &http.Client{Transport: httpTransport}
resp, err := client.Do(request)
if err != nil {
baseutils.PrintLog(err.Error())
@@ -126,7 +327,7 @@ func HTTPPost(mmInfo *MMInfo, cgi string, data []byte) ([]byte, error) {
// 接收响应
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
+108 -21
View File
@@ -17,46 +17,94 @@ func MMLongConnect(mmInfo *MMInfo) error {
strPort := strconv.Itoa(int(mmInfo.LONGPort))
serverAddr := mmInfo.LongHost + ":" + strPort
fmt.Println("MMLongConnect serverAddr: ", serverAddr)
mminfoDialer := "0"
if mmInfo.Dialer != nil {
mminfoDialer = "1"
}
// mminfoDialer := "0"
// if mmInfo.Dialer != nil {
// mminfoDialer = "1"
// }
// 打印 mmInfo.Dialer
fmt.Println("MMLongConnect mmInfo.Dialer: ", mminfoDialer)
// fmt.Println("MMLongConnect mmInfo.Dialer: ", mminfoDialer)
// 从MMInfo获取重试参数,如果未设置则使用默认值
maxRetries := mmInfo.LongConnRetryTimes
if maxRetries <= 0 {
maxRetries = 30
}
retryInterval := time.Duration(mmInfo.LongConnRetryInterval) * time.Millisecond
if retryInterval <= 0 {
retryInterval = 500 * time.Millisecond
}
connTimeout := time.Duration(mmInfo.LongConnTimeout) * time.Second
if connTimeout <= 0 {
connTimeout = 15 * time.Second
}
// 调试:打印 AllowDirectOnProxyFail 的值
fmt.Printf("MMLongConnect AllowDirectOnProxyFail: %v\n", mmInfo.AllowDirectOnProxyFail)
// 定义
var conn net.Conn
var err error
for i := 0; i < 30; i++ {
if mmInfo.Dialer != nil {
proxyFailed := false // 标记代理是否已经失败过
for i := 0; i < maxRetries; i++ {
// 如果有代理且代理没有被标记为失败,尝试代理连接
if mmInfo.Dialer != nil && !proxyFailed {
// 使用代理连接
conn, err = mmInfo.Dialer.Dial("tcp4", serverAddr)
if err != nil {
baseutils.PrintLog(fmt.Sprintf("MMLongConnect attempt %d failed: %s", i+1, err.Error()))
// 等待 500 毫秒重时 (最大 30 次)
time.Sleep(500 * time.Millisecond)
baseutils.PrintLog(fmt.Sprintf("MMLongConnect with proxy attempt %d/%d failed: %s", i+1, maxRetries, err.Error()))
// 检查是否允许降级到直连
if mmInfo.AllowDirectOnProxyFail {
baseutils.PrintLog("MMLongConnect: Proxy failed, falling back to direct connection")
proxyFailed = true // 标记代理失败,后续使用直连
// 不等待,立即尝试直连
} else {
// 不允许直连,等待后重试代理
time.Sleep(retryInterval)
}
continue
}
// 设置连接超时
if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil {
baseutils.PrintLog(fmt.Sprintf("MMLongConnect SetDeadline failed: %s", err.Error()))
conn.Close()
time.Sleep(retryInterval)
continue
}
mmInfo.Conn = conn
mmInfo.reader = bufio.NewReader(conn)
fmt.Println("MMLongConnect success!")
fmt.Println("MMLongConnect with proxy success!")
return nil
}
// 没有使用代理
conn, err = net.Dial("tcp4", serverAddr)
// 没有使用代理或代理已失败 - 直连
conn, err = net.DialTimeout("tcp4", serverAddr, connTimeout)
if err != nil {
baseutils.PrintLog(fmt.Sprintf("MMLongConnect attempt %d failed: %s", i+1, err.Error()))
// 等待 500 毫秒重时 (最大 30 次)
time.Sleep(500 * time.Millisecond)
baseutils.PrintLog(fmt.Sprintf("MMLongConnect direct attempt %d/%d failed: %s", i+1, maxRetries, err.Error()))
// 等待重试
time.Sleep(retryInterval)
continue
}
// 设置连接超时
if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil {
baseutils.PrintLog(fmt.Sprintf("MMLongConnect SetDeadline failed: %s", err.Error()))
conn.Close()
time.Sleep(retryInterval)
continue
}
mmInfo.Conn = conn
mmInfo.reader = bufio.NewReader(conn)
if proxyFailed {
fmt.Println("MMLongConnect direct success (after proxy fallback)!")
} else {
fmt.Println("MMLongConnect direct success!")
}
return nil
}
if err != nil {
return err
return fmt.Errorf("MMLongConnect failed after %d attempts: %w", maxRetries, err)
}
return nil
return errors.New("MMLongConnect failed: unknown error")
}
// MMTCPSendData MMTCPSendData 长链接发送数据
@@ -70,11 +118,20 @@ func MMTCPSendData(mmInfo *MMInfo, data []byte) error {
}
}
// 设置写超时
writeTimeout := time.Duration(mmInfo.LongConnTimeout) * time.Second
if writeTimeout <= 0 {
writeTimeout = 15 * time.Second
}
if err := mmInfo.Conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
return fmt.Errorf("SetWriteDeadline failed: %w", err)
}
// 发送数据
length, err := mmInfo.Conn.Write(data)
// 判断是否出错
if err != nil {
return err
return fmt.Errorf("write failed: %w", err)
}
// 判断数据是否发送完毕
if length != len(data) {
@@ -87,6 +144,18 @@ func MMTCPSendData(mmInfo *MMInfo, data []byte) error {
// MMTCPRecvItems 循环接收长链接数据
// Deprecated
func MMTCPRecvItems(mmInfo *MMInfo) ([]*PackItem, error) {
// 设置读超时
readTimeout := time.Duration(mmInfo.LongConnReadTimeout) * time.Second
if readTimeout <= 0 {
readTimeout = time.Duration(mmInfo.LongConnTimeout) * time.Second
if readTimeout <= 0 {
readTimeout = 15 * time.Second
}
}
if err := mmInfo.Conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
return nil, fmt.Errorf("SetReadDeadline failed: %w", err)
}
// 接收返回数据
recvBuf := make([]byte, 10240)
recvLen, err := mmInfo.Conn.Read(recvBuf)
@@ -107,17 +176,35 @@ func MMTCPRecvOneItem(mmInfo *MMInfo) (*PackItem, error) {
return nil, errors.New("mmInfo.Conn or mmInfo.reader is nil")
}
// 设置读超时
readTimeout := time.Duration(mmInfo.LongConnReadTimeout) * time.Second
if readTimeout <= 0 {
readTimeout = time.Duration(mmInfo.LongConnTimeout) * time.Second
if readTimeout <= 0 {
readTimeout = 15 * time.Second
}
}
if err := mmInfo.Conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
return nil, fmt.Errorf("SetReadDeadline failed: %w", err)
}
// 读取头部数据
recordHeadData := make([]byte, 5)
if _, err := io.ReadFull(mmInfo.reader, recordHeadData); err != nil {
return nil, err
return nil, fmt.Errorf("read header failed: %w", err)
}
// 读取Content
recordHead := RecordHeadDeSerialize(recordHeadData)
bodyData := make([]byte, recordHead.Size)
// 重新设置读超时(针对body
if err := mmInfo.Conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
return nil, fmt.Errorf("SetReadDeadline for body failed: %w", err)
}
if _, err := io.ReadFull(mmInfo.reader, bodyData); err != nil {
return nil, err
return nil, fmt.Errorf("read body failed: %w", err)
}
return &PackItem{
RecordHead: recordHeadData,
+9 -1
View File
@@ -24,6 +24,14 @@ func CreateNewMMInfo() *MMInfo {
mmInfo.LONGClientSeq = 1
mmInfo.LONGServerSeq = 1
// 从全局配置设置代理配置
mmInfo.LongConnTimeout = GlobalProxyConfig.LongConnTimeout
mmInfo.LongConnReadTimeout = GlobalProxyConfig.LongConnReadTimeout
mmInfo.LongConnRetryTimes = GlobalProxyConfig.LongConnRetryTimes
mmInfo.LongConnRetryInterval = GlobalProxyConfig.LongConnRetryInterval
mmInfo.ShortConnTimeout = GlobalProxyConfig.ShortConnTimeout
mmInfo.AllowDirectOnProxyFail = GlobalProxyConfig.AllowDirectOnProxyFail
return mmInfo
}
@@ -77,7 +85,7 @@ func MMHandShakeByShortLink(mmInfo *MMInfo, hostName string) (*MMInfo, error) {
// 发送握手请求 - ClientHello
clientHelloData := CreateHandShakeClientHelloData(mmInfo)
sendData := CreateRecordData(ServerHandShakeType, clientHelloData)
retBytes, err := MMHTTPPost(mmInfo, sendData)
retBytes, err := MMHTTPPost(mmInfo, sendData, "handshake")
if err != nil {
return nil, err
}
+24
View File
@@ -9,6 +9,23 @@ import (
"golang.org/x/net/proxy"
)
// GlobalProxyConfig 全局代理配置(由 srv 层设置)
var GlobalProxyConfig = struct {
LongConnTimeout int
LongConnReadTimeout int
LongConnRetryTimes int
LongConnRetryInterval int
ShortConnTimeout int
AllowDirectOnProxyFail bool
}{
LongConnTimeout: 15,
LongConnReadTimeout: 210,
LongConnRetryTimes: 30,
LongConnRetryInterval: 500,
ShortConnTimeout: 15,
AllowDirectOnProxyFail: false,
}
// AesGcmParam AesGcm加密解密参数
type AesGcmParam struct {
AesKey []byte
@@ -66,6 +83,13 @@ type MMInfo struct {
ClientEcdhKeys *ClientEcdhKeys
// 代理
Dialer proxy.Dialer
LongConnTimeout int
LongConnReadTimeout int
LongConnRetryTimes int
LongConnRetryInterval int
ShortConnTimeout int
AllowDirectOnProxyFail bool
}
// EcdsaSignature 服务端传过来的校验数据