From 74b5ef0b27184f836def5587d2abd96bd9c61b82 Mon Sep 17 00:00:00 2001 From: xboard Date: Thu, 26 Mar 2026 03:33:01 +0800 Subject: [PATCH] fix: resolve device sync issues and refactor WebSocket server --- app/Console/Commands/NodeWebSocketServer.php | 357 +----------------- .../V2/Server/ServerController.php | 1 - app/Services/DeviceStateService.php | 103 +++-- app/WebSocket/NodeEventHandlers.php | 144 +++++++ app/WebSocket/NodeWorker.php | 249 ++++++++++++ 5 files changed, 467 insertions(+), 387 deletions(-) create mode 100644 app/WebSocket/NodeEventHandlers.php create mode 100644 app/WebSocket/NodeWorker.php diff --git a/app/Console/Commands/NodeWebSocketServer.php b/app/Console/Commands/NodeWebSocketServer.php index 8c07d93..cbad533 100644 --- a/app/Console/Commands/NodeWebSocketServer.php +++ b/app/Console/Commands/NodeWebSocketServer.php @@ -2,18 +2,8 @@ namespace App\Console\Commands; -use App\Models\Server; -use App\Services\DeviceStateService; -use App\Services\NodeSyncService; -use App\Services\NodeRegistry; -use App\Services\ServerService; +use App\WebSocket\NodeWorker; use Illuminate\Console\Command; -use Illuminate\Support\Facades\Cache; -use Illuminate\Support\Facades\Log; -use Illuminate\Support\Facades\Redis; -use Workerman\Connection\TcpConnection; -use Workerman\Timer; -use Workerman\Worker; class NodeWebSocketServer extends Command { @@ -25,18 +15,11 @@ class NodeWebSocketServer extends Command protected $description = 'Start the WebSocket server for node-panel synchronization'; - /** Auth timeout in seconds — close unauthenticated connections */ - private const AUTH_TIMEOUT = 10; - - /** Ping interval in seconds */ - private const PING_INTERVAL = 55; - public function handle(): void { global $argv; $action = $this->argument('action'); - // 重新构建 argv 供 Workerman 解析 $argv[1] = $action; if ($this->option('d')) { $argv[2] = '-d'; @@ -45,341 +28,7 @@ class NodeWebSocketServer extends Command $host = $this->option('host'); $port = $this->option('port'); - $worker = new Worker("websocket://{$host}:{$port}"); - $worker->count = 1; - $worker->name = 'xboard-ws-server'; - - // 设置日志和 PID 文件路径 - $logPath = storage_path('logs'); - if (!is_dir($logPath)) { - mkdir($logPath, 0777, true); - } - Worker::$logFile = $logPath . '/xboard-ws-server.log'; // 指向具体文件,避免某些环境 php://stdout 的 stat 失败 - Worker::$pidFile = $logPath . '/xboard-ws-server.pid'; - - $worker->onWorkerStart = function (Worker $worker) { - $this->info("[WS] Worker started, pid={$worker->id}"); - $this->subscribeRedis(); - - // Periodic ping to detect dead connections - Timer::add(self::PING_INTERVAL, function () { - foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) { - $conn = NodeRegistry::get($nodeId); - if ($conn) { - $conn->send(json_encode(['event' => 'ping'])); - } - } - }); - - // 定时推送设备状态给节点(每10秒) - Timer::add(10, function () { - $pendingNodeIds = Redis::spop('device:push_pending_nodes', 100); - if (empty($pendingNodeIds)) { - return; - } - - $deviceStateService = app(DeviceStateService::class); - - foreach ($pendingNodeIds as $nodeId) { - $nodeId = (int) $nodeId; - if (NodeRegistry::get($nodeId) !== null) { - $this->pushDeviceStateToNode($nodeId, $deviceStateService); - } - } - }); - }; - - $worker->onConnect = function (TcpConnection $conn) { - // Set auth timeout — must authenticate within N seconds or get disconnected - $conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) { - if (empty($conn->nodeId)) { - $conn->close(json_encode([ - 'event' => 'error', - 'data' => ['message' => 'auth timeout'], - ])); - } - }, [], false); - }; - - $worker->onWebSocketConnect = function (TcpConnection $conn, $httpMessage) { - // Parse query string from the WebSocket upgrade request - // In Workerman 4.x/5.x with onWebSocketConnect, the second arg can be a string or Request object - $queryString = ''; - if (is_string($httpMessage)) { - $queryString = parse_url($httpMessage, PHP_URL_QUERY) ?? ''; - } elseif ($httpMessage instanceof \Workerman\Protocols\Http\Request) { - $queryString = $httpMessage->queryString(); - } - - parse_str($queryString, $params); - - $token = $params['token'] ?? ''; - $nodeId = (int) ($params['node_id'] ?? 0); - - // Authenticate - $serverToken = admin_setting('server_token', ''); - if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) { - $conn->close(json_encode([ - 'event' => 'error', - 'data' => ['message' => 'invalid token'], - ])); - return; - } - - $node = Server::find($nodeId); - if (!$node) { - $conn->close(json_encode([ - 'event' => 'error', - 'data' => ['message' => 'node not found'], - ])); - return; - } - - // Auth passed — cancel timeout, register connection - if (isset($conn->authTimer)) { - Timer::del($conn->authTimer); - } - - $conn->nodeId = $nodeId; - NodeRegistry::add($nodeId, $conn); - Cache::put("node_ws_alive:{$nodeId}", true, 86400); - - // 清理该节点的旧设备数据(节点重连后需重新上报全量) - $deviceStateService = app(DeviceStateService::class); - $deviceStateService->clearNodeDevices($nodeId); - - Log::debug("[WS] Node#{$nodeId} connected", [ - 'remote' => $conn->getRemoteIp(), - 'total' => NodeRegistry::count(), - ]); - - // Send auth success - $conn->send(json_encode([ - 'event' => 'auth.success', - 'data' => ['node_id' => $nodeId], - ])); - - // Push full sync (config + users) immediately to this specific connection - $this->pushFullSync($conn, $node); - }; - - $worker->onMessage = function (TcpConnection $conn, $data) { - $msg = json_decode($data, true); - if (!is_array($msg)) { - return; - } - - $event = $msg['event'] ?? ''; - $nodeId = $conn->nodeId ?? null; - - switch ($event) { - case 'pong': - // Heartbeat response — node is alive - if ($nodeId) { - Cache::put("node_ws_alive:{$nodeId}", true, 86400); - } - break; - case 'node.status': - if ($nodeId && isset($msg['data'])) { - $this->handleNodeStatus($nodeId, $msg['data']); - } - break; - case 'report.devices': - if ($nodeId && isset($msg['data'])) { - $this->handleDeviceReport($nodeId, $msg['data']); - } - break; - case 'request.devices': - if ($nodeId) { - $this->handleDeviceRequest($conn, $nodeId); - } - break; - default: - break; - } - }; - - $worker->onClose = function (TcpConnection $conn) { - if (!empty($conn->nodeId)) { - $nodeId = $conn->nodeId; - NodeRegistry::remove($nodeId); - Cache::forget("node_ws_alive:{$nodeId}"); - - app(DeviceStateService::class)->clearNodeDevices($nodeId); - - Log::debug("[WS] Node#{$nodeId} disconnected", [ - 'total' => NodeRegistry::count(), - ]); - } - }; - - Worker::runAll(); - } - - /** - * Handle status data pushed from node via WebSocket - */ - private function handleNodeStatus(int $nodeId, array $data): void - { - $node = Server::find($nodeId); - if (!$node) return; - - $nodeType = strtoupper($node->type); - - // Update last check-in cache - Cache::put(\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_LAST_CHECK_AT', $nodeId), time(), 3600); - - // Update metrics cache via Service - ServerService::updateMetrics($node, $data); - - Log::debug("[WS] Node#{$nodeId} status updated via WebSocket"); - } - - /** - * Handle device report from node via WebSocket - * - * 节点发送全量设备列表,面板负责差异计算 - * 数据格式: {"event": "report.devices", "data": {userId: [ip1, ip2, ...], ...}} - * - * 示例: {"event": "report.devices", "data": {"123": ["1.1.1.1", "2.2.2.2"], "456": ["3.3.3.3"]}} - */ - private function handleDeviceReport(int $nodeId, array $data): void - { - $deviceStateService = app(DeviceStateService::class); - - // 清理该节点的旧数据 - $deviceStateService->clearNodeDevices($nodeId); - - // 全量写入新数据 - foreach ($data as $userId => $ips) { - if (is_numeric($userId) && is_array($ips)) { - $deviceStateService->setDevices((int) $userId, $nodeId, $ips); - } - } - - // 标记该节点待推送(由定时器批量处理) - Redis::sadd('device:push_pending_nodes', $nodeId); - - Log::debug("[WS] Node#{$nodeId} synced " . count($data) . " users"); - } - - /** - * 推送全量设备状态给指定节点 - */ - private function pushDeviceStateToNode(int $nodeId, DeviceStateService $service): void - { - $node = Server::find($nodeId); - if (!$node) return; - - // 获取该节点关联的所有用户 - $users = ServerService::getAvailableUsers($node); - $userIds = $users->pluck('id')->toArray(); - - // 获取这些用户的设备列表 - $devices = $service->getUsersDevices($userIds); - - NodeRegistry::send($nodeId, 'sync.devices', [ - 'users' => $devices - ]); - - Log::debug("[WS] Pushed device state to node#{$nodeId}: " . count($devices) . " users"); - } - - /** - * Handle device state request from node via WebSocket - */ - private function handleDeviceRequest(TcpConnection $conn, int $nodeId): void - { - $node = Server::find($nodeId); - if (!$node) return; - - $users = ServerService::getAvailableUsers($node); - $userIds = $users->pluck('id')->toArray(); - - $deviceStateService = app(DeviceStateService::class); - $devices = $deviceStateService->getUsersDevices($userIds); - - $conn->send(json_encode([ - 'event' => 'sync.devices', - 'data' => ['users' => $devices], - ])); - - Log::debug("[WS] Node#{$nodeId} requested devices, sent " . count($devices) . " users"); - } - - /** - * Subscribe to Redis pub/sub channel for receiving push commands from Laravel. - * Laravel app publishes to "node:push" channel, Workerman picks it up and forwards to the right node. - */ - private function subscribeRedis(): void - { - $host = config('database.redis.default.host', '127.0.0.1'); - $port = config('database.redis.default.port', 6379); - - // Handle Unix Socket connection - if (str_starts_with($host, '/')) { - $redisUri = "unix://{$host}"; - } else { - $redisUri = "redis://{$host}:{$port}"; - } - - $redis = new \Workerman\Redis\Client($redisUri); - - $password = config('database.redis.default.password'); - if ($password) { - $redis->auth($password); - } - - // Get Laravel Redis prefix to match publish() - $prefix = config('database.redis.options.prefix', ''); - $channel = $prefix . 'node:push'; - - $redis->subscribe([$channel], function ($chan, $message) { - $payload = json_decode($message, true); - if (!is_array($payload)) { - return; - } - - $nodeId = $payload['node_id'] ?? null; - $event = $payload['event'] ?? ''; - $data = $payload['data'] ?? []; - - if (!$nodeId || !$event) { - return; - } - - $sent = NodeRegistry::send((int) $nodeId, $event, $data); - if ($sent) { - Log::debug("[WS] Pushed {$event} to node#{$nodeId}, data: " . json_encode($data)); - } - }); - - $this->info("[WS] Subscribed to Redis channel: {$channel}"); - } - - /** - * Push full config + users to a newly connected node. - */ - private function pushFullSync(TcpConnection $conn, Server $node): void - { - $nodeId = $conn->nodeId; - // Push config - $config = ServerService::buildNodeConfig($node); - Log::debug("[WS] Node#{$nodeId} config: ", $config); - $conn->send(json_encode([ - 'event' => 'sync.config', - 'data' => ['config' => $config] - ])); - - // Push users - $users = ServerService::getAvailableUsers($node)->toArray(); - $conn->send(json_encode([ - 'event' => 'sync.users', - 'data' => ['users' => $users] - ])); - - Log::info("[WS] Full sync pushed to node#{$nodeId}", [ - 'users' => count($users), - ]); + $worker = new NodeWorker($host, $port); + $worker->run(); } } diff --git a/app/Http/Controllers/V2/Server/ServerController.php b/app/Http/Controllers/V2/Server/ServerController.php index 5770282..ff58951 100644 --- a/app/Http/Controllers/V2/Server/ServerController.php +++ b/app/Http/Controllers/V2/Server/ServerController.php @@ -3,7 +3,6 @@ namespace App\Http\Controllers\V2\Server; use App\Http\Controllers\Controller; -use App\Jobs\UserAliveSyncJob; use App\Services\DeviceStateService; use App\Services\ServerService; use App\Services\UserService; diff --git a/app/Services/DeviceStateService.php b/app/Services/DeviceStateService.php index ad1d012..09e106c 100644 --- a/app/Services/DeviceStateService.php +++ b/app/Services/DeviceStateService.php @@ -12,6 +12,15 @@ class DeviceStateService private const TTL = 300; // device state ttl private const DB_THROTTLE = 10; // update db throttle + /** + * 移除 Redis key 的前缀 + */ + private function removeRedisPrefix(string $key): string + { + $prefix = config('database.redis.options.prefix', ''); + return $prefix ? substr($key, strlen($prefix)) : $key; + } + /** * 批量设置设备 * 用于 HTTP /alive 和 WebSocket report.devices @@ -21,7 +30,7 @@ class DeviceStateService $key = self::PREFIX . $userId; $timestamp = time(); - $this->clearNodeDevices($nodeId, $userId); + $this->removeNodeDevices($nodeId, $userId); if (!empty($ips)) { $fields = []; @@ -36,49 +45,80 @@ class DeviceStateService } /** - * clear node devices - * - only nodeId: clear all devices of the node - * - userId and nodeId: clear specific user's specific node device + * 获取某节点的所有设备数据 + * 返回: {userId: [ip1, ip2, ...], ...} */ - public function clearNodeDevices(int $nodeId, ?int $userId = null): void + public function getNodeDevices(int $nodeId): array { - if ($userId !== null) { - $key = self::PREFIX . $userId; - $prefix = "{$nodeId}:"; - foreach (Redis::hkeys($key) as $field) { - if (str_starts_with($field, $prefix)) { - Redis::hdel($key, $field); - } - } - return; - } - $keys = Redis::keys(self::PREFIX . '*'); $prefix = "{$nodeId}:"; - + $result = []; foreach ($keys as $key) { - foreach (Redis::hkeys($key) as $field) { + $actualKey = $this->removeRedisPrefix($key); + $uid = (int) substr($actualKey, strlen(self::PREFIX)); + $data = Redis::hgetall($actualKey); + foreach ($data as $field => $timestamp) { if (str_starts_with($field, $prefix)) { - Redis::hdel($key, $field); + $ip = substr($field, strlen($prefix)); + $result[$uid][] = $ip; } } } + + return $result; + } + + /** + * 删除某节点某用户的设备 + */ + public function removeNodeDevices(int $nodeId, int $userId): void + { + $key = self::PREFIX . $userId; + $prefix = "{$nodeId}:"; + + foreach (Redis::hkeys($key) as $field) { + if (str_starts_with($field, $prefix)) { + Redis::hdel($key, $field); + } + } } /** - * get user device count (filter expired data) + * 清除节点所有设备数据(用于节点断开连接) + */ + public function clearAllNodeDevices(int $nodeId): array + { + $oldDevices = $this->getNodeDevices($nodeId); + $prefix = "{$nodeId}:"; + + foreach ($oldDevices as $userId => $ips) { + $key = self::PREFIX . $userId; + foreach (Redis::hkeys($key) as $field) { + if (str_starts_with($field, $prefix)) { + Redis::hdel($key, $field); + } + } + } + + return array_keys($oldDevices); + } + + /** + * get user device count (deduplicated by IP, filter expired data) */ public function getDeviceCount(int $userId): int { $data = Redis::hgetall(self::PREFIX . $userId); $now = time(); - $count = 0; + $ips = []; + foreach ($data as $field => $timestamp) { - // if ($now - $timestamp <= self::TTL) { - $count++; - // } + if ($now - $timestamp <= self::TTL) { + $ips[] = substr($field, strpos($field, ':') + 1); + } } - return $count; + + return count(array_unique($ips)); } /** @@ -108,15 +148,14 @@ class DeviceStateService { $result = []; $now = time(); - foreach ($userIds as $userId) { $data = Redis::hgetall(self::PREFIX . $userId); if (!empty($data)) { $ips = []; foreach ($data as $field => $timestamp) { - // if ($now - $timestamp <= self::TTL) { + if ($now - $timestamp <= self::TTL) { $ips[] = substr($field, strpos($field, ':') + 1); - // } + } } if (!empty($ips)) { $result[$userId] = array_unique($ips); @@ -130,12 +169,12 @@ class DeviceStateService /** * notify update (throttle control) */ - private function notifyUpdate(int $userId): void + public function notifyUpdate(int $userId): void { $dbThrottleKey = "device:db_throttle:{$userId}"; - if (Redis::setnx($dbThrottleKey, 1)) { - Redis::expire($dbThrottleKey, self::DB_THROTTLE); + // if (Redis::setnx($dbThrottleKey, 1)) { + // Redis::expire($dbThrottleKey, self::DB_THROTTLE); User::query() ->whereKey($userId) @@ -143,6 +182,6 @@ class DeviceStateService 'online_count' => $this->getDeviceCount($userId), 'last_online_at' => now(), ]); - } + // } } } diff --git a/app/WebSocket/NodeEventHandlers.php b/app/WebSocket/NodeEventHandlers.php new file mode 100644 index 0000000..75521f8 --- /dev/null +++ b/app/WebSocket/NodeEventHandlers.php @@ -0,0 +1,144 @@ +type); + Cache::put(\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_LAST_CHECK_AT', $nodeId), time(), 3600); + ServerService::updateMetrics($node, $data); + + Log::debug("[WS] Node#{$nodeId} status updated"); + } + + /** + * Handle device report from node + * + * 数据格式: {"event": "report.devices", "data": {userId: [ip1, ip2, ...], ...}} + */ + public static function handleDeviceReport(TcpConnection $conn, int $nodeId, array $data): void + { + $service = app(DeviceStateService::class); + + // Get old data + $oldDevices = $service->getNodeDevices($nodeId); + + // Calculate diff + $removedUsers = array_diff_key($oldDevices, $data); + $newDevices = []; + + foreach ($data as $userId => $ips) { + if (is_numeric($userId) && is_array($ips)) { + $newDevices[(int) $userId] = $ips; + } + } + + // Handle removed users + foreach ($removedUsers as $userId => $ips) { + $service->removeNodeDevices($nodeId, $userId); + $service->notifyUpdate($userId); + } + + // Handle new/updated users + foreach ($newDevices as $userId => $ips) { + $service->setDevices($userId, $nodeId, $ips); + } + + // Mark for push + Redis::sadd('device:push_pending_nodes', $nodeId); + + Log::debug("[WS] Node#{$nodeId} synced " . count($newDevices) . " users, removed " . count($removedUsers)); + } + + /** + * Handle device state request from node + */ + public static function handleDeviceRequest(TcpConnection $conn, int $nodeId, array $data = []): void + { + $node = Server::find($nodeId); + if (!$node) return; + + $users = ServerService::getAvailableUsers($node); + $userIds = $users->pluck('id')->toArray(); + + $service = app(DeviceStateService::class); + $devices = $service->getUsersDevices($userIds); + + $conn->send(json_encode([ + 'event' => 'sync.devices', + 'data' => ['users' => $devices], + ])); + + Log::debug("[WS] Node#{$nodeId} requested devices, sent " . count($devices) . " users"); + } + + /** + * Push device state to node + */ + public static function pushDeviceStateToNode(int $nodeId, DeviceStateService $service): void + { + $node = Server::find($nodeId); + if (!$node) return; + + $users = ServerService::getAvailableUsers($node); + $userIds = $users->pluck('id')->toArray(); + $devices = $service->getUsersDevices($userIds); + + NodeRegistry::send($nodeId, 'sync.devices', [ + 'users' => $devices + ]); + + Log::debug("[WS] Pushed device state to node#{$nodeId}: " . count($devices) . " users"); + } + + /** + * Push full config + users to newly connected node + */ + public static function pushFullSync(TcpConnection $conn, Server $node): void + { + $nodeId = $conn->nodeId; + + // Push config + $config = ServerService::buildNodeConfig($node); + $conn->send(json_encode([ + 'event' => 'sync.config', + 'data' => ['config' => $config] + ])); + + // Push users + $users = ServerService::getAvailableUsers($node)->toArray(); + $conn->send(json_encode([ + 'event' => 'sync.users', + 'data' => ['users' => $users] + ])); + + Log::info("[WS] Full sync pushed to node#{$nodeId}", [ + 'users' => count($users), + ]); + } +} diff --git a/app/WebSocket/NodeWorker.php b/app/WebSocket/NodeWorker.php new file mode 100644 index 0000000..427d96e --- /dev/null +++ b/app/WebSocket/NodeWorker.php @@ -0,0 +1,249 @@ + [NodeEventHandlers::class, 'handlePong'], + 'node.status' => [NodeEventHandlers::class, 'handleNodeStatus'], + 'report.devices' => [NodeEventHandlers::class, 'handleDeviceReport'], + 'request.devices' => [NodeEventHandlers::class, 'handleDeviceRequest'], + ]; + + public function __construct(string $host, int $port) + { + $this->worker = new Worker("websocket://{$host}:{$port}"); + $this->worker->count = 1; + $this->worker->name = 'xboard-ws-server'; + } + + public function run(): void + { + $this->setupLogging(); + $this->setupCallbacks(); + Worker::runAll(); + } + + private function setupLogging(): void + { + $logPath = storage_path('logs'); + if (!is_dir($logPath)) { + mkdir($logPath, 0777, true); + } + Worker::$logFile = $logPath . '/xboard-ws-server.log'; + Worker::$pidFile = $logPath . '/xboard-ws-server.pid'; + } + + private function setupCallbacks(): void + { + $this->worker->onWorkerStart = [$this, 'onWorkerStart']; + $this->worker->onConnect = [$this, 'onConnect']; + $this->worker->onWebSocketConnect = [$this, 'onWebSocketConnect']; + $this->worker->onMessage = [$this, 'onMessage']; + $this->worker->onClose = [$this, 'onClose']; + } + + public function onWorkerStart(Worker $worker): void + { + Log::info("[WS] Worker started, pid={$worker->id}"); + $this->subscribeRedis(); + $this->setupTimers(); + } + + private function setupTimers(): void + { + // Ping timer + Timer::add(self::PING_INTERVAL, function () { + foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) { + $conn = NodeRegistry::get($nodeId); + if ($conn) { + $conn->send(json_encode(['event' => 'ping'])); + } + } + }); + + // Device state push timer + Timer::add(10, function () { + $pendingNodeIds = Redis::spop('device:push_pending_nodes', 100); + if (empty($pendingNodeIds)) { + return; + } + + $service = app(DeviceStateService::class); + foreach ($pendingNodeIds as $nodeId) { + $nodeId = (int) $nodeId; + if (NodeRegistry::get($nodeId) !== null) { + NodeEventHandlers::pushDeviceStateToNode($nodeId, $service); + } + } + }); + } + + public function onConnect(TcpConnection $conn): void + { + $conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) { + if (empty($conn->nodeId)) { + $conn->close(json_encode([ + 'event' => 'error', + 'data' => ['message' => 'auth timeout'], + ])); + } + }, [], false); + } + + public function onWebSocketConnect(TcpConnection $conn, $httpMessage): void + { + $queryString = ''; + if (is_string($httpMessage)) { + $queryString = parse_url($httpMessage, PHP_URL_QUERY) ?? ''; + } elseif ($httpMessage instanceof \Workerman\Protocols\Http\Request) { + $queryString = $httpMessage->queryString(); + } + + parse_str($queryString, $params); + $token = $params['token'] ?? ''; + $nodeId = (int) ($params['node_id'] ?? 0); + + // Authenticate + $serverToken = admin_setting('server_token', ''); + if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) { + $conn->close(json_encode([ + 'event' => 'error', + 'data' => ['message' => 'invalid token'], + ])); + return; + } + + $node = Server::find($nodeId); + if (!$node) { + $conn->close(json_encode([ + 'event' => 'error', + 'data' => ['message' => 'node not found'], + ])); + return; + } + + // Auth passed + if (isset($conn->authTimer)) { + Timer::del($conn->authTimer); + } + + $conn->nodeId = $nodeId; + NodeRegistry::add($nodeId, $conn); + Cache::put("node_ws_alive:{$nodeId}", true, 86400); + + // Clear old device data + app(DeviceStateService::class)->clearAllNodeDevices($nodeId); + + Log::debug("[WS] Node#{$nodeId} connected", [ + 'remote' => $conn->getRemoteIp(), + 'total' => NodeRegistry::count(), + ]); + + // Send auth success + $conn->send(json_encode([ + 'event' => 'auth.success', + 'data' => ['node_id' => $nodeId], + ])); + + // Push full sync + NodeEventHandlers::pushFullSync($conn, $node); + } + + public function onMessage(TcpConnection $conn, $data): void + { + $msg = json_decode($data, true); + if (!is_array($msg)) { + return; + } + + $event = $msg['event'] ?? ''; + $nodeId = $conn->nodeId ?? null; + + if (isset($this->handlers[$event]) && $nodeId) { + $handler = $this->handlers[$event]; + $handler($conn, $nodeId, $msg['data'] ?? []); + } + } + + public function onClose(TcpConnection $conn): void + { + if (!empty($conn->nodeId)) { + $nodeId = $conn->nodeId; + NodeRegistry::remove($nodeId); + Cache::forget("node_ws_alive:{$nodeId}"); + + $service = app(DeviceStateService::class); + $affectedUserIds = $service->clearAllNodeDevices($nodeId); + foreach ($affectedUserIds as $userId) { + $service->notifyUpdate($userId); + } + + Log::debug("[WS] Node#{$nodeId} disconnected", [ + 'total' => NodeRegistry::count(), + 'affected_users' => count($affectedUserIds), + ]); + } + } + + private function subscribeRedis(): void + { + $host = config('database.redis.default.host', '127.0.0.1'); + $port = config('database.redis.default.port', 6379); + + if (str_starts_with($host, '/')) { + $redisUri = "unix://{$host}"; + } else { + $redisUri = "redis://{$host}:{$port}"; + } + + $redis = new \Workerman\Redis\Client($redisUri); + + $password = config('database.redis.default.password'); + if ($password) { + $redis->auth($password); + } + + $prefix = config('database.redis.options.prefix', ''); + $channel = $prefix . 'node:push'; + + $redis->subscribe([$channel], function ($chan, $message) { + $payload = json_decode($message, true); + if (!is_array($payload)) { + return; + } + + $nodeId = $payload['node_id'] ?? null; + $event = $payload['event'] ?? ''; + $data = $payload['data'] ?? []; + + if (!$nodeId || !$event) { + return; + } + + $sent = NodeRegistry::send((int) $nodeId, $event, $data); + if ($sent) { + Log::debug("[WS] Pushed {$event} to node#{$nodeId}"); + } + }); + + Log::info("[WS] Subscribed to Redis channel: {$channel}"); + } +}