feat: introduce WebSocket sync for XBoard nodes

- Implement Workerman-based `xboard:ws-server` for real-time node synchronization.
- Support custom routes, outbounds, and certificate configurations via JSON.
- Optimize scheduled tasks with `lazyById` to minimize memory footprint.
- Enhance reactivity using Observers for `Plan`, `Server`, and `ServerRoute`.
- Expand protocol support for `httpupgrade`, `h2`, and `mieru`.
This commit is contained in:
xboard
2026-03-15 09:49:11 +08:00
parent 1864223c9b
commit 010275b09e
47 changed files with 1314 additions and 223 deletions
+77
View File
@@ -0,0 +1,77 @@
<?php
namespace App\Services;
use Workerman\Connection\TcpConnection;
/**
* In-memory registry for active WebSocket node connections.
* Runs inside the Workerman process.
*/
class NodeRegistry
{
/** @var array<int, TcpConnection> nodeId → connection */
private static array $connections = [];
public static function add(int $nodeId, TcpConnection $conn): void
{
// Close existing connection for this node (if reconnecting)
if (isset(self::$connections[$nodeId])) {
self::$connections[$nodeId]->close();
}
self::$connections[$nodeId] = $conn;
}
public static function remove(int $nodeId): void
{
unset(self::$connections[$nodeId]);
}
public static function get(int $nodeId): ?TcpConnection
{
return self::$connections[$nodeId] ?? null;
}
/**
* Send a JSON message to a specific node.
*/
public static function send(int $nodeId, string $event, array $data): bool
{
$conn = self::get($nodeId);
if (!$conn) {
return false;
}
$payload = json_encode([
'event' => $event,
'data' => $data,
'timestamp' => time(),
]);
$conn->send($payload);
return true;
}
/**
* Get the connection for a node by ID, checking if it's still alive.
*/
public static function isOnline(int $nodeId): bool
{
$conn = self::get($nodeId);
return $conn !== null && $conn->getStatus() === TcpConnection::STATUS_ESTABLISHED;
}
/**
* Get all connected node IDs.
* @return int[]
*/
public static function getConnectedNodeIds(): array
{
return array_keys(self::$connections);
}
public static function count(): int
{
return count(self::$connections);
}
}
+143
View File
@@ -0,0 +1,143 @@
<?php
namespace App\Services;
use App\Models\Server;
use App\Models\User;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class NodeSyncService
{
/**
* Check if node has active WS connection
*/
private static function isNodeOnline(int $nodeId): bool
{
return (bool) Cache::get("node_ws_alive:{$nodeId}");
}
/**
* Push node config update
*/
public static function notifyConfigUpdated(int $nodeId): void
{
if (!self::isNodeOnline($nodeId))
return;
$node = Server::find($nodeId);
if (!$node)
return;
self::push($nodeId, 'sync.config', ['config' => ServerService::buildNodeConfig($node)]);
}
/**
* Push all users to all nodes in the group
*/
public static function notifyUsersUpdatedByGroup(int $groupId): void
{
$servers = Server::whereJsonContains('group_ids', (string) $groupId)
->get();
foreach ($servers as $server) {
if (!self::isNodeOnline($server->id))
continue;
$users = ServerService::getAvailableUsers($server)->toArray();
self::push($server->id, 'sync.users', ['users' => $users]);
}
}
/**
* Push user changes (add/remove) to affected nodes
*/
public static function notifyUserChanged(User $user): void
{
if (!$user->group_id)
return;
$servers = Server::whereJsonContains('group_ids', (string) $user->group_id)->get();
foreach ($servers as $server) {
if (!self::isNodeOnline($server->id))
continue;
if ($user->isAvailable()) {
self::push($server->id, 'sync.user.delta', [
'action' => 'add',
'users' => [
[
'id' => $user->id,
'uuid' => $user->uuid,
'speed_limit' => $user->speed_limit,
'device_limit' => $user->device_limit,
]
],
]);
} else {
self::push($server->id, 'sync.user.delta', [
'action' => 'remove',
'users' => [['id' => $user->id]],
]);
}
}
}
/**
* Push user removal from a specific group's nodes
*/
public static function notifyUserRemovedFromGroup(int $userId, int $groupId): void
{
$servers = Server::whereJsonContains('group_ids', (string) $groupId)
->get();
foreach ($servers as $server) {
if (!self::isNodeOnline($server->id))
continue;
self::push($server->id, 'sync.user.delta', [
'action' => 'remove',
'users' => [['id' => $userId]],
]);
}
}
/**
* Full sync: push config + users to a node
*/
public static function notifyFullSync(int $nodeId): void
{
if (!self::isNodeOnline($nodeId))
return;
$node = Server::find($nodeId);
if (!$node)
return;
self::push($nodeId, 'sync.config', ['config' => ServerService::buildNodeConfig($node)]);
$users = ServerService::getAvailableUsers($node)->toArray();
self::push($nodeId, 'sync.users', ['users' => $users]);
}
/**
* Publish a push command to Redis — picked up by the Workerman WS server
*/
private static function push(int $nodeId, string $event, array $data): void
{
try {
Redis::publish('node:push', json_encode([
'node_id' => $nodeId,
'event' => $event,
'data' => $data,
]));
} catch (\Throwable $e) {
Log::warning("[NodePush] Redis publish failed: {$e->getMessage()}", [
'node_id' => $nodeId,
'event' => $event,
]);
}
}
}
+2 -1
View File
@@ -95,13 +95,14 @@ class OrderService
public function open(): void
{
$order = $this->order;
$this->user = User::find($order->user_id);
$plan = Plan::find($order->plan_id);
HookManager::call('order.open.before', $order);
DB::transaction(function () use ($order, $plan) {
$this->user = User::lockForUpdate()->find($order->user_id);
if ($order->refund_amount) {
$this->user->balance += $order->refund_amount;
}
+10 -2
View File
@@ -25,10 +25,18 @@ class PaymentService
}
if ($id) {
$payment = Payment::find($id)->toArray();
$paymentModel = Payment::find($id);
if (!$paymentModel) {
throw new ApiException('payment not found');
}
$payment = $paymentModel->toArray();
}
if ($uuid) {
$payment = Payment::where('uuid', $uuid)->first()->toArray();
$paymentModel = Payment::where('uuid', $uuid)->first();
if (!$paymentModel) {
throw new ApiException('payment not found');
}
$payment = $paymentModel->toArray();
}
$this->config = [];
+128 -1
View File
@@ -27,7 +27,9 @@ class ServerService
'is_online',
'available_status',
'cache_key',
'load_status'
'load_status',
'metrics',
'online_conn'
]);
}
@@ -93,6 +95,131 @@ class ServerService
return $routes;
}
/**
* Build node config data
*/
public static function buildNodeConfig(Server $node): array
{
$nodeType = $node->type;
$protocolSettings = $node->protocol_settings;
$serverPort = $node->server_port;
$host = $node->host;
$baseConfig = [
'protocol' => $nodeType,
'listen_ip' => '0.0.0.0',
'server_port' => (int) $serverPort,
'network' => data_get($protocolSettings, 'network'),
'networkSettings' => data_get($protocolSettings, 'network_settings') ?: null,
];
$response = match ($nodeType) {
'shadowsocks' => [
...$baseConfig,
'cipher' => $protocolSettings['cipher'],
'plugin' => $protocolSettings['plugin'],
'plugin_opts' => $protocolSettings['plugin_opts'],
'server_key' => match ($protocolSettings['cipher']) {
'2022-blake3-aes-128-gcm' => Helper::getServerKey($node->created_at, 16),
'2022-blake3-aes-256-gcm' => Helper::getServerKey($node->created_at, 32),
default => null,
},
],
'vmess' => [
...$baseConfig,
'tls' => (int) $protocolSettings['tls'],
],
'trojan' => [
...$baseConfig,
'host' => $host,
'server_name' => $protocolSettings['server_name'],
],
'vless' => [
...$baseConfig,
'tls' => (int) $protocolSettings['tls'],
'flow' => $protocolSettings['flow'],
'tls_settings' => match ((int) $protocolSettings['tls']) {
2 => $protocolSettings['reality_settings'],
default => $protocolSettings['tls_settings'],
},
],
'hysteria' => [
...$baseConfig,
'server_port' => (int) $serverPort,
'version' => (int) $protocolSettings['version'],
'host' => $host,
'server_name' => $protocolSettings['tls']['server_name'],
'up_mbps' => (int) $protocolSettings['bandwidth']['up'],
'down_mbps' => (int) $protocolSettings['bandwidth']['down'],
...match ((int) $protocolSettings['version']) {
1 => ['obfs' => $protocolSettings['obfs']['password'] ?? null],
2 => [
'obfs' => $protocolSettings['obfs']['open'] ? $protocolSettings['obfs']['type'] : null,
'obfs-password' => $protocolSettings['obfs']['password'] ?? null,
],
default => [],
},
],
'tuic' => [
...$baseConfig,
'version' => (int) $protocolSettings['version'],
'server_port' => (int) $serverPort,
'server_name' => $protocolSettings['tls']['server_name'],
'congestion_control' => $protocolSettings['congestion_control'],
'tls_settings' => data_get($protocolSettings, 'tls_settings'),
'auth_timeout' => '3s',
'zero_rtt_handshake' => false,
'heartbeat' => '3s',
],
'anytls' => [
...$baseConfig,
'server_port' => (int) $serverPort,
'server_name' => $protocolSettings['tls']['server_name'],
'padding_scheme' => $protocolSettings['padding_scheme'],
],
'socks' => [
...$baseConfig,
'server_port' => (int) $serverPort,
],
'naive' => [
...$baseConfig,
'server_port' => (int) $serverPort,
'tls' => (int) $protocolSettings['tls'],
'tls_settings' => $protocolSettings['tls_settings'],
],
'http' => [
...$baseConfig,
'server_port' => (int) $serverPort,
'tls' => (int) $protocolSettings['tls'],
'tls_settings' => $protocolSettings['tls_settings'],
],
'mieru' => [
...$baseConfig,
'server_port' => (string) $serverPort,
'protocol' => (int) $protocolSettings['protocol'],
],
default => [],
};
if (!empty($node['route_ids'])) {
$response['routes'] = self::getRoutes($node['route_ids']);
}
if (!empty($node['custom_outbounds'])) {
$response['custom_outbounds'] = $node['custom_outbounds'];
}
if (!empty($node['custom_routes'])) {
$response['custom_routes'] = $node['custom_routes'];
}
if (!empty($node['cert_config'])) {
$response['cert_config'] = $node['cert_config'];
}
return $response;
}
/**
* 根据协议类型和标识获取服务器
* @param int $serverId