From 843c5af4c2eaccbdb3bd7e0f519bed821c4654b0 Mon Sep 17 00:00:00 2001 From: xboard Date: Mon, 15 Sep 2025 20:32:22 +0800 Subject: [PATCH] refactor(online-status): consolidate updates and add cleanup command --- .../Commands/CleanupExpiredOnlineStatus.php | 52 +++++++++ app/Console/Kernel.php | 5 +- .../V1/Server/UniProxyController.php | 3 +- app/Jobs/SyncUserOnlineStatusJob.php | 69 ----------- app/Jobs/UpdateAliveDataJob.php | 108 ++++++++++++++++++ 5 files changed, 163 insertions(+), 74 deletions(-) create mode 100644 app/Console/Commands/CleanupExpiredOnlineStatus.php delete mode 100644 app/Jobs/SyncUserOnlineStatusJob.php create mode 100644 app/Jobs/UpdateAliveDataJob.php diff --git a/app/Console/Commands/CleanupExpiredOnlineStatus.php b/app/Console/Commands/CleanupExpiredOnlineStatus.php new file mode 100644 index 0000000..e51afc9 --- /dev/null +++ b/app/Console/Commands/CleanupExpiredOnlineStatus.php @@ -0,0 +1,52 @@ +where('online_count', '>', 0) + ->where('last_online_at', '<', now()->subMinutes(5)) + ->chunkById(1000, function ($users) use (&$affected) { + if ($users->isEmpty()) { + return; + } + $count = User::whereIn('id', $users->pluck('id')) + ->update(['online_count' => 0]); + $affected += $count; + }, 'id'); + + $this->info("Expired online status cleaned. Affected: {$affected}"); + return self::SUCCESS; + } catch (\Throwable $e) { + Log::error('CleanupExpiredOnlineStatus failed', ['error' => $e->getMessage()]); + $this->error('Cleanup failed: ' . $e->getMessage()); + return self::FAILURE; + } + } +} diff --git a/app/Console/Kernel.php b/app/Console/Kernel.php index c9168b9..d1071c5 100644 --- a/app/Console/Kernel.php +++ b/app/Console/Kernel.php @@ -46,10 +46,7 @@ class Kernel extends ConsoleKernel // if (env('ENABLE_AUTO_BACKUP_AND_UPDATE', false)) { // $schedule->command('backup:database', ['true'])->daily()->onOneServer(); // } - // 每分钟清理过期的在线状态 - $schedule->call(function () { - app(UserOnlineService::class)->cleanExpiredOnlineStatus(); - })->everyMinute()->name('cleanup:expired-online-status')->onOneServer(); + $schedule->command('cleanup:expired-online-status')->everyMinute()->onOneServer()->withoutOverlapping(4); app(PluginManager::class)->registerPluginSchedules($schedule); diff --git a/app/Http/Controllers/V1/Server/UniProxyController.php b/app/Http/Controllers/V1/Server/UniProxyController.php index b90e8cf..01a45d2 100644 --- a/app/Http/Controllers/V1/Server/UniProxyController.php +++ b/app/Http/Controllers/V1/Server/UniProxyController.php @@ -3,6 +3,7 @@ namespace App\Http\Controllers\V1\Server; use App\Http\Controllers\Controller; +use App\Jobs\UpdateAliveDataJob; use App\Services\ServerService; use App\Services\UserService; use App\Utils\CacheKey; @@ -216,7 +217,7 @@ class UniProxyController extends Controller 'error' => 'Invalid online data' ], 400); } - $this->userOnlineService->updateAliveData($data, $node->type, $node->id); + UpdateAliveDataJob::dispatch($data, $node->type, $node->id); return response()->json(['data' => true]); } diff --git a/app/Jobs/SyncUserOnlineStatusJob.php b/app/Jobs/SyncUserOnlineStatusJob.php deleted file mode 100644 index 32d463b..0000000 --- a/app/Jobs/SyncUserOnlineStatusJob.php +++ /dev/null @@ -1,69 +0,0 @@ -updates)) { - return; - } - collect($this->updates) - ->chunk(1000) - ->each(function (Collection $chunk) { - $userIds = $chunk->pluck('id')->all(); - User::query() - ->whereIn('id', $userIds) - ->each(function (User $user) use ($chunk) { - $update = $chunk->firstWhere('id', $user->id); - if ($update) { - $user->update([ - 'online_count' => $update['count'], - 'last_online_at' => now(), - ]); - } - }); - }); - } - - /** - * 任务失败的处理 - */ - public function failed(\Throwable $exception): void - { - \Log::error('Failed to sync user online status', [ - 'error' => $exception->getMessage(), - 'updates_count' => count($this->updates) - ]); - } -} \ No newline at end of file diff --git a/app/Jobs/UpdateAliveDataJob.php b/app/Jobs/UpdateAliveDataJob.php new file mode 100644 index 0000000..b152800 --- /dev/null +++ b/app/Jobs/UpdateAliveDataJob.php @@ -0,0 +1,108 @@ +onQueue('online_sync'); + } + + public function handle(): void + { + try { + $updateAt = time(); + $nowTs = time(); + $now = now(); + $nodeKey = $this->nodeType . $this->nodeId; + $userUpdates = []; + + foreach ($this->data as $uid => $ips) { + $cacheKey = self::CACHE_PREFIX . $uid; + $ipsArray = Cache::get($cacheKey, []); + $ipsArray = [ + ...collect($ipsArray) + ->filter(fn(mixed $value): bool => is_array($value) && ($updateAt - ($value['lastupdateAt'] ?? 0) <= self::NODE_DATA_EXPIRY)), + $nodeKey => [ + 'aliveips' => $ips, + 'lastupdateAt' => $updateAt, + ], + ]; + + $count = UserOnlineService::calculateDeviceCount($ipsArray); + $ipsArray['alive_ip'] = $count; + Cache::put($cacheKey, $ipsArray, now()->addSeconds(self::CACHE_TTL)); + + $userUpdates[] = [ + 'id' => (int) $uid, + 'count' => (int) $count, + ]; + } + + if (!empty($userUpdates)) { + $allIds = collect($userUpdates) + ->pluck('id') + ->filter() + ->map(fn($v) => (int) $v) + ->unique() + ->values() + ->all(); + + if (!empty($allIds)) { + $existingIds = User::query() + ->whereIn('id', $allIds) + ->pluck('id') + ->map(fn($v) => (int) $v) + ->all(); + + if (!empty($existingIds)) { + collect($userUpdates) + ->filter(fn($row) => in_array((int) ($row['id'] ?? 0), $existingIds, true)) + ->chunk(1000) + ->each(function ($chunk) use ($now) { + collect($chunk)->each(function ($update) use ($now) { + $id = (int) ($update['id'] ?? 0); + $count = (int) ($update['count'] ?? 0); + if ($id > 0) { + User::query() + ->whereKey($id) + ->update([ + 'online_count' => $count, + 'last_online_at' => $now, + ]); + } + }); + }); + } + } + } + } catch (\Throwable $e) { + Log::error('UpdateAliveDataJob failed', [ + 'error' => $e->getMessage(), + ]); + $this->fail($e); + } + } + + +}