diff --git a/app/Console/Commands/CheckTrafficExceeded.php b/app/Console/Commands/CheckTrafficExceeded.php new file mode 100644 index 0000000..35a1db6 --- /dev/null +++ b/app/Console/Commands/CheckTrafficExceeded.php @@ -0,0 +1,63 @@ +whereIn('id', $pendingUserIds) + ->whereRaw('u + d >= transfer_enable') + ->where('transfer_enable', '>', 0) + ->where('banned', 0) + ->select(['id', 'group_id']) + ->get(); + + if ($exceededUsers->isEmpty()) { + return; + } + + $groupedUsers = $exceededUsers->groupBy('group_id'); + $notifiedCount = 0; + + foreach ($groupedUsers as $groupId => $users) { + if (!$groupId) { + continue; + } + + $userIdsInGroup = $users->pluck('id')->toArray(); + $servers = Server::whereJsonContains('group_ids', (string) $groupId)->get(); + + foreach ($servers as $server) { + if (!NodeSyncService::isNodeOnline($server->id)) { + continue; + } + + NodeSyncService::push($server->id, 'sync.user.delta', [ + 'action' => 'remove', + 'users' => array_map(fn($id) => ['id' => $id], $userIdsInGroup), + ]); + $notifiedCount++; + } + } + + $this->info("Checked " . count($pendingUserIds) . " users, notified {$notifiedCount} nodes for " . $exceededUsers->count() . " exceeded users."); + } +} diff --git a/app/Console/Kernel.php b/app/Console/Kernel.php index 710d9b7..49b8ae6 100644 --- a/app/Console/Kernel.php +++ b/app/Console/Kernel.php @@ -35,6 +35,7 @@ class Kernel extends ConsoleKernel $schedule->command('check:order')->everyMinute()->onOneServer()->withoutOverlapping(5); $schedule->command('check:commission')->everyMinute()->onOneServer()->withoutOverlapping(5); $schedule->command('check:ticket')->everyMinute()->onOneServer()->withoutOverlapping(5); + $schedule->command('check:traffic-exceeded')->everyMinute()->onOneServer()->withoutOverlapping(10); // reset $schedule->command('reset:traffic')->everyMinute()->onOneServer()->withoutOverlapping(10); $schedule->command('reset:log')->daily()->onOneServer(); diff --git a/app/Jobs/TrafficFetchJob.php b/app/Jobs/TrafficFetchJob.php index 9a01508..1e398aa 100644 --- a/app/Jobs/TrafficFetchJob.php +++ b/app/Jobs/TrafficFetchJob.php @@ -8,6 +8,7 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Facades\Redis; class TrafficFetchJob implements ShouldQueue { @@ -19,11 +20,6 @@ class TrafficFetchJob implements ShouldQueue public $tries = 1; public $timeout = 20; - /** - * Create a new job instance. - * - * @return void - */ public function __construct(array $server, array $data, $protocol, int $timestamp) { $this->onQueue('traffic_fetch'); @@ -35,6 +31,8 @@ class TrafficFetchJob implements ShouldQueue public function handle(): void { + $userIds = array_keys($this->data); + foreach ($this->data as $uid => $v) { User::where('id', $uid) ->incrementEach( @@ -45,5 +43,9 @@ class TrafficFetchJob implements ShouldQueue ['t' => time()] ); } + + if (!empty($userIds)) { + Redis::sadd('traffic:pending_check', ...$userIds); + } } } diff --git a/app/Services/NodeSyncService.php b/app/Services/NodeSyncService.php index 9dace84..ebab769 100644 --- a/app/Services/NodeSyncService.php +++ b/app/Services/NodeSyncService.php @@ -13,7 +13,7 @@ class NodeSyncService /** * Check if node has active WS connection */ - private static function isNodeOnline(int $nodeId): bool + public static function isNodeOnline(int $nodeId): bool { return (bool) Cache::get("node_ws_alive:{$nodeId}"); } @@ -125,7 +125,7 @@ class NodeSyncService /** * Publish a push command to Redis — picked up by the Workerman WS server */ - private static function push(int $nodeId, string $event, array $data): void + public static function push(int $nodeId, string $event, array $data): void { try { Redis::publish('node:push', json_encode([ diff --git a/public/assets/admin b/public/assets/admin index cbad22a..878489d 160000 --- a/public/assets/admin +++ b/public/assets/admin @@ -1 +1 @@ -Subproject commit cbad22a3cd4ae247866a264dcd72aa93c0de1e62 +Subproject commit 878489d9a5466edcf1562426895e3b4cf1dce6c6