mirror of
https://github.com/lkddi/Xboard.git
synced 2026-04-28 06:47:24 +08:00
refactor(online-status): consolidate updates and add cleanup command
This commit is contained in:
@@ -0,0 +1,52 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Console\Commands;
|
||||||
|
|
||||||
|
use Illuminate\Console\Command;
|
||||||
|
use App\Models\User;
|
||||||
|
use Illuminate\Support\Facades\Log;
|
||||||
|
|
||||||
|
class CleanupExpiredOnlineStatus extends Command
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* The name and signature of the console command.
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $signature = 'cleanup:expired-online-status';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The console command description.
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $description = 'Reset online_count to 0 for users stale for 5+ minutes';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the console command.
|
||||||
|
*/
|
||||||
|
public function handle()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$affected = 0;
|
||||||
|
User::query()
|
||||||
|
->where('online_count', '>', 0)
|
||||||
|
->where('last_online_at', '<', now()->subMinutes(5))
|
||||||
|
->chunkById(1000, function ($users) use (&$affected) {
|
||||||
|
if ($users->isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$count = User::whereIn('id', $users->pluck('id'))
|
||||||
|
->update(['online_count' => 0]);
|
||||||
|
$affected += $count;
|
||||||
|
}, 'id');
|
||||||
|
|
||||||
|
$this->info("Expired online status cleaned. Affected: {$affected}");
|
||||||
|
return self::SUCCESS;
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
Log::error('CleanupExpiredOnlineStatus failed', ['error' => $e->getMessage()]);
|
||||||
|
$this->error('Cleanup failed: ' . $e->getMessage());
|
||||||
|
return self::FAILURE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -46,10 +46,7 @@ class Kernel extends ConsoleKernel
|
|||||||
// if (env('ENABLE_AUTO_BACKUP_AND_UPDATE', false)) {
|
// if (env('ENABLE_AUTO_BACKUP_AND_UPDATE', false)) {
|
||||||
// $schedule->command('backup:database', ['true'])->daily()->onOneServer();
|
// $schedule->command('backup:database', ['true'])->daily()->onOneServer();
|
||||||
// }
|
// }
|
||||||
// 每分钟清理过期的在线状态
|
$schedule->command('cleanup:expired-online-status')->everyMinute()->onOneServer()->withoutOverlapping(4);
|
||||||
$schedule->call(function () {
|
|
||||||
app(UserOnlineService::class)->cleanExpiredOnlineStatus();
|
|
||||||
})->everyMinute()->name('cleanup:expired-online-status')->onOneServer();
|
|
||||||
|
|
||||||
app(PluginManager::class)->registerPluginSchedules($schedule);
|
app(PluginManager::class)->registerPluginSchedules($schedule);
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
namespace App\Http\Controllers\V1\Server;
|
namespace App\Http\Controllers\V1\Server;
|
||||||
|
|
||||||
use App\Http\Controllers\Controller;
|
use App\Http\Controllers\Controller;
|
||||||
|
use App\Jobs\UpdateAliveDataJob;
|
||||||
use App\Services\ServerService;
|
use App\Services\ServerService;
|
||||||
use App\Services\UserService;
|
use App\Services\UserService;
|
||||||
use App\Utils\CacheKey;
|
use App\Utils\CacheKey;
|
||||||
@@ -216,7 +217,7 @@ class UniProxyController extends Controller
|
|||||||
'error' => 'Invalid online data'
|
'error' => 'Invalid online data'
|
||||||
], 400);
|
], 400);
|
||||||
}
|
}
|
||||||
$this->userOnlineService->updateAliveData($data, $node->type, $node->id);
|
UpdateAliveDataJob::dispatch($data, $node->type, $node->id);
|
||||||
return response()->json(['data' => true]);
|
return response()->json(['data' => true]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,69 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
|
|
||||||
namespace App\Jobs;
|
|
||||||
|
|
||||||
use App\Models\User;
|
|
||||||
use Illuminate\Bus\Queueable;
|
|
||||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
||||||
use Illuminate\Foundation\Bus\Dispatchable;
|
|
||||||
use Illuminate\Queue\InteractsWithQueue;
|
|
||||||
use Illuminate\Queue\SerializesModels;
|
|
||||||
use Illuminate\Support\Collection;
|
|
||||||
|
|
||||||
class SyncUserOnlineStatusJob implements ShouldQueue
|
|
||||||
{
|
|
||||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 任务最大尝试次数
|
|
||||||
*/
|
|
||||||
public int $tries = 3;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 任务可以运行的最大秒数
|
|
||||||
*/
|
|
||||||
public int $timeout = 30;
|
|
||||||
|
|
||||||
public function __construct(
|
|
||||||
private readonly array $updates
|
|
||||||
) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行任务
|
|
||||||
*/
|
|
||||||
public function handle(): void
|
|
||||||
{
|
|
||||||
if (empty($this->updates)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
collect($this->updates)
|
|
||||||
->chunk(1000)
|
|
||||||
->each(function (Collection $chunk) {
|
|
||||||
$userIds = $chunk->pluck('id')->all();
|
|
||||||
User::query()
|
|
||||||
->whereIn('id', $userIds)
|
|
||||||
->each(function (User $user) use ($chunk) {
|
|
||||||
$update = $chunk->firstWhere('id', $user->id);
|
|
||||||
if ($update) {
|
|
||||||
$user->update([
|
|
||||||
'online_count' => $update['count'],
|
|
||||||
'last_online_at' => now(),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 任务失败的处理
|
|
||||||
*/
|
|
||||||
public function failed(\Throwable $exception): void
|
|
||||||
{
|
|
||||||
\Log::error('Failed to sync user online status', [
|
|
||||||
'error' => $exception->getMessage(),
|
|
||||||
'updates_count' => count($this->updates)
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,108 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Jobs;
|
||||||
|
|
||||||
|
use App\Models\User;
|
||||||
|
use Illuminate\Bus\Queueable;
|
||||||
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||||
|
use Illuminate\Foundation\Bus\Dispatchable;
|
||||||
|
use Illuminate\Queue\InteractsWithQueue;
|
||||||
|
use Illuminate\Queue\SerializesModels;
|
||||||
|
use Illuminate\Support\Facades\Cache;
|
||||||
|
use App\Services\UserOnlineService;
|
||||||
|
use Illuminate\Support\Facades\Log;
|
||||||
|
|
||||||
|
class UpdateAliveDataJob implements ShouldQueue
|
||||||
|
{
|
||||||
|
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||||
|
|
||||||
|
private const CACHE_PREFIX = 'ALIVE_IP_USER_';
|
||||||
|
private const CACHE_TTL = 120;
|
||||||
|
private const NODE_DATA_EXPIRY = 100;
|
||||||
|
|
||||||
|
public function __construct(
|
||||||
|
private readonly array $data,
|
||||||
|
private readonly string $nodeType,
|
||||||
|
private readonly int $nodeId
|
||||||
|
) {
|
||||||
|
$this->onQueue('online_sync');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handle(): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$updateAt = time();
|
||||||
|
$nowTs = time();
|
||||||
|
$now = now();
|
||||||
|
$nodeKey = $this->nodeType . $this->nodeId;
|
||||||
|
$userUpdates = [];
|
||||||
|
|
||||||
|
foreach ($this->data as $uid => $ips) {
|
||||||
|
$cacheKey = self::CACHE_PREFIX . $uid;
|
||||||
|
$ipsArray = Cache::get($cacheKey, []);
|
||||||
|
$ipsArray = [
|
||||||
|
...collect($ipsArray)
|
||||||
|
->filter(fn(mixed $value): bool => is_array($value) && ($updateAt - ($value['lastupdateAt'] ?? 0) <= self::NODE_DATA_EXPIRY)),
|
||||||
|
$nodeKey => [
|
||||||
|
'aliveips' => $ips,
|
||||||
|
'lastupdateAt' => $updateAt,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$count = UserOnlineService::calculateDeviceCount($ipsArray);
|
||||||
|
$ipsArray['alive_ip'] = $count;
|
||||||
|
Cache::put($cacheKey, $ipsArray, now()->addSeconds(self::CACHE_TTL));
|
||||||
|
|
||||||
|
$userUpdates[] = [
|
||||||
|
'id' => (int) $uid,
|
||||||
|
'count' => (int) $count,
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($userUpdates)) {
|
||||||
|
$allIds = collect($userUpdates)
|
||||||
|
->pluck('id')
|
||||||
|
->filter()
|
||||||
|
->map(fn($v) => (int) $v)
|
||||||
|
->unique()
|
||||||
|
->values()
|
||||||
|
->all();
|
||||||
|
|
||||||
|
if (!empty($allIds)) {
|
||||||
|
$existingIds = User::query()
|
||||||
|
->whereIn('id', $allIds)
|
||||||
|
->pluck('id')
|
||||||
|
->map(fn($v) => (int) $v)
|
||||||
|
->all();
|
||||||
|
|
||||||
|
if (!empty($existingIds)) {
|
||||||
|
collect($userUpdates)
|
||||||
|
->filter(fn($row) => in_array((int) ($row['id'] ?? 0), $existingIds, true))
|
||||||
|
->chunk(1000)
|
||||||
|
->each(function ($chunk) use ($now) {
|
||||||
|
collect($chunk)->each(function ($update) use ($now) {
|
||||||
|
$id = (int) ($update['id'] ?? 0);
|
||||||
|
$count = (int) ($update['count'] ?? 0);
|
||||||
|
if ($id > 0) {
|
||||||
|
User::query()
|
||||||
|
->whereKey($id)
|
||||||
|
->update([
|
||||||
|
'online_count' => $count,
|
||||||
|
'last_online_at' => $now,
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
Log::error('UpdateAliveDataJob failed', [
|
||||||
|
'error' => $e->getMessage(),
|
||||||
|
]);
|
||||||
|
$this->fail($e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user