2026-03-15 10:20:42 +08:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
namespace App\Console\Commands;
|
|
|
|
|
|
|
|
|
|
use App\Models\Server;
|
|
|
|
|
use App\Services\NodeSyncService;
|
|
|
|
|
use App\Services\NodeRegistry;
|
|
|
|
|
use App\Services\ServerService;
|
|
|
|
|
use Illuminate\Console\Command;
|
|
|
|
|
use Illuminate\Support\Facades\Cache;
|
|
|
|
|
use Illuminate\Support\Facades\Log;
|
|
|
|
|
use Illuminate\Support\Facades\Redis;
|
|
|
|
|
use Workerman\Connection\TcpConnection;
|
|
|
|
|
use Workerman\Timer;
|
|
|
|
|
use Workerman\Worker;
|
|
|
|
|
|
|
|
|
|
class NodeWebSocketServer extends Command
|
|
|
|
|
{
|
2026-03-15 10:57:21 +08:00
|
|
|
protected $signature = 'ws-server
|
2026-03-15 10:20:42 +08:00
|
|
|
{action=start : start | stop | restart | reload | status}
|
|
|
|
|
{--d : Start in daemon mode}
|
|
|
|
|
{--host=0.0.0.0 : Listen address}
|
|
|
|
|
{--port=8076 : Listen port}';
|
|
|
|
|
|
|
|
|
|
protected $description = 'Start the WebSocket server for node-panel synchronization';
|
|
|
|
|
|
|
|
|
|
/** Auth timeout in seconds — close unauthenticated connections */
|
|
|
|
|
private const AUTH_TIMEOUT = 10;
|
|
|
|
|
|
|
|
|
|
/** Ping interval in seconds */
|
|
|
|
|
private const PING_INTERVAL = 55;
|
|
|
|
|
|
|
|
|
|
public function handle(): void
|
|
|
|
|
{
|
|
|
|
|
global $argv;
|
|
|
|
|
$action = $this->argument('action');
|
|
|
|
|
|
|
|
|
|
// 重新构建 argv 供 Workerman 解析
|
|
|
|
|
$argv[1] = $action;
|
|
|
|
|
if ($this->option('d')) {
|
|
|
|
|
$argv[2] = '-d';
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$host = $this->option('host');
|
|
|
|
|
$port = $this->option('port');
|
|
|
|
|
|
|
|
|
|
$worker = new Worker("websocket://{$host}:{$port}");
|
|
|
|
|
$worker->count = 1;
|
|
|
|
|
$worker->name = 'xboard-ws-server';
|
|
|
|
|
|
|
|
|
|
// 设置日志和 PID 文件路径
|
|
|
|
|
$logPath = storage_path('logs');
|
|
|
|
|
if (!is_dir($logPath)) {
|
|
|
|
|
mkdir($logPath, 0777, true);
|
|
|
|
|
}
|
|
|
|
|
Worker::$logFile = $logPath . '/xboard-ws-server.log'; // 指向具体文件,避免某些环境 php://stdout 的 stat 失败
|
|
|
|
|
Worker::$pidFile = $logPath . '/xboard-ws-server.pid';
|
|
|
|
|
|
|
|
|
|
$worker->onWorkerStart = function (Worker $worker) {
|
|
|
|
|
$this->info("[WS] Worker started, pid={$worker->id}");
|
|
|
|
|
$this->subscribeRedis();
|
|
|
|
|
|
|
|
|
|
// Periodic ping to detect dead connections
|
|
|
|
|
Timer::add(self::PING_INTERVAL, function () {
|
|
|
|
|
foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) {
|
|
|
|
|
$conn = NodeRegistry::get($nodeId);
|
|
|
|
|
if ($conn) {
|
|
|
|
|
$conn->send(json_encode(['event' => 'ping']));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
$worker->onConnect = function (TcpConnection $conn) {
|
|
|
|
|
// Set auth timeout — must authenticate within N seconds or get disconnected
|
|
|
|
|
$conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) {
|
|
|
|
|
if (empty($conn->nodeId)) {
|
|
|
|
|
$conn->close(json_encode([
|
|
|
|
|
'event' => 'error',
|
|
|
|
|
'data' => ['message' => 'auth timeout'],
|
|
|
|
|
]));
|
|
|
|
|
}
|
|
|
|
|
}, [], false);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
$worker->onWebSocketConnect = function (TcpConnection $conn, $httpMessage) {
|
|
|
|
|
// Parse query string from the WebSocket upgrade request
|
|
|
|
|
// In Workerman 4.x/5.x with onWebSocketConnect, the second arg can be a string or Request object
|
|
|
|
|
$queryString = '';
|
|
|
|
|
if (is_string($httpMessage)) {
|
|
|
|
|
$queryString = parse_url($httpMessage, PHP_URL_QUERY) ?? '';
|
|
|
|
|
} elseif ($httpMessage instanceof \Workerman\Protocols\Http\Request) {
|
|
|
|
|
$queryString = $httpMessage->queryString();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
parse_str($queryString, $params);
|
|
|
|
|
|
|
|
|
|
$token = $params['token'] ?? '';
|
|
|
|
|
$nodeId = (int) ($params['node_id'] ?? 0);
|
|
|
|
|
|
|
|
|
|
// Authenticate
|
|
|
|
|
$serverToken = admin_setting('server_token', '');
|
|
|
|
|
if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) {
|
|
|
|
|
$conn->close(json_encode([
|
|
|
|
|
'event' => 'error',
|
|
|
|
|
'data' => ['message' => 'invalid token'],
|
|
|
|
|
]));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$node = Server::find($nodeId);
|
|
|
|
|
if (!$node) {
|
|
|
|
|
$conn->close(json_encode([
|
|
|
|
|
'event' => 'error',
|
|
|
|
|
'data' => ['message' => 'node not found'],
|
|
|
|
|
]));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Auth passed — cancel timeout, register connection
|
|
|
|
|
if (isset($conn->authTimer)) {
|
|
|
|
|
Timer::del($conn->authTimer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$conn->nodeId = $nodeId;
|
|
|
|
|
NodeRegistry::add($nodeId, $conn);
|
|
|
|
|
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
|
|
|
|
|
2026-03-16 23:09:56 +08:00
|
|
|
Log::debug("[WS] Node#{$nodeId} connected", [
|
2026-03-15 10:20:42 +08:00
|
|
|
'remote' => $conn->getRemoteIp(),
|
|
|
|
|
'total' => NodeRegistry::count(),
|
|
|
|
|
]);
|
|
|
|
|
|
|
|
|
|
// Send auth success
|
|
|
|
|
$conn->send(json_encode([
|
|
|
|
|
'event' => 'auth.success',
|
|
|
|
|
'data' => ['node_id' => $nodeId],
|
|
|
|
|
]));
|
|
|
|
|
|
2026-03-16 23:09:56 +08:00
|
|
|
// Push full sync (config + users) immediately to this specific connection
|
|
|
|
|
$this->pushFullSync($conn, $node);
|
2026-03-15 10:20:42 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
$worker->onMessage = function (TcpConnection $conn, $data) {
|
|
|
|
|
$msg = json_decode($data, true);
|
|
|
|
|
if (!is_array($msg)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$event = $msg['event'] ?? '';
|
2026-03-16 23:09:56 +08:00
|
|
|
$nodeId = $conn->nodeId ?? null;
|
2026-03-15 10:20:42 +08:00
|
|
|
|
|
|
|
|
switch ($event) {
|
|
|
|
|
case 'pong':
|
|
|
|
|
// Heartbeat response — node is alive
|
2026-03-16 23:09:56 +08:00
|
|
|
if ($nodeId) {
|
|
|
|
|
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
case 'node.status':
|
|
|
|
|
if ($nodeId && isset($msg['data'])) {
|
|
|
|
|
$this->handleNodeStatus($nodeId, $msg['data']);
|
2026-03-15 10:20:42 +08:00
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
// Future: handle other node-initiated messages if needed
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
$worker->onClose = function (TcpConnection $conn) {
|
|
|
|
|
if (!empty($conn->nodeId)) {
|
|
|
|
|
$nodeId = $conn->nodeId;
|
|
|
|
|
NodeRegistry::remove($nodeId);
|
|
|
|
|
Cache::forget("node_ws_alive:{$nodeId}");
|
2026-03-16 23:09:56 +08:00
|
|
|
Log::debug("[WS] Node#{$nodeId} disconnected", [
|
2026-03-15 10:20:42 +08:00
|
|
|
'total' => NodeRegistry::count(),
|
|
|
|
|
]);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Worker::runAll();
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-16 23:09:56 +08:00
|
|
|
/**
|
|
|
|
|
* Handle status data pushed from node via WebSocket
|
|
|
|
|
*/
|
|
|
|
|
private function handleNodeStatus(int $nodeId, array $data): void
|
|
|
|
|
{
|
|
|
|
|
$node = Server::find($nodeId);
|
|
|
|
|
if (!$node) return;
|
|
|
|
|
|
|
|
|
|
$nodeType = strtoupper($node->type);
|
|
|
|
|
|
|
|
|
|
// Update last check-in cache
|
|
|
|
|
Cache::put(\App\Utils\CacheKey::get('SERVER_' . $nodeType . '_LAST_CHECK_AT', $nodeId), time(), 3600);
|
|
|
|
|
|
|
|
|
|
// Update metrics cache via Service
|
|
|
|
|
ServerService::updateMetrics($node, $data);
|
|
|
|
|
|
|
|
|
|
Log::debug("[WS] Node#{$nodeId} status updated via WebSocket");
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-15 10:20:42 +08:00
|
|
|
/**
|
|
|
|
|
* Subscribe to Redis pub/sub channel for receiving push commands from Laravel.
|
|
|
|
|
* Laravel app publishes to "node:push" channel, Workerman picks it up and forwards to the right node.
|
|
|
|
|
*/
|
|
|
|
|
private function subscribeRedis(): void
|
|
|
|
|
{
|
|
|
|
|
$host = config('database.redis.default.host', '127.0.0.1');
|
|
|
|
|
$port = config('database.redis.default.port', 6379);
|
2026-03-15 10:47:31 +08:00
|
|
|
|
|
|
|
|
// Handle Unix Socket connection
|
|
|
|
|
if (str_starts_with($host, '/')) {
|
|
|
|
|
$redisUri = "unix://{$host}";
|
|
|
|
|
} else {
|
|
|
|
|
$redisUri = "redis://{$host}:{$port}";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$redis = new \Workerman\Redis\Client($redisUri);
|
2026-03-15 10:20:42 +08:00
|
|
|
|
|
|
|
|
$password = config('database.redis.default.password');
|
|
|
|
|
if ($password) {
|
|
|
|
|
$redis->auth($password);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get Laravel Redis prefix to match publish()
|
|
|
|
|
$prefix = config('database.redis.options.prefix', '');
|
|
|
|
|
$channel = $prefix . 'node:push';
|
|
|
|
|
|
|
|
|
|
$redis->subscribe([$channel], function ($chan, $message) {
|
|
|
|
|
$payload = json_decode($message, true);
|
|
|
|
|
if (!is_array($payload)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$nodeId = $payload['node_id'] ?? null;
|
|
|
|
|
$event = $payload['event'] ?? '';
|
|
|
|
|
$data = $payload['data'] ?? [];
|
|
|
|
|
|
|
|
|
|
if (!$nodeId || !$event) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$sent = NodeRegistry::send((int) $nodeId, $event, $data);
|
|
|
|
|
if ($sent) {
|
|
|
|
|
Log::debug("[WS] Pushed {$event} to node#{$nodeId}");
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
$this->info("[WS] Subscribed to Redis channel: {$channel}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Push full config + users to a newly connected node.
|
|
|
|
|
*/
|
2026-03-16 23:09:56 +08:00
|
|
|
private function pushFullSync(TcpConnection $conn, Server $node): void
|
2026-03-15 10:20:42 +08:00
|
|
|
{
|
2026-03-16 23:09:56 +08:00
|
|
|
$nodeId = $conn->nodeId;
|
2026-03-15 10:20:42 +08:00
|
|
|
// Push config
|
|
|
|
|
$config = ServerService::buildNodeConfig($node);
|
2026-03-16 23:09:56 +08:00
|
|
|
Log::debug("[WS] Node#{$nodeId} config: ", $config);
|
|
|
|
|
$conn->send(json_encode([
|
|
|
|
|
'event' => 'sync.config',
|
|
|
|
|
'data' => ['config' => $config]
|
|
|
|
|
]));
|
2026-03-15 10:20:42 +08:00
|
|
|
|
|
|
|
|
// Push users
|
|
|
|
|
$users = ServerService::getAvailableUsers($node)->toArray();
|
2026-03-16 23:09:56 +08:00
|
|
|
$conn->send(json_encode([
|
|
|
|
|
'event' => 'sync.users',
|
|
|
|
|
'data' => ['users' => $users]
|
|
|
|
|
]));
|
2026-03-15 10:20:42 +08:00
|
|
|
|
|
|
|
|
Log::info("[WS] Full sync pushed to node#{$nodeId}", [
|
|
|
|
|
'users' => count($users),
|
|
|
|
|
]);
|
|
|
|
|
}
|
|
|
|
|
}
|