feat: ws notify nodes when user traffic is exhausted

This commit is contained in:
xboard
2026-03-25 01:44:55 +08:00
parent 7dacb69275
commit 73a37a07dd
5 changed files with 74 additions and 8 deletions
@@ -0,0 +1,63 @@
<?php
namespace App\Console\Commands;
use App\Models\Server;
use App\Models\User;
use App\Services\NodeSyncService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
class CheckTrafficExceeded extends Command
{
protected $signature = 'check:traffic-exceeded';
protected $description = '检查流量超标用户并通知节点';
public function handle()
{
$count = Redis::scard('traffic:pending_check');
if ($count <= 0) {
return;
}
$pendingUserIds = array_map('intval', Redis::spop('traffic:pending_check', $count));
$exceededUsers = User::toBase()
->whereIn('id', $pendingUserIds)
->whereRaw('u + d >= transfer_enable')
->where('transfer_enable', '>', 0)
->where('banned', 0)
->select(['id', 'group_id'])
->get();
if ($exceededUsers->isEmpty()) {
return;
}
$groupedUsers = $exceededUsers->groupBy('group_id');
$notifiedCount = 0;
foreach ($groupedUsers as $groupId => $users) {
if (!$groupId) {
continue;
}
$userIdsInGroup = $users->pluck('id')->toArray();
$servers = Server::whereJsonContains('group_ids', (string) $groupId)->get();
foreach ($servers as $server) {
if (!NodeSyncService::isNodeOnline($server->id)) {
continue;
}
NodeSyncService::push($server->id, 'sync.user.delta', [
'action' => 'remove',
'users' => array_map(fn($id) => ['id' => $id], $userIdsInGroup),
]);
$notifiedCount++;
}
}
$this->info("Checked " . count($pendingUserIds) . " users, notified {$notifiedCount} nodes for " . $exceededUsers->count() . " exceeded users.");
}
}
+1
View File
@@ -35,6 +35,7 @@ class Kernel extends ConsoleKernel
$schedule->command('check:order')->everyMinute()->onOneServer()->withoutOverlapping(5); $schedule->command('check:order')->everyMinute()->onOneServer()->withoutOverlapping(5);
$schedule->command('check:commission')->everyMinute()->onOneServer()->withoutOverlapping(5); $schedule->command('check:commission')->everyMinute()->onOneServer()->withoutOverlapping(5);
$schedule->command('check:ticket')->everyMinute()->onOneServer()->withoutOverlapping(5); $schedule->command('check:ticket')->everyMinute()->onOneServer()->withoutOverlapping(5);
$schedule->command('check:traffic-exceeded')->everyMinute()->onOneServer()->withoutOverlapping(10);
// reset // reset
$schedule->command('reset:traffic')->everyMinute()->onOneServer()->withoutOverlapping(10); $schedule->command('reset:traffic')->everyMinute()->onOneServer()->withoutOverlapping(10);
$schedule->command('reset:log')->daily()->onOneServer(); $schedule->command('reset:log')->daily()->onOneServer();
+7 -5
View File
@@ -8,6 +8,7 @@ use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels; use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Redis;
class TrafficFetchJob implements ShouldQueue class TrafficFetchJob implements ShouldQueue
{ {
@@ -19,11 +20,6 @@ class TrafficFetchJob implements ShouldQueue
public $tries = 1; public $tries = 1;
public $timeout = 20; public $timeout = 20;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(array $server, array $data, $protocol, int $timestamp) public function __construct(array $server, array $data, $protocol, int $timestamp)
{ {
$this->onQueue('traffic_fetch'); $this->onQueue('traffic_fetch');
@@ -35,6 +31,8 @@ class TrafficFetchJob implements ShouldQueue
public function handle(): void public function handle(): void
{ {
$userIds = array_keys($this->data);
foreach ($this->data as $uid => $v) { foreach ($this->data as $uid => $v) {
User::where('id', $uid) User::where('id', $uid)
->incrementEach( ->incrementEach(
@@ -45,5 +43,9 @@ class TrafficFetchJob implements ShouldQueue
['t' => time()] ['t' => time()]
); );
} }
if (!empty($userIds)) {
Redis::sadd('traffic:pending_check', ...$userIds);
}
} }
} }
+2 -2
View File
@@ -13,7 +13,7 @@ class NodeSyncService
/** /**
* Check if node has active WS connection * Check if node has active WS connection
*/ */
private static function isNodeOnline(int $nodeId): bool public static function isNodeOnline(int $nodeId): bool
{ {
return (bool) Cache::get("node_ws_alive:{$nodeId}"); return (bool) Cache::get("node_ws_alive:{$nodeId}");
} }
@@ -125,7 +125,7 @@ class NodeSyncService
/** /**
* Publish a push command to Redis — picked up by the Workerman WS server * Publish a push command to Redis — picked up by the Workerman WS server
*/ */
private static function push(int $nodeId, string $event, array $data): void public static function push(int $nodeId, string $event, array $data): void
{ {
try { try {
Redis::publish('node:push', json_encode([ Redis::publish('node:push', json_encode([