From 2ac126dd42e6400fe8f5f0c644348b05a4eb4497 Mon Sep 17 00:00:00 2001 From: xboard Date: Mon, 15 Sep 2025 09:56:36 +0800 Subject: [PATCH] refactor(Jobs): Optimize traffic statistics jobs with upsert --- app/Jobs/StatServerJob.php | 89 +++++++++++++++++++++++++++----------- app/Jobs/StatUserJob.php | 88 ++++++++++++++++++++++++++----------- 2 files changed, 126 insertions(+), 51 deletions(-) diff --git a/app/Jobs/StatServerJob.php b/app/Jobs/StatServerJob.php index ba567b0..9c822cc 100644 --- a/app/Jobs/StatServerJob.php +++ b/app/Jobs/StatServerJob.php @@ -45,16 +45,12 @@ class StatServerJob implements ShouldQueue $this->recordType = $recordType; } - /** - * Execute the job. - */ public function handle(): void { $recordAt = $this->recordType === 'm' ? strtotime(date('Y-m-01')) : strtotime(date('Y-m-d')); - // Aggregate traffic data $u = $d = 0; foreach ($this->data as $traffic) { $u += $traffic[0]; @@ -62,31 +58,72 @@ class StatServerJob implements ShouldQueue } try { - DB::transaction(function () use ($u, $d, $recordAt) { - $affected = StatServer::where([ - 'record_at' => $recordAt, - 'server_id' => $this->server['id'], - 'server_type' => $this->protocol, - 'record_type' => $this->recordType, - ])->update([ - 'u' => DB::raw('u + ' . $u), - 'd' => DB::raw('d + ' . $d), - ]); - - if (!$affected) { - StatServer::create([ - 'record_at' => $recordAt, - 'server_id' => $this->server['id'], - 'server_type' => $this->protocol, - 'record_type' => $this->recordType, - 'u' => $u, - 'd' => $d, - ]); - } - }, 3); + $this->processServerStat($u, $d, $recordAt); } catch (\Exception $e) { Log::error('StatServerJob failed for server ' . $this->server['id'] . ': ' . $e->getMessage()); throw $e; } } + + protected function processServerStat(int $u, int $d, int $recordAt): void + { + if (config('database.default') === 'sqlite') { + $this->processServerStatForSqlite($u, $d, $recordAt); + } else { + $this->processServerStatForOtherDatabases($u, $d, $recordAt); + } + } + + protected function processServerStatForSqlite(int $u, int $d, int $recordAt): void + { + DB::transaction(function () use ($u, $d, $recordAt) { + $existingRecord = StatServer::where([ + 'record_at' => $recordAt, + 'server_id' => $this->server['id'], + 'server_type' => $this->protocol, + 'record_type' => $this->recordType, + ])->first(); + + if ($existingRecord) { + $existingRecord->update([ + 'u' => $existingRecord->u + $u, + 'd' => $existingRecord->d + $d, + 'updated_at' => time(), + ]); + } else { + StatServer::create([ + 'record_at' => $recordAt, + 'server_id' => $this->server['id'], + 'server_type' => $this->protocol, + 'record_type' => $this->recordType, + 'u' => $u, + 'd' => $d, + 'created_at' => time(), + 'updated_at' => time(), + ]); + } + }, 3); + } + + protected function processServerStatForOtherDatabases(int $u, int $d, int $recordAt): void + { + StatServer::upsert( + [ + 'record_at' => $recordAt, + 'server_id' => $this->server['id'], + 'server_type' => $this->protocol, + 'record_type' => $this->recordType, + 'u' => $u, + 'd' => $d, + 'created_at' => time(), + 'updated_at' => time(), + ], + ['server_id', 'server_type', 'record_at', 'record_type'], + [ + 'u' => DB::raw("u + VALUES(u)"), + 'd' => DB::raw("d + VALUES(d)"), + 'updated_at' => time(), + ] + ); + } } diff --git a/app/Jobs/StatUserJob.php b/app/Jobs/StatUserJob.php index c8927f2..db00882 100644 --- a/app/Jobs/StatUserJob.php +++ b/app/Jobs/StatUserJob.php @@ -45,9 +45,6 @@ class StatUserJob implements ShouldQueue $this->recordType = $recordType; } - /** - * Execute the job. - */ public function handle(): void { $recordAt = $this->recordType === 'm' @@ -56,32 +53,73 @@ class StatUserJob implements ShouldQueue foreach ($this->data as $uid => $v) { try { - DB::transaction(function () use ($uid, $v, $recordAt) { - $affected = StatUser::where([ - 'user_id' => $uid, - 'server_rate' => $this->server['rate'], - 'record_at' => $recordAt, - 'record_type' => $this->recordType, - ])->update([ - 'u' => DB::raw('u + ' . ($v[0] * $this->server['rate'])), - 'd' => DB::raw('d + ' . ($v[1] * $this->server['rate'])), - ]); - - if (!$affected) { - StatUser::create([ - 'user_id' => $uid, - 'server_rate' => $this->server['rate'], - 'record_at' => $recordAt, - 'record_type' => $this->recordType, - 'u' => ($v[0] * $this->server['rate']), - 'd' => ($v[1] * $this->server['rate']), - ]); - } - }, 3); + $this->processUserStat($uid, $v, $recordAt); } catch (\Exception $e) { Log::error('StatUserJob failed for user ' . $uid . ': ' . $e->getMessage()); throw $e; } } } + + protected function processUserStat(int $uid, array $v, int $recordAt): void + { + if (config('database.default') === 'sqlite') { + $this->processUserStatForSqlite($uid, $v, $recordAt); + } else { + $this->processUserStatForOtherDatabases($uid, $v, $recordAt); + } + } + + protected function processUserStatForSqlite(int $uid, array $v, int $recordAt): void + { + DB::transaction(function () use ($uid, $v, $recordAt) { + $existingRecord = StatUser::where([ + 'user_id' => $uid, + 'server_rate' => $this->server['rate'], + 'record_at' => $recordAt, + 'record_type' => $this->recordType, + ])->first(); + + if ($existingRecord) { + $existingRecord->update([ + 'u' => $existingRecord->u + ($v[0] * $this->server['rate']), + 'd' => $existingRecord->d + ($v[1] * $this->server['rate']), + 'updated_at' => time(), + ]); + } else { + StatUser::create([ + 'user_id' => $uid, + 'server_rate' => $this->server['rate'], + 'record_at' => $recordAt, + 'record_type' => $this->recordType, + 'u' => ($v[0] * $this->server['rate']), + 'd' => ($v[1] * $this->server['rate']), + 'created_at' => time(), + 'updated_at' => time(), + ]); + } + }, 3); + } + + protected function processUserStatForOtherDatabases(int $uid, array $v, int $recordAt): void + { + StatUser::upsert( + [ + 'user_id' => $uid, + 'server_rate' => $this->server['rate'], + 'record_at' => $recordAt, + 'record_type' => $this->recordType, + 'u' => ($v[0] * $this->server['rate']), + 'd' => ($v[1] * $this->server['rate']), + 'created_at' => time(), + 'updated_at' => time(), + ], + ['user_id', 'server_rate', 'record_at', 'record_type'], + [ + 'u' => DB::raw("u + VALUES(u)"), + 'd' => DB::raw("d + VALUES(d)"), + 'updated_at' => time(), + ] + ); + } } \ No newline at end of file