mirror of
https://github.com/lkddi/Xboard.git
synced 2026-04-23 19:37:35 +08:00
feat: machine mode, ECH subscriptions, batch ops & security hardening
This commit is contained in:
+166
-17
@@ -3,6 +3,7 @@
|
||||
namespace App\WebSocket;
|
||||
|
||||
use App\Models\Server;
|
||||
use App\Models\ServerMachine;
|
||||
use App\Services\DeviceStateService;
|
||||
use App\Services\NodeRegistry;
|
||||
use App\Services\ServerService;
|
||||
@@ -69,17 +70,32 @@ class NodeWorker
|
||||
|
||||
private function setupTimers(): void
|
||||
{
|
||||
// Ping timer
|
||||
Timer::add(self::PING_INTERVAL, function () {
|
||||
$seen = [];
|
||||
|
||||
foreach (NodeRegistry::getConnectedNodeIds() as $nodeId) {
|
||||
$conn = NodeRegistry::get($nodeId);
|
||||
if ($conn) {
|
||||
$conn->send(json_encode(['event' => 'ping']));
|
||||
$oid = spl_object_id($conn);
|
||||
if (!isset($seen[$oid])) {
|
||||
$seen[$oid] = true;
|
||||
$conn->send(json_encode(['event' => 'ping']));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach (NodeRegistry::getConnectedMachineIds() as $machineId) {
|
||||
$conn = NodeRegistry::getMachine($machineId);
|
||||
if ($conn) {
|
||||
$oid = spl_object_id($conn);
|
||||
if (!isset($seen[$oid])) {
|
||||
$seen[$oid] = true;
|
||||
$conn->send(json_encode(['event' => 'ping']));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Device state push timer
|
||||
Timer::add(10, function () {
|
||||
$pendingNodeIds = Redis::spop('device:push_pending_nodes', 100);
|
||||
if (empty($pendingNodeIds)) {
|
||||
@@ -99,7 +115,7 @@ class NodeWorker
|
||||
public function onConnect(TcpConnection $conn): void
|
||||
{
|
||||
$conn->authTimer = Timer::add(self::AUTH_TIMEOUT, function () use ($conn) {
|
||||
if (empty($conn->nodeId)) {
|
||||
if (empty($conn->nodeId) && empty($conn->machineNodeIds)) {
|
||||
$conn->close(json_encode([
|
||||
'event' => 'error',
|
||||
'data' => ['message' => 'auth timeout'],
|
||||
@@ -118,10 +134,27 @@ class NodeWorker
|
||||
}
|
||||
|
||||
parse_str($queryString, $params);
|
||||
|
||||
if (isset($conn->authTimer)) {
|
||||
Timer::del($conn->authTimer);
|
||||
}
|
||||
|
||||
// 判断认证模式
|
||||
if (!empty($params['machine_id'])) {
|
||||
$this->authenticateMachine($conn, $params);
|
||||
} else {
|
||||
$this->authenticateNode($conn, $params);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 旧模式:单节点认证
|
||||
*/
|
||||
private function authenticateNode(TcpConnection $conn, array $params): void
|
||||
{
|
||||
$token = $params['token'] ?? '';
|
||||
$nodeId = (int) ($params['node_id'] ?? 0);
|
||||
|
||||
// Authenticate
|
||||
$serverToken = admin_setting('server_token', '');
|
||||
if ($token === '' || $serverToken === '' || !hash_equals($serverToken, $token)) {
|
||||
$conn->close(json_encode([
|
||||
@@ -140,16 +173,10 @@ class NodeWorker
|
||||
return;
|
||||
}
|
||||
|
||||
// Auth passed
|
||||
if (isset($conn->authTimer)) {
|
||||
Timer::del($conn->authTimer);
|
||||
}
|
||||
|
||||
$conn->nodeId = $nodeId;
|
||||
NodeRegistry::add($nodeId, $conn);
|
||||
Cache::put("node_ws_alive:{$nodeId}", true, 86400);
|
||||
|
||||
// Clear old device data
|
||||
app(DeviceStateService::class)->clearAllNodeDevices($nodeId);
|
||||
|
||||
Log::debug("[WS] Node#{$nodeId} connected", [
|
||||
@@ -157,16 +184,73 @@ class NodeWorker
|
||||
'total' => NodeRegistry::count(),
|
||||
]);
|
||||
|
||||
// Send auth success
|
||||
$conn->send(json_encode([
|
||||
'event' => 'auth.success',
|
||||
'data' => ['node_id' => $nodeId],
|
||||
]));
|
||||
|
||||
// Push full sync
|
||||
NodeEventHandlers::pushFullSync($conn, $node);
|
||||
}
|
||||
|
||||
/**
|
||||
* 新模式:机器认证,自动注册该机器下所有已启用节点
|
||||
*/
|
||||
private function authenticateMachine(TcpConnection $conn, array $params): void
|
||||
{
|
||||
$machineId = (int) ($params['machine_id'] ?? 0);
|
||||
$token = $params['token'] ?? '';
|
||||
|
||||
$machine = ServerMachine::where('id', $machineId)
|
||||
->where('token', $token)
|
||||
->first();
|
||||
|
||||
if (!$machine || !$machine->is_active) {
|
||||
$conn->close(json_encode([
|
||||
'event' => 'error',
|
||||
'data' => ['message' => 'invalid machine credentials'],
|
||||
]));
|
||||
return;
|
||||
}
|
||||
|
||||
$nodes = ServerService::getMachineNodes($machine);
|
||||
|
||||
$machine->forceFill(['last_seen_at' => now()->timestamp])->saveQuietly();
|
||||
NodeRegistry::addMachine($machineId, $conn);
|
||||
|
||||
// 把同一个连接注册到该机器下所有节点
|
||||
$nodeIds = [];
|
||||
$deviceService = app(DeviceStateService::class);
|
||||
foreach ($nodes as $node) {
|
||||
NodeRegistry::add($node->id, $conn);
|
||||
Cache::put("node_ws_alive:{$node->id}", true, 86400);
|
||||
$deviceService->clearAllNodeDevices($node->id);
|
||||
$nodeIds[] = $node->id;
|
||||
}
|
||||
|
||||
// 连接上记录所属机器和节点列表
|
||||
$conn->machineId = $machineId;
|
||||
$conn->machineNodeIds = $nodeIds;
|
||||
|
||||
Log::debug("[WS] Machine#{$machineId} connected, nodes: " . implode(',', $nodeIds), [
|
||||
'remote' => $conn->getRemoteIp(),
|
||||
'total' => NodeRegistry::count(),
|
||||
'machines' => NodeRegistry::machineCount(),
|
||||
]);
|
||||
|
||||
$conn->send(json_encode([
|
||||
'event' => 'auth.success',
|
||||
'data' => [
|
||||
'machine_id' => $machineId,
|
||||
'node_ids' => $nodeIds,
|
||||
],
|
||||
]));
|
||||
|
||||
// 为每个节点推送完整同步
|
||||
foreach ($nodes as $node) {
|
||||
NodeEventHandlers::pushFullSync($conn, $node);
|
||||
}
|
||||
}
|
||||
|
||||
public function onMessage(TcpConnection $conn, $data): void
|
||||
{
|
||||
$msg = json_decode($data, true);
|
||||
@@ -175,8 +259,29 @@ class NodeWorker
|
||||
}
|
||||
|
||||
$event = $msg['event'] ?? '';
|
||||
$nodeId = $conn->nodeId ?? null;
|
||||
|
||||
// 机器连接:从消息中读取 node_id 来分派到具体节点
|
||||
if (!empty($conn->machineNodeIds)) {
|
||||
if ($event === 'pong') {
|
||||
foreach ($conn->machineNodeIds as $nid) {
|
||||
Cache::put("node_ws_alive:{$nid}", true, 86400);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
$nodeId = (int) ($msg['data']['node_id'] ?? 0);
|
||||
if ($nodeId <= 0 || !in_array($nodeId, $conn->machineNodeIds, true)) {
|
||||
return;
|
||||
}
|
||||
if (isset($this->handlers[$event])) {
|
||||
$handler = $this->handlers[$event];
|
||||
$handler($conn, $nodeId, $msg['data'] ?? []);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 旧模式:单节点
|
||||
$nodeId = $conn->nodeId ?? null;
|
||||
if (isset($this->handlers[$event]) && $nodeId) {
|
||||
$handler = $this->handlers[$event];
|
||||
$handler($conn, $nodeId, $msg['data'] ?? []);
|
||||
@@ -185,12 +290,39 @@ class NodeWorker
|
||||
|
||||
public function onClose(TcpConnection $conn): void
|
||||
{
|
||||
$service = app(DeviceStateService::class);
|
||||
|
||||
// 机器模式:清理所有关联节点
|
||||
if (!empty($conn->machineNodeIds)) {
|
||||
$machineId = $conn->machineId ?? 'unknown';
|
||||
foreach ($conn->machineNodeIds as $nodeId) {
|
||||
NodeRegistry::remove($nodeId, $conn);
|
||||
Cache::forget("node_ws_alive:{$nodeId}");
|
||||
|
||||
$affectedUserIds = $service->clearAllNodeDevices($nodeId);
|
||||
foreach ($affectedUserIds as $userId) {
|
||||
$service->notifyUpdate($userId);
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($conn->machineId)) {
|
||||
NodeRegistry::removeMachine((int) $conn->machineId, $conn);
|
||||
}
|
||||
|
||||
Log::debug("[WS] Machine#{$machineId} disconnected", [
|
||||
'nodes' => $conn->machineNodeIds,
|
||||
'total' => NodeRegistry::count(),
|
||||
'machines' => NodeRegistry::machineCount(),
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
// 旧模式:单节点
|
||||
if (!empty($conn->nodeId)) {
|
||||
$nodeId = $conn->nodeId;
|
||||
NodeRegistry::remove($nodeId);
|
||||
NodeRegistry::remove($nodeId, $conn);
|
||||
Cache::forget("node_ws_alive:{$nodeId}");
|
||||
|
||||
$service = app(DeviceStateService::class);
|
||||
$affectedUserIds = $service->clearAllNodeDevices($nodeId);
|
||||
foreach ($affectedUserIds as $userId) {
|
||||
$service->notifyUpdate($userId);
|
||||
@@ -230,10 +362,27 @@ class NodeWorker
|
||||
return;
|
||||
}
|
||||
|
||||
$nodeId = $payload['node_id'] ?? null;
|
||||
$event = $payload['event'] ?? '';
|
||||
$data = $payload['data'] ?? [];
|
||||
|
||||
// Machine-level events (e.g., sync.nodes)
|
||||
$machineId = $payload['machine_id'] ?? null;
|
||||
if ($machineId && $event) {
|
||||
// Update server-side registry when node membership changes
|
||||
if ($event === 'sync.nodes') {
|
||||
$nodeIds = array_map('intval', array_column($data['nodes'] ?? [], 'id'));
|
||||
NodeRegistry::refreshMachineNodes((int) $machineId, $nodeIds);
|
||||
}
|
||||
|
||||
$sent = NodeRegistry::sendMachine((int) $machineId, $event, $data);
|
||||
if ($sent) {
|
||||
Log::debug("[WS] Pushed {$event} to machine#{$machineId}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Per-node events
|
||||
$nodeId = $payload['node_id'] ?? null;
|
||||
if (!$nodeId || !$event) {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user