mirror of
https://github.com/lkddi/Xboard.git
synced 2026-04-28 06:47:24 +08:00
fix: resolve device sync issues and refactor WebSocket server
This commit is contained in:
@@ -2,18 +2,8 @@
|
|||||||
|
|
||||||
namespace App\Console\Commands;
|
namespace App\Console\Commands;
|
||||||
|
|
||||||
use App\Models\Server;
|
use App\WebSocket\NodeWorker;
|
||||||
use App\Services\DeviceStateService;
|
|
||||||
use App\Services\NodeSyncService;
|
|
||||||
use App\Services\NodeRegistry;
|
|
||||||
use App\Services\ServerService;
|
|
||||||
use Illuminate\Console\Command;
|
use Illuminate\Console\Command;
|
||||||
use Illuminate\Support\Facades\Cache;
|
|
||||||
use Illuminate\Support\Facades\Log;
|
|
||||||
use Illuminate\Support\Facades\Redis;
|
|
||||||
use Workerman\Connection\TcpConnection;
|
|
||||||
use Workerman\Timer;
|
|
||||||
use Workerman\Worker;
|
|
||||||
|
|
||||||
class NodeWebSocketServer extends Command
|
class NodeWebSocketServer extends Command
|
||||||
{
|
{
|
||||||
@@ -25,18 +15,11 @@ class NodeWebSocketServer extends Command
|
|||||||
|
|
||||||
protected $description = 'Start the WebSocket server for node-panel synchronization';
|
protected $description = 'Start the WebSocket server for node-panel synchronization';
|
||||||
|
|
||||||
/** Auth timeout in seconds — close unauthenticated connections */
|
|
||||||
private const AUTH_TIMEOUT = 10;
|
|
||||||
|
|
||||||
/** Ping interval in seconds */
|
|
||||||
private const PING_INTERVAL = 55;
|
|
||||||
|
|
||||||
public function handle(): void
|
public function handle(): void
|
||||||
{
|
{
|
||||||
global $argv;
|
global $argv;
|
||||||
$action = $this->argument('action');
|
$action = $this->argument('action');
|
||||||
|
|
||||||
// 重新构建 argv 供 Workerman 解析
|
|
||||||
$argv[1] = $action;
|
$argv[1] = $action;
|
||||||
if ($this->option('d')) {
|
if ($this->option('d')) {
|
||||||
$argv[2] = '-d';
|
$argv[2] = '-d';
|
||||||
@@ -45,341 +28,7 @@ class NodeWebSocketServer extends Command
|
|||||||
$host = $this->option('host');
|
$host = $this->option('host');
|
||||||
$port = $this->option('port');
|
$port = $this->option('port');
|
||||||
|
|
||||||
$worker = new Worker("websocket://{$host}:{$port}");
|
$worker = new NodeWorker($host, $port);
|
||||||
$worker->count = 1;
|
$worker->run();
|
||||||
$worker->name = 'xboard-ws-server';
|
|
||||||
|
|
||||||
// 设置日志和 PID 文件路径
|
|
||||||
$logPath = storage_path('logs');
|
|
||||||
if (!is_dir($logPath)) {
|
|
||||||
mkdir($logPath, 0777, true);
|
|
||||||
}
|
|
||||||
Worker::$logFile = $logPath . '/xboard-ws-server.log'; // 指向具体文件,避免某些环境 php://stdout 的 stat 失败
|
|
||||||
Worker::$pidFile = $logPath . '/xboard-ws-server.pid';
|
|
||||||
|
|
||||||
$worker->onWorkerStart = function (Worker $worker) {
|
|
||||||
$this->info("[WS] Worker started, pid={$worker->id}");
|
|
||||||
$this->subscribeRedis();
|
|
||||||
|
|
||||||
// Periodic ping to detect dead connections
|
|
||||||
Timer::add(self::PING_INTERVAL, function () {
|
|
||||||
foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) {
|
|
||||||
$conn = NodeRegistry::get($nodeId);
|
|
||||||
if ($conn) {
|
|
||||||
$conn->send(json_encode(['event' => 'ping']));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 定时推送设备状态给节点(每10秒)
|
|
||||||
Timer::add(10, function () {
|
|
||||||
$pendingNodeIds = Redis::spop('device:push_pending_nodes', 100);
|
|
||||||
if (empty($pendingNodeIds)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$deviceStateService = app(DeviceStateService::class);
|
|
||||||
|
|
||||||
foreach ($pendingNodeIds as $nodeId) {
|
|
||||||
$nodeId = (int) $nodeId;
|
|
||||||
if (NodeRegistry::get($nodeId) !== null) {
|
|
||||||
$this->pushDeviceStateToNode($nodeId, $deviceStateService);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
$worker->onConnect = function (TcpConnection $conn) {
|
|
||||||
// Set auth timeout — must authenticate within N seconds or get disconnected
|
|
||||||
$conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) {
|
|
||||||
if (empty($conn->nodeId)) {
|
|
||||||
$conn->close(json_encode([
|
|
||||||
'event' => 'error',
|
|
||||||
'data' => ['message' => 'auth timeout'],
|
|
||||||
]));
|
|
||||||
}
|
|
||||||
}, [], false);
|
|
||||||
};
|
|
||||||
|
|
||||||
$worker->onWebSocketConnect = function (TcpConnection $conn, $httpMessage) {
|
|
||||||
// Parse query string from the WebSocket upgrade request
|
|
||||||
// In Workerman 4.x/5.x with onWebSocketConnect, the second arg can be a string or Request object
|
|
||||||
$queryString = '';
|
|
||||||
if (is_string($httpMessage)) {
|
|
||||||
$queryString = parse_url($httpMessage, PHP_URL_QUERY) ?? '';
|
|
||||||
} elseif ($httpMessage instanceof \Workerman\Protocols\Http\Request) {
|
|
||||||
$queryString = $httpMessage->queryString();
|
|
||||||
}
|
|
||||||
|
|
||||||
parse_str($queryString, $params);
|
|
||||||
|
|
||||||
$token = $params['token'] ?? '';
|
|
||||||
$nodeId = (int) ($params['node_id'] ?? 0);
|
|
||||||
|
|
||||||
// Authenticate
|
|
||||||
$serverToken = admin_setting('server_token', '');
|
|
||||||
if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) {
|
|
||||||
$conn->close(json_encode([
|
|
||||||
'event' => 'error',
|
|
||||||
'data' => ['message' => 'invalid token'],
|
|
||||||
]));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$node = Server::find($nodeId);
|
|
||||||
if (!$node) {
|
|
||||||
$conn->close(json_encode([
|
|
||||||
'event' => 'error',
|
|
||||||
'data' => ['message' => 'node not found'],
|
|
||||||
]));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Auth passed — cancel timeout, register connection
|
|
||||||
if (isset($conn->authTimer)) {
|
|
||||||
Timer::del($conn->authTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
$conn->nodeId = $nodeId;
|
|
||||||
NodeRegistry::add($nodeId, $conn);
|
|
||||||
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
|
||||||
|
|
||||||
// 清理该节点的旧设备数据(节点重连后需重新上报全量)
|
|
||||||
$deviceStateService = app(DeviceStateService::class);
|
|
||||||
$deviceStateService->clearNodeDevices($nodeId);
|
|
||||||
|
|
||||||
Log::debug("[WS] Node#{$nodeId} connected", [
|
|
||||||
'remote' => $conn->getRemoteIp(),
|
|
||||||
'total' => NodeRegistry::count(),
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Send auth success
|
|
||||||
$conn->send(json_encode([
|
|
||||||
'event' => 'auth.success',
|
|
||||||
'data' => ['node_id' => $nodeId],
|
|
||||||
]));
|
|
||||||
|
|
||||||
// Push full sync (config + users) immediately to this specific connection
|
|
||||||
$this->pushFullSync($conn, $node);
|
|
||||||
};
|
|
||||||
|
|
||||||
$worker->onMessage = function (TcpConnection $conn, $data) {
|
|
||||||
$msg = json_decode($data, true);
|
|
||||||
if (!is_array($msg)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$event = $msg['event'] ?? '';
|
|
||||||
$nodeId = $conn->nodeId ?? null;
|
|
||||||
|
|
||||||
switch ($event) {
|
|
||||||
case 'pong':
|
|
||||||
// Heartbeat response — node is alive
|
|
||||||
if ($nodeId) {
|
|
||||||
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'node.status':
|
|
||||||
if ($nodeId && isset($msg['data'])) {
|
|
||||||
$this->handleNodeStatus($nodeId, $msg['data']);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'report.devices':
|
|
||||||
if ($nodeId && isset($msg['data'])) {
|
|
||||||
$this->handleDeviceReport($nodeId, $msg['data']);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'request.devices':
|
|
||||||
if ($nodeId) {
|
|
||||||
$this->handleDeviceRequest($conn, $nodeId);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
$worker->onClose = function (TcpConnection $conn) {
|
|
||||||
if (!empty($conn->nodeId)) {
|
|
||||||
$nodeId = $conn->nodeId;
|
|
||||||
NodeRegistry::remove($nodeId);
|
|
||||||
Cache::forget("node_ws_alive:{$nodeId}");
|
|
||||||
|
|
||||||
app(DeviceStateService::class)->clearNodeDevices($nodeId);
|
|
||||||
|
|
||||||
Log::debug("[WS] Node#{$nodeId} disconnected", [
|
|
||||||
'total' => NodeRegistry::count(),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Worker::runAll();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle status data pushed from node via WebSocket
|
|
||||||
*/
|
|
||||||
private function handleNodeStatus(int $nodeId, array $data): void
|
|
||||||
{
|
|
||||||
$node = Server::find($nodeId);
|
|
||||||
if (!$node) return;
|
|
||||||
|
|
||||||
$nodeType = strtoupper($node->type);
|
|
||||||
|
|
||||||
// Update last check-in cache
|
|
||||||
Cache::put(\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_LAST_CHECK_AT', $nodeId), time(), 3600);
|
|
||||||
|
|
||||||
// Update metrics cache via Service
|
|
||||||
ServerService::updateMetrics($node, $data);
|
|
||||||
|
|
||||||
Log::debug("[WS] Node#{$nodeId} status updated via WebSocket");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle device report from node via WebSocket
|
|
||||||
*
|
|
||||||
* 节点发送全量设备列表,面板负责差异计算
|
|
||||||
* 数据格式: {"event": "report.devices", "data": {userId: [ip1, ip2, ...], ...}}
|
|
||||||
*
|
|
||||||
* 示例: {"event": "report.devices", "data": {"123": ["1.1.1.1", "2.2.2.2"], "456": ["3.3.3.3"]}}
|
|
||||||
*/
|
|
||||||
private function handleDeviceReport(int $nodeId, array $data): void
|
|
||||||
{
|
|
||||||
$deviceStateService = app(DeviceStateService::class);
|
|
||||||
|
|
||||||
// 清理该节点的旧数据
|
|
||||||
$deviceStateService->clearNodeDevices($nodeId);
|
|
||||||
|
|
||||||
// 全量写入新数据
|
|
||||||
foreach ($data as $userId => $ips) {
|
|
||||||
if (is_numeric($userId) && is_array($ips)) {
|
|
||||||
$deviceStateService->setDevices((int) $userId, $nodeId, $ips);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 标记该节点待推送(由定时器批量处理)
|
|
||||||
Redis::sadd('device:push_pending_nodes', $nodeId);
|
|
||||||
|
|
||||||
Log::debug("[WS] Node#{$nodeId} synced " . count($data) . " users");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 推送全量设备状态给指定节点
|
|
||||||
*/
|
|
||||||
private function pushDeviceStateToNode(int $nodeId, DeviceStateService $service): void
|
|
||||||
{
|
|
||||||
$node = Server::find($nodeId);
|
|
||||||
if (!$node) return;
|
|
||||||
|
|
||||||
// 获取该节点关联的所有用户
|
|
||||||
$users = ServerService::getAvailableUsers($node);
|
|
||||||
$userIds = $users->pluck('id')->toArray();
|
|
||||||
|
|
||||||
// 获取这些用户的设备列表
|
|
||||||
$devices = $service->getUsersDevices($userIds);
|
|
||||||
|
|
||||||
NodeRegistry::send($nodeId, 'sync.devices', [
|
|
||||||
'users' => $devices
|
|
||||||
]);
|
|
||||||
|
|
||||||
Log::debug("[WS] Pushed device state to node#{$nodeId}: " . count($devices) . " users");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle device state request from node via WebSocket
|
|
||||||
*/
|
|
||||||
private function handleDeviceRequest(TcpConnection $conn, int $nodeId): void
|
|
||||||
{
|
|
||||||
$node = Server::find($nodeId);
|
|
||||||
if (!$node) return;
|
|
||||||
|
|
||||||
$users = ServerService::getAvailableUsers($node);
|
|
||||||
$userIds = $users->pluck('id')->toArray();
|
|
||||||
|
|
||||||
$deviceStateService = app(DeviceStateService::class);
|
|
||||||
$devices = $deviceStateService->getUsersDevices($userIds);
|
|
||||||
|
|
||||||
$conn->send(json_encode([
|
|
||||||
'event' => 'sync.devices',
|
|
||||||
'data' => ['users' => $devices],
|
|
||||||
]));
|
|
||||||
|
|
||||||
Log::debug("[WS] Node#{$nodeId} requested devices, sent " . count($devices) . " users");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Subscribe to Redis pub/sub channel for receiving push commands from Laravel.
|
|
||||||
* Laravel app publishes to "node:push" channel, Workerman picks it up and forwards to the right node.
|
|
||||||
*/
|
|
||||||
private function subscribeRedis(): void
|
|
||||||
{
|
|
||||||
$host = config('database.redis.default.host', '127.0.0.1');
|
|
||||||
$port = config('database.redis.default.port', 6379);
|
|
||||||
|
|
||||||
// Handle Unix Socket connection
|
|
||||||
if (str_starts_with($host, '/')) {
|
|
||||||
$redisUri = "unix://{$host}";
|
|
||||||
} else {
|
|
||||||
$redisUri = "redis://{$host}:{$port}";
|
|
||||||
}
|
|
||||||
|
|
||||||
$redis = new \Workerman\Redis\Client($redisUri);
|
|
||||||
|
|
||||||
$password = config('database.redis.default.password');
|
|
||||||
if ($password) {
|
|
||||||
$redis->auth($password);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get Laravel Redis prefix to match publish()
|
|
||||||
$prefix = config('database.redis.options.prefix', '');
|
|
||||||
$channel = $prefix . 'node:push';
|
|
||||||
|
|
||||||
$redis->subscribe([$channel], function ($chan, $message) {
|
|
||||||
$payload = json_decode($message, true);
|
|
||||||
if (!is_array($payload)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$nodeId = $payload['node_id'] ?? null;
|
|
||||||
$event = $payload['event'] ?? '';
|
|
||||||
$data = $payload['data'] ?? [];
|
|
||||||
|
|
||||||
if (!$nodeId || !$event) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$sent = NodeRegistry::send((int) $nodeId, $event, $data);
|
|
||||||
if ($sent) {
|
|
||||||
Log::debug("[WS] Pushed {$event} to node#{$nodeId}, data: " . json_encode($data));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
$this->info("[WS] Subscribed to Redis channel: {$channel}");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Push full config + users to a newly connected node.
|
|
||||||
*/
|
|
||||||
private function pushFullSync(TcpConnection $conn, Server $node): void
|
|
||||||
{
|
|
||||||
$nodeId = $conn->nodeId;
|
|
||||||
// Push config
|
|
||||||
$config = ServerService::buildNodeConfig($node);
|
|
||||||
Log::debug("[WS] Node#{$nodeId} config: ", $config);
|
|
||||||
$conn->send(json_encode([
|
|
||||||
'event' => 'sync.config',
|
|
||||||
'data' => ['config' => $config]
|
|
||||||
]));
|
|
||||||
|
|
||||||
// Push users
|
|
||||||
$users = ServerService::getAvailableUsers($node)->toArray();
|
|
||||||
$conn->send(json_encode([
|
|
||||||
'event' => 'sync.users',
|
|
||||||
'data' => ['users' => $users]
|
|
||||||
]));
|
|
||||||
|
|
||||||
Log::info("[WS] Full sync pushed to node#{$nodeId}", [
|
|
||||||
'users' => count($users),
|
|
||||||
]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,6 @@
|
|||||||
namespace App\Http\Controllers\V2\Server;
|
namespace App\Http\Controllers\V2\Server;
|
||||||
|
|
||||||
use App\Http\Controllers\Controller;
|
use App\Http\Controllers\Controller;
|
||||||
use App\Jobs\UserAliveSyncJob;
|
|
||||||
use App\Services\DeviceStateService;
|
use App\Services\DeviceStateService;
|
||||||
use App\Services\ServerService;
|
use App\Services\ServerService;
|
||||||
use App\Services\UserService;
|
use App\Services\UserService;
|
||||||
|
|||||||
@@ -12,6 +12,15 @@ class DeviceStateService
|
|||||||
private const TTL = 300; // device state ttl
|
private const TTL = 300; // device state ttl
|
||||||
private const DB_THROTTLE = 10; // update db throttle
|
private const DB_THROTTLE = 10; // update db throttle
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除 Redis key 的前缀
|
||||||
|
*/
|
||||||
|
private function removeRedisPrefix(string $key): string
|
||||||
|
{
|
||||||
|
$prefix = config('database.redis.options.prefix', '');
|
||||||
|
return $prefix ? substr($key, strlen($prefix)) : $key;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量设置设备
|
* 批量设置设备
|
||||||
* 用于 HTTP /alive 和 WebSocket report.devices
|
* 用于 HTTP /alive 和 WebSocket report.devices
|
||||||
@@ -21,7 +30,7 @@ class DeviceStateService
|
|||||||
$key = self::PREFIX . $userId;
|
$key = self::PREFIX . $userId;
|
||||||
$timestamp = time();
|
$timestamp = time();
|
||||||
|
|
||||||
$this->clearNodeDevices($nodeId, $userId);
|
$this->removeNodeDevices($nodeId, $userId);
|
||||||
|
|
||||||
if (!empty($ips)) {
|
if (!empty($ips)) {
|
||||||
$fields = [];
|
$fields = [];
|
||||||
@@ -36,49 +45,80 @@ class DeviceStateService
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* clear node devices
|
* 获取某节点的所有设备数据
|
||||||
* - only nodeId: clear all devices of the node
|
* 返回: {userId: [ip1, ip2, ...], ...}
|
||||||
* - userId and nodeId: clear specific user's specific node device
|
|
||||||
*/
|
*/
|
||||||
public function clearNodeDevices(int $nodeId, ?int $userId = null): void
|
public function getNodeDevices(int $nodeId): array
|
||||||
{
|
{
|
||||||
if ($userId !== null) {
|
|
||||||
$key = self::PREFIX . $userId;
|
|
||||||
$prefix = "{$nodeId}:";
|
|
||||||
foreach (Redis::hkeys($key) as $field) {
|
|
||||||
if (str_starts_with($field, $prefix)) {
|
|
||||||
Redis::hdel($key, $field);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$keys = Redis::keys(self::PREFIX . '*');
|
$keys = Redis::keys(self::PREFIX . '*');
|
||||||
$prefix = "{$nodeId}:";
|
$prefix = "{$nodeId}:";
|
||||||
|
$result = [];
|
||||||
foreach ($keys as $key) {
|
foreach ($keys as $key) {
|
||||||
foreach (Redis::hkeys($key) as $field) {
|
$actualKey = $this->removeRedisPrefix($key);
|
||||||
|
$uid = (int) substr($actualKey, strlen(self::PREFIX));
|
||||||
|
$data = Redis::hgetall($actualKey);
|
||||||
|
foreach ($data as $field => $timestamp) {
|
||||||
if (str_starts_with($field, $prefix)) {
|
if (str_starts_with($field, $prefix)) {
|
||||||
Redis::hdel($key, $field);
|
$ip = substr($field, strlen($prefix));
|
||||||
|
$result[$uid][] = $ip;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除某节点某用户的设备
|
||||||
|
*/
|
||||||
|
public function removeNodeDevices(int $nodeId, int $userId): void
|
||||||
|
{
|
||||||
|
$key = self::PREFIX . $userId;
|
||||||
|
$prefix = "{$nodeId}:";
|
||||||
|
|
||||||
|
foreach (Redis::hkeys($key) as $field) {
|
||||||
|
if (str_starts_with($field, $prefix)) {
|
||||||
|
Redis::hdel($key, $field);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get user device count (filter expired data)
|
* 清除节点所有设备数据(用于节点断开连接)
|
||||||
|
*/
|
||||||
|
public function clearAllNodeDevices(int $nodeId): array
|
||||||
|
{
|
||||||
|
$oldDevices = $this->getNodeDevices($nodeId);
|
||||||
|
$prefix = "{$nodeId}:";
|
||||||
|
|
||||||
|
foreach ($oldDevices as $userId => $ips) {
|
||||||
|
$key = self::PREFIX . $userId;
|
||||||
|
foreach (Redis::hkeys($key) as $field) {
|
||||||
|
if (str_starts_with($field, $prefix)) {
|
||||||
|
Redis::hdel($key, $field);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return array_keys($oldDevices);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get user device count (deduplicated by IP, filter expired data)
|
||||||
*/
|
*/
|
||||||
public function getDeviceCount(int $userId): int
|
public function getDeviceCount(int $userId): int
|
||||||
{
|
{
|
||||||
$data = Redis::hgetall(self::PREFIX . $userId);
|
$data = Redis::hgetall(self::PREFIX . $userId);
|
||||||
$now = time();
|
$now = time();
|
||||||
$count = 0;
|
$ips = [];
|
||||||
|
|
||||||
foreach ($data as $field => $timestamp) {
|
foreach ($data as $field => $timestamp) {
|
||||||
// if ($now - $timestamp <= self::TTL) {
|
if ($now - $timestamp <= self::TTL) {
|
||||||
$count++;
|
$ips[] = substr($field, strpos($field, ':') + 1);
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
return $count;
|
|
||||||
|
return count(array_unique($ips));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -108,15 +148,14 @@ class DeviceStateService
|
|||||||
{
|
{
|
||||||
$result = [];
|
$result = [];
|
||||||
$now = time();
|
$now = time();
|
||||||
|
|
||||||
foreach ($userIds as $userId) {
|
foreach ($userIds as $userId) {
|
||||||
$data = Redis::hgetall(self::PREFIX . $userId);
|
$data = Redis::hgetall(self::PREFIX . $userId);
|
||||||
if (!empty($data)) {
|
if (!empty($data)) {
|
||||||
$ips = [];
|
$ips = [];
|
||||||
foreach ($data as $field => $timestamp) {
|
foreach ($data as $field => $timestamp) {
|
||||||
// if ($now - $timestamp <= self::TTL) {
|
if ($now - $timestamp <= self::TTL) {
|
||||||
$ips[] = substr($field, strpos($field, ':') + 1);
|
$ips[] = substr($field, strpos($field, ':') + 1);
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
if (!empty($ips)) {
|
if (!empty($ips)) {
|
||||||
$result[$userId] = array_unique($ips);
|
$result[$userId] = array_unique($ips);
|
||||||
@@ -130,12 +169,12 @@ class DeviceStateService
|
|||||||
/**
|
/**
|
||||||
* notify update (throttle control)
|
* notify update (throttle control)
|
||||||
*/
|
*/
|
||||||
private function notifyUpdate(int $userId): void
|
public function notifyUpdate(int $userId): void
|
||||||
{
|
{
|
||||||
$dbThrottleKey = "device:db_throttle:{$userId}";
|
$dbThrottleKey = "device:db_throttle:{$userId}";
|
||||||
|
|
||||||
if (Redis::setnx($dbThrottleKey, 1)) {
|
// if (Redis::setnx($dbThrottleKey, 1)) {
|
||||||
Redis::expire($dbThrottleKey, self::DB_THROTTLE);
|
// Redis::expire($dbThrottleKey, self::DB_THROTTLE);
|
||||||
|
|
||||||
User::query()
|
User::query()
|
||||||
->whereKey($userId)
|
->whereKey($userId)
|
||||||
@@ -143,6 +182,6 @@ class DeviceStateService
|
|||||||
'online_count' => $this->getDeviceCount($userId),
|
'online_count' => $this->getDeviceCount($userId),
|
||||||
'last_online_at' => now(),
|
'last_online_at' => now(),
|
||||||
]);
|
]);
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,144 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\WebSocket;
|
||||||
|
|
||||||
|
use App\Models\Server;
|
||||||
|
use App\Services\DeviceStateService;
|
||||||
|
use App\Services\NodeRegistry;
|
||||||
|
use App\Services\ServerService;
|
||||||
|
use Illuminate\Support\Facades\Cache;
|
||||||
|
use Illuminate\Support\Facades\Log;
|
||||||
|
use Illuminate\Support\Facades\Redis;
|
||||||
|
use Workerman\Connection\TcpConnection;
|
||||||
|
|
||||||
|
class NodeEventHandlers
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Handle pong heartbeat
|
||||||
|
*/
|
||||||
|
public static function handlePong(TcpConnection $conn, int $nodeId, array $data = []): void
|
||||||
|
{
|
||||||
|
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle node status update
|
||||||
|
*/
|
||||||
|
public static function handleNodeStatus(TcpConnection $conn, int $nodeId, array $data): void
|
||||||
|
{
|
||||||
|
$node = Server::find($nodeId);
|
||||||
|
if (!$node) return;
|
||||||
|
|
||||||
|
$nodeType = strtoupper($node->type);
|
||||||
|
Cache::put(\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_LAST_CHECK_AT', $nodeId), time(), 3600);
|
||||||
|
ServerService::updateMetrics($node, $data);
|
||||||
|
|
||||||
|
Log::debug("[WS] Node#{$nodeId} status updated");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle device report from node
|
||||||
|
*
|
||||||
|
* 数据格式: {"event": "report.devices", "data": {userId: [ip1, ip2, ...], ...}}
|
||||||
|
*/
|
||||||
|
public static function handleDeviceReport(TcpConnection $conn, int $nodeId, array $data): void
|
||||||
|
{
|
||||||
|
$service = app(DeviceStateService::class);
|
||||||
|
|
||||||
|
// Get old data
|
||||||
|
$oldDevices = $service->getNodeDevices($nodeId);
|
||||||
|
|
||||||
|
// Calculate diff
|
||||||
|
$removedUsers = array_diff_key($oldDevices, $data);
|
||||||
|
$newDevices = [];
|
||||||
|
|
||||||
|
foreach ($data as $userId => $ips) {
|
||||||
|
if (is_numeric($userId) && is_array($ips)) {
|
||||||
|
$newDevices[(int) $userId] = $ips;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle removed users
|
||||||
|
foreach ($removedUsers as $userId => $ips) {
|
||||||
|
$service->removeNodeDevices($nodeId, $userId);
|
||||||
|
$service->notifyUpdate($userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle new/updated users
|
||||||
|
foreach ($newDevices as $userId => $ips) {
|
||||||
|
$service->setDevices($userId, $nodeId, $ips);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark for push
|
||||||
|
Redis::sadd('device:push_pending_nodes', $nodeId);
|
||||||
|
|
||||||
|
Log::debug("[WS] Node#{$nodeId} synced " . count($newDevices) . " users, removed " . count($removedUsers));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle device state request from node
|
||||||
|
*/
|
||||||
|
public static function handleDeviceRequest(TcpConnection $conn, int $nodeId, array $data = []): void
|
||||||
|
{
|
||||||
|
$node = Server::find($nodeId);
|
||||||
|
if (!$node) return;
|
||||||
|
|
||||||
|
$users = ServerService::getAvailableUsers($node);
|
||||||
|
$userIds = $users->pluck('id')->toArray();
|
||||||
|
|
||||||
|
$service = app(DeviceStateService::class);
|
||||||
|
$devices = $service->getUsersDevices($userIds);
|
||||||
|
|
||||||
|
$conn->send(json_encode([
|
||||||
|
'event' => 'sync.devices',
|
||||||
|
'data' => ['users' => $devices],
|
||||||
|
]));
|
||||||
|
|
||||||
|
Log::debug("[WS] Node#{$nodeId} requested devices, sent " . count($devices) . " users");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push device state to node
|
||||||
|
*/
|
||||||
|
public static function pushDeviceStateToNode(int $nodeId, DeviceStateService $service): void
|
||||||
|
{
|
||||||
|
$node = Server::find($nodeId);
|
||||||
|
if (!$node) return;
|
||||||
|
|
||||||
|
$users = ServerService::getAvailableUsers($node);
|
||||||
|
$userIds = $users->pluck('id')->toArray();
|
||||||
|
$devices = $service->getUsersDevices($userIds);
|
||||||
|
|
||||||
|
NodeRegistry::send($nodeId, 'sync.devices', [
|
||||||
|
'users' => $devices
|
||||||
|
]);
|
||||||
|
|
||||||
|
Log::debug("[WS] Pushed device state to node#{$nodeId}: " . count($devices) . " users");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push full config + users to newly connected node
|
||||||
|
*/
|
||||||
|
public static function pushFullSync(TcpConnection $conn, Server $node): void
|
||||||
|
{
|
||||||
|
$nodeId = $conn->nodeId;
|
||||||
|
|
||||||
|
// Push config
|
||||||
|
$config = ServerService::buildNodeConfig($node);
|
||||||
|
$conn->send(json_encode([
|
||||||
|
'event' => 'sync.config',
|
||||||
|
'data' => ['config' => $config]
|
||||||
|
]));
|
||||||
|
|
||||||
|
// Push users
|
||||||
|
$users = ServerService::getAvailableUsers($node)->toArray();
|
||||||
|
$conn->send(json_encode([
|
||||||
|
'event' => 'sync.users',
|
||||||
|
'data' => ['users' => $users]
|
||||||
|
]));
|
||||||
|
|
||||||
|
Log::info("[WS] Full sync pushed to node#{$nodeId}", [
|
||||||
|
'users' => count($users),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,249 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\WebSocket;
|
||||||
|
|
||||||
|
use App\Models\Server;
|
||||||
|
use App\Services\DeviceStateService;
|
||||||
|
use App\Services\NodeRegistry;
|
||||||
|
use App\Services\ServerService;
|
||||||
|
use Illuminate\Support\Facades\Cache;
|
||||||
|
use Illuminate\Support\Facades\Log;
|
||||||
|
use Illuminate\Support\Facades\Redis;
|
||||||
|
use Workerman\Connection\TcpConnection;
|
||||||
|
use Workerman\Timer;
|
||||||
|
use Workerman\Worker;
|
||||||
|
|
||||||
|
class NodeWorker
|
||||||
|
{
|
||||||
|
private const AUTH_TIMEOUT = 10;
|
||||||
|
private const PING_INTERVAL = 55;
|
||||||
|
|
||||||
|
private Worker $worker;
|
||||||
|
|
||||||
|
private array $handlers = [
|
||||||
|
'pong' => [NodeEventHandlers::class, 'handlePong'],
|
||||||
|
'node.status' => [NodeEventHandlers::class, 'handleNodeStatus'],
|
||||||
|
'report.devices' => [NodeEventHandlers::class, 'handleDeviceReport'],
|
||||||
|
'request.devices' => [NodeEventHandlers::class, 'handleDeviceRequest'],
|
||||||
|
];
|
||||||
|
|
||||||
|
public function __construct(string $host, int $port)
|
||||||
|
{
|
||||||
|
$this->worker = new Worker("websocket://{$host}:{$port}");
|
||||||
|
$this->worker->count = 1;
|
||||||
|
$this->worker->name = 'xboard-ws-server';
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(): void
|
||||||
|
{
|
||||||
|
$this->setupLogging();
|
||||||
|
$this->setupCallbacks();
|
||||||
|
Worker::runAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function setupLogging(): void
|
||||||
|
{
|
||||||
|
$logPath = storage_path('logs');
|
||||||
|
if (!is_dir($logPath)) {
|
||||||
|
mkdir($logPath, 0777, true);
|
||||||
|
}
|
||||||
|
Worker::$logFile = $logPath . '/xboard-ws-server.log';
|
||||||
|
Worker::$pidFile = $logPath . '/xboard-ws-server.pid';
|
||||||
|
}
|
||||||
|
|
||||||
|
private function setupCallbacks(): void
|
||||||
|
{
|
||||||
|
$this->worker->onWorkerStart = [$this, 'onWorkerStart'];
|
||||||
|
$this->worker->onConnect = [$this, 'onConnect'];
|
||||||
|
$this->worker->onWebSocketConnect = [$this, 'onWebSocketConnect'];
|
||||||
|
$this->worker->onMessage = [$this, 'onMessage'];
|
||||||
|
$this->worker->onClose = [$this, 'onClose'];
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onWorkerStart(Worker $worker): void
|
||||||
|
{
|
||||||
|
Log::info("[WS] Worker started, pid={$worker->id}");
|
||||||
|
$this->subscribeRedis();
|
||||||
|
$this->setupTimers();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function setupTimers(): void
|
||||||
|
{
|
||||||
|
// Ping timer
|
||||||
|
Timer::add(self::PING_INTERVAL, function () {
|
||||||
|
foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) {
|
||||||
|
$conn = NodeRegistry::get($nodeId);
|
||||||
|
if ($conn) {
|
||||||
|
$conn->send(json_encode(['event' => 'ping']));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Device state push timer
|
||||||
|
Timer::add(10, function () {
|
||||||
|
$pendingNodeIds = Redis::spop('device:push_pending_nodes', 100);
|
||||||
|
if (empty($pendingNodeIds)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$service = app(DeviceStateService::class);
|
||||||
|
foreach ($pendingNodeIds as $nodeId) {
|
||||||
|
$nodeId = (int) $nodeId;
|
||||||
|
if (NodeRegistry::get($nodeId) !== null) {
|
||||||
|
NodeEventHandlers::pushDeviceStateToNode($nodeId, $service);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onConnect(TcpConnection $conn): void
|
||||||
|
{
|
||||||
|
$conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) {
|
||||||
|
if (empty($conn->nodeId)) {
|
||||||
|
$conn->close(json_encode([
|
||||||
|
'event' => 'error',
|
||||||
|
'data' => ['message' => 'auth timeout'],
|
||||||
|
]));
|
||||||
|
}
|
||||||
|
}, [], false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onWebSocketConnect(TcpConnection $conn, $httpMessage): void
|
||||||
|
{
|
||||||
|
$queryString = '';
|
||||||
|
if (is_string($httpMessage)) {
|
||||||
|
$queryString = parse_url($httpMessage, PHP_URL_QUERY) ?? '';
|
||||||
|
} elseif ($httpMessage instanceof \Workerman\Protocols\Http\Request) {
|
||||||
|
$queryString = $httpMessage->queryString();
|
||||||
|
}
|
||||||
|
|
||||||
|
parse_str($queryString, $params);
|
||||||
|
$token = $params['token'] ?? '';
|
||||||
|
$nodeId = (int) ($params['node_id'] ?? 0);
|
||||||
|
|
||||||
|
// Authenticate
|
||||||
|
$serverToken = admin_setting('server_token', '');
|
||||||
|
if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) {
|
||||||
|
$conn->close(json_encode([
|
||||||
|
'event' => 'error',
|
||||||
|
'data' => ['message' => 'invalid token'],
|
||||||
|
]));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$node = Server::find($nodeId);
|
||||||
|
if (!$node) {
|
||||||
|
$conn->close(json_encode([
|
||||||
|
'event' => 'error',
|
||||||
|
'data' => ['message' => 'node not found'],
|
||||||
|
]));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Auth passed
|
||||||
|
if (isset($conn->authTimer)) {
|
||||||
|
Timer::del($conn->authTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
$conn->nodeId = $nodeId;
|
||||||
|
NodeRegistry::add($nodeId, $conn);
|
||||||
|
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
||||||
|
|
||||||
|
// Clear old device data
|
||||||
|
app(DeviceStateService::class)->clearAllNodeDevices($nodeId);
|
||||||
|
|
||||||
|
Log::debug("[WS] Node#{$nodeId} connected", [
|
||||||
|
'remote' => $conn->getRemoteIp(),
|
||||||
|
'total' => NodeRegistry::count(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Send auth success
|
||||||
|
$conn->send(json_encode([
|
||||||
|
'event' => 'auth.success',
|
||||||
|
'data' => ['node_id' => $nodeId],
|
||||||
|
]));
|
||||||
|
|
||||||
|
// Push full sync
|
||||||
|
NodeEventHandlers::pushFullSync($conn, $node);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onMessage(TcpConnection $conn, $data): void
|
||||||
|
{
|
||||||
|
$msg = json_decode($data, true);
|
||||||
|
if (!is_array($msg)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$event = $msg['event'] ?? '';
|
||||||
|
$nodeId = $conn->nodeId ?? null;
|
||||||
|
|
||||||
|
if (isset($this->handlers[$event]) && $nodeId) {
|
||||||
|
$handler = $this->handlers[$event];
|
||||||
|
$handler($conn, $nodeId, $msg['data'] ?? []);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onClose(TcpConnection $conn): void
|
||||||
|
{
|
||||||
|
if (!empty($conn->nodeId)) {
|
||||||
|
$nodeId = $conn->nodeId;
|
||||||
|
NodeRegistry::remove($nodeId);
|
||||||
|
Cache::forget("node_ws_alive:{$nodeId}");
|
||||||
|
|
||||||
|
$service = app(DeviceStateService::class);
|
||||||
|
$affectedUserIds = $service->clearAllNodeDevices($nodeId);
|
||||||
|
foreach ($affectedUserIds as $userId) {
|
||||||
|
$service->notifyUpdate($userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log::debug("[WS] Node#{$nodeId} disconnected", [
|
||||||
|
'total' => NodeRegistry::count(),
|
||||||
|
'affected_users' => count($affectedUserIds),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function subscribeRedis(): void
|
||||||
|
{
|
||||||
|
$host = config('database.redis.default.host', '127.0.0.1');
|
||||||
|
$port = config('database.redis.default.port', 6379);
|
||||||
|
|
||||||
|
if (str_starts_with($host, '/')) {
|
||||||
|
$redisUri = "unix://{$host}";
|
||||||
|
} else {
|
||||||
|
$redisUri = "redis://{$host}:{$port}";
|
||||||
|
}
|
||||||
|
|
||||||
|
$redis = new \Workerman\Redis\Client($redisUri);
|
||||||
|
|
||||||
|
$password = config('database.redis.default.password');
|
||||||
|
if ($password) {
|
||||||
|
$redis->auth($password);
|
||||||
|
}
|
||||||
|
|
||||||
|
$prefix = config('database.redis.options.prefix', '');
|
||||||
|
$channel = $prefix . 'node:push';
|
||||||
|
|
||||||
|
$redis->subscribe([$channel], function ($chan, $message) {
|
||||||
|
$payload = json_decode($message, true);
|
||||||
|
if (!is_array($payload)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$nodeId = $payload['node_id'] ?? null;
|
||||||
|
$event = $payload['event'] ?? '';
|
||||||
|
$data = $payload['data'] ?? [];
|
||||||
|
|
||||||
|
if (!$nodeId || !$event) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$sent = NodeRegistry::send((int) $nodeId, $event, $data);
|
||||||
|
if ($sent) {
|
||||||
|
Log::debug("[WS] Pushed {$event} to node#{$nodeId}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Log::info("[WS] Subscribed to Redis channel: {$channel}");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user