diff --git a/app/Console/Commands/Test.php b/app/Console/Commands/Test.php index fc0841b1..d197a4d1 100644 --- a/app/Console/Commands/Test.php +++ b/app/Console/Commands/Test.php @@ -26,6 +26,7 @@ use App\Models\User; use App\Models\UserBanLog; use App\Repositories\AgentAllowRepository; use App\Repositories\AttendanceRepository; +use App\Repositories\CleanupRepository; use App\Repositories\ExamRepository; use App\Repositories\HitAndRunRepository; use App\Repositories\MeiliSearchRepository; @@ -97,8 +98,9 @@ class Test extends Command */ public function handle() { - $arr = ['aa' => ['bb' => []]]; - dd(array_filter($arr)); + $redis = NexusDB::redis(); + $r = CleanupRepository::recordBatch($redis, 99, 100); + dd($r); } } diff --git a/app/Jobs/UpdateTorrentSeedersEtc.php b/app/Jobs/UpdateTorrentSeedersEtc.php index 1feb37b4..2fa155a4 100644 --- a/app/Jobs/UpdateTorrentSeedersEtc.php +++ b/app/Jobs/UpdateTorrentSeedersEtc.php @@ -4,6 +4,7 @@ namespace App\Jobs; use App\Models\Setting; use App\Models\User; +use App\Repositories\CleanupRepository; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldQueue; @@ -55,50 +56,7 @@ class UpdateTorrentSeedersEtc implements ShouldQueue */ public function handle() { - $beginTimestamp = time(); - $logPrefix = sprintf("[CLEANUP_CLI_UPDATE_TORRENT_SEEDERS_ETC_HANDLE_JOB], commonRequestId: %s, beginTorrentId: %s, endTorrentId: %s", $this->requestId, $this->beginTorrentId, $this->endTorrentId); -// $sql = sprintf("update torrents set seeders = (select count(*) from peers where torrent = torrents.id and seeder = 'yes'), leechers = (select count(*) from peers where torrent = torrents.id and seeder = 'no'), comments = (select count(*) from comments where torrent = torrents.id) where id > %s and id <= %s", -// $this->beginTorrentId, $this->endTorrentId -// ); -// $result = NexusDB::statement($sql); - - $torrents = NexusDB::table('torrents') - ->where('id', '>', $this->beginTorrentId) - ->where('id', '<=', $this->endTorrentId) - ->get(['id']) - ; - foreach ($torrents as $torrent) { - $peerResult = NexusDB::table('peers') - ->where('torrent', $torrent->id) - ->selectRaw("count(*) as count, seeder") - ->groupBy('seeder') - ->get() - ; - $commentResult = NexusDB::table('comments') - ->where('torrent', $torrent->id) - ->selectRaw("count(*) as count") - ->first() - ; - $update = [ - 'comments' => $commentResult && $commentResult->count !== null ? $commentResult->count : 0, - 'seeders' => 0, - 'leechers' => 0, - ]; - foreach ($peerResult as $item) { - if ($item->seeder == 'yes') { - $update['seeders'] = $item->count; - } elseif ($item->seeder == 'no') { - $update['leechers'] = $item->count; - } - } - NexusDB::table('torrents')->where('id', $torrent->id)->update($update); - do_log("[CLEANUP_CLI_UPDATE_TORRENT_SEEDERS_ETC_HANDLE_TORRENT], [SUCCESS]: $torrent->id => " . json_encode($update)); - } - $costTime = time() - $beginTimestamp; - do_log(sprintf( - "$logPrefix, [DONE], update torrent count: %s, cost time: %s seconds", - count($torrents), $costTime - )); + CleanupRepository::runBatchJob(CleanupRepository::TORRENT_SEEDERS_ETC_BATCH_KEY, $this->requestId); } /** diff --git a/app/Jobs/UpdateUserSeedingLeechingTime.php b/app/Jobs/UpdateUserSeedingLeechingTime.php index 95dbc307..9700d2ff 100644 --- a/app/Jobs/UpdateUserSeedingLeechingTime.php +++ b/app/Jobs/UpdateUserSeedingLeechingTime.php @@ -3,6 +3,7 @@ namespace App\Jobs; use App\Models\Setting; +use App\Repositories\CleanupRepository; use Carbon\Carbon; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldBeUnique; @@ -55,42 +56,7 @@ class UpdateUserSeedingLeechingTime implements ShouldQueue */ public function handle() { - $beginTimestamp = time(); - $logPrefix = sprintf("[CLEANUP_CLI_UPDATE_SEEDING_LEECHING_TIME_HANDLE_JOB], commonRequestId: %s, beginUid: %s, endUid: %s", $this->requestId, $this->beginUid, $this->endUid); -// $sql = sprintf( -// "update users set seedtime = (select sum(seedtime) from snatched where userid = users.id), leechtime=(select sum(leechtime) from snatched where userid = users.id), seed_time_updated_at = '%s' where id > %s and id <= %s and status = 'confirmed' and enabled = 'yes'", -// now()->toDateTimeString(), $this->beginUid, $this->endUid -// ); -// $results = NexusDB::statement($sql); - - $users = NexusDB::table('users') - ->where('id', '>', $this->beginUid) - ->where('id', '<=', $this->endUid) - ->where('status', 'confirmed') - ->where('enabled', 'yes') - ->get(['id']) - ; - $count = 0; - foreach ($users as $user) { - $sumInfo = NexusDB::table('snatched') - ->selectRaw('sum(seedtime) as seedtime_sum, sum(leechtime) as leechtime_sum') - ->where('userid', $user->id) - ->first(); - if ($sumInfo && $sumInfo->seedtime_sum !== null) { - $update = [ - 'seedtime' => $sumInfo->seedtime_sum ?? 0, - 'leechtime' => $sumInfo->leechtime_sum ?? 0, - 'seed_time_updated_at' => Carbon::now()->toDateTimeString(), - ]; - NexusDB::table('users') - ->where('id', $user->id) - ->update($update); - do_log("[CLEANUP_CLI_UPDATE_SEEDING_LEECHING_TIME_HANDLE_USER], [SUCCESS]: $user->id => " . json_encode($update)); - $count++; - } - } - $costTime = time() - $beginTimestamp; - do_log("$logPrefix, [DONE], user total count: " . count($users) . ", success update count: $count, cost time: $costTime seconds"); + CleanupRepository::runBatchJob(CleanupRepository::USER_SEEDING_LEECHING_TIME_BATCH_KEY, $this->requestId); } /** diff --git a/app/Repositories/CleanupRepository.php b/app/Repositories/CleanupRepository.php index f7798f8a..aacd4142 100644 --- a/app/Repositories/CleanupRepository.php +++ b/app/Repositories/CleanupRepository.php @@ -6,45 +6,53 @@ use Nexus\Database\NexusDB; class CleanupRepository extends BaseRepository { - const USER_SEED_BONUS_BATCH_LIST_KEY = "batch_key:user_seed_bonus"; - const USER_SEEDING_LEECHING_TIME_BATCH_LIST_KEY = "batch_key:user_seeding_leeching_time"; - const TORRENT_SEEDERS_ETC_BATCH_LIST_KEY = "batch_key:torrent_seeders_etc"; + const USER_SEED_BONUS_BATCH_KEY = "batch_key:user_seed_bonus"; + const USER_SEEDING_LEECHING_TIME_BATCH_KEY = "batch_key:user_seeding_leeching_time"; + const TORRENT_SEEDERS_ETC_BATCH_KEY = "batch_key:torrent_seeders_etc"; public static function recordBatch(\Redis $redis, $uid, $torrentId) { - self::doRecordBatch($redis, self::USER_SEED_BONUS_BATCH_LIST_KEY, $uid); - self::doRecordBatch($redis, self::USER_SEEDING_LEECHING_TIME_BATCH_LIST_KEY, $uid); - self::doRecordBatch($redis, self::TORRENT_SEEDERS_ETC_BATCH_LIST_KEY, $torrentId); - } - - private static function doRecordBatch(\Redis $redis, $batchListKey, $hashKey) - { - $batchKey = $redis->rPop($batchListKey); - if ($batchKey === false) { - //not exists - $batchKey = date('YmdHis'); - $redis->lPush($batchListKey, $batchKey); + $args = [ + self::USER_SEEDING_LEECHING_TIME_BATCH_KEY, self::TORRENT_SEEDERS_ETC_BATCH_KEY, + $uid, $torrentId, self::getHashKeySuffix() + ]; + $result = $redis->eval(self::getAddRecordLuaScript(), $args, 2); + $err = $redis->getLastError(); + if ($err) { + do_log("[REDIS_LUA_ERROR]: $err", "error"); } - $redis->hSetNx($batchKey, $hashKey, date('YmdHis')); + return $result; } - - public static function updateUserLeechingSeedingTime($requestId) + public static function runBatchJob($batchKey, $requestId) { - global $Cache; - $redis = $Cache->getRedis(); - - $logPrefix = sprintf("[CLEANUP_CLI_UPDATE_USER_SEEDING_LEECHING_TIME], commonRequestId: %s", $requestId); + $redis = NexusDB::redis(); + $logPrefix = sprintf("[$batchKey], commonRequestId: %s", $requestId); $beginTimestamp = time(); - $count = 0; - $size = 1000; - $batchListKey = self::USER_SEEDING_LEECHING_TIME_BATCH_LIST_KEY; - $batch = $redis->lPop($batchListKey); - if ($batch === false) { - do_log("$logPrefix, batchListKey: $batchListKey, no batch..."); + $batch = self::getBatch($redis, $batchKey); + if (!$batch) { + do_log("$logPrefix, batchKey: $batchKey no batch...", 'error'); return; } + //update the batch key + $redis->set($batchKey, $batchKey . self::getHashKeySuffix()); + $count = match ($batchKey) { + self::USER_SEEDING_LEECHING_TIME_BATCH_KEY => self::updateUserLeechingSeedingTime($redis, $batch, $logPrefix), + self::TORRENT_SEEDERS_ETC_BATCH_KEY => self::updateTorrentSeedersEtc($redis, $batch, $logPrefix), + default => throw new \InvalidArgumentException("Invalid batchKey: $batchKey") + }; + //remove this batch + $redis->del($batch); + $endTimestamp = time(); + do_log(sprintf("$logPrefix, [DONE], batch: $batch, count: $count, cost time: %d seconds", $endTimestamp - $beginTimestamp)); + } + + + private static function updateUserLeechingSeedingTime(\Redis $redis, $batch, $logPrefix): int + { + $count = 0; + $size = 1000; $it = NULL; /* Don't ever return an empty array until we're done iterating */ $redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY); @@ -68,28 +76,15 @@ class CleanupRepository extends BaseRepository $count++; } } - sleep(rand(10, 60)); + sleep(rand(1, 10)); } - $costTime = time() - $beginTimestamp; - do_log("$logPrefix, [DONE] success update count: $count, cost time: $costTime seconds"); + return $count; } - public static function updateTorrentSeedersEtc($requestId) + private static function updateTorrentSeedersEtc(\Redis $redis, $batch, $logPrefix) { - global $Cache; - $redis = $Cache->getRedis(); - - $logPrefix = sprintf("[CLEANUP_CLI_UPDATE_TORRENT_SEEDERS_ETC], commonRequestId: %s", $requestId); - $beginTimestamp = time(); - $count = 0; $size = 1000; - $batchListKey = self::TORRENT_SEEDERS_ETC_BATCH_LIST_KEY; - $batch = $redis->lPop($batchListKey); - if ($batch === false) { - do_log("$logPrefix, batchListKey: $batchListKey, no batch..."); - return; - } $it = NULL; /* Don't ever return an empty array until we're done iterating */ $redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY); @@ -123,9 +118,59 @@ class CleanupRepository extends BaseRepository do_log("$logPrefix, [SUCCESS]: $torrentId => " . json_encode($update)); $count++; } - sleep(rand(10, 60)); + sleep(rand(1, 10)); } - $costTime = time() - $beginTimestamp; - do_log("$logPrefix, [DONE] success update count: $count, cost time: $costTime seconds"); + return $count; } + + private static function getBatch(\Redis $redis, $batchKey) + { + $batch = $redis->get($batchKey); + if ($batch === false) { + do_log("batchKey: $batchKey, no batch...", 'error'); + return false; + } + if (!$redis->exists($batch)) { + do_log("batch: $batch, not exists...", 'error'); + return false; + } + return $batch; + } + + /** + * USER_SEEDING_LEECHING_TIME, TORRENT_SEEDERS_ETC, uid, torrentId, timeStr + * + * @return string + */ + private static function getAddRecordLuaScript(): string + { + return <<<'LUA' +local batchList = {KEYS[1], KEYS[2]} +for k, v in pairs(batchList) do + local batchKey = redis.call("GET", v) + local isBatchKeyNew = false + if batchKey == false then + batchKey = v .. ARGV[3] + redis.call("SET", v, batchKey, "EX", 2592000) + isBatchKeyNew = true + end + local hashKey + if k == 1 then + hashKey = ARGV[1] + else + hashKey = ARGV[2] + end + redis.call("HSETNX", batchKey, hashKey, ARGV[3]) + if isBatchKeyNew then + redis.call("EXPIRE", batchKey, 2592000) + end +end +LUA; + } + + private static function getHashKeySuffix(): string + { + return ":" . date('Ymd_His'); + } + } diff --git a/include/cleanup.php b/include/cleanup.php index 335b98fd..40f7a7af 100644 --- a/include/cleanup.php +++ b/include/cleanup.php @@ -408,25 +408,13 @@ function docleanup($forceAll = 0, $printProgress = false) { // sql_query("UPDATE torrents SET " . implode(",", $update) . " WHERE id = $id") or sqlerr(__FILE__, __LINE__); // } - $delayBase = $baseDuration; - $maxTorrentIdRes = mysql_fetch_assoc(sql_query("select max(id) as max_torrent_id from torrents limit 1")); - $maxTorrentId = $maxTorrentIdRes['max_torrent_id']; - $chunk = 1000; - $beginTorrentId = 0; - $chunkCounts = ceil($maxTorrentId / $chunk); - $delay = ceil($baseDuration/$chunkCounts); - $i = 0; - do_log("maxTorrentId: $maxTorrentId, chunk: $chunk, chunkCounts: $chunkCounts, delayBase: $delayBase, delay: $delay"); - do { - $command = sprintf( - 'cleanup --action=seeders_etc --begin_id=%s --end_id=%s --request_id=%s --delay=%s', - $beginTorrentId, $beginTorrentId + $chunk, $requestId, $delayBase + $i * $delay - ); - $output = executeCommand($command, 'string', true); - do_log(sprintf('command: %s, output: %s', $command, $output)); - $beginTorrentId += $chunk; - $i++; - } while ($beginTorrentId < $maxTorrentId); + $command = sprintf( + 'cleanup --action=seeders_etc --begin_id=%s --end_id=%s --request_id=%s --delay=%s', + 0, 0, $requestId, 0 + ); + $output = executeCommand($command, 'string', true); + do_log(sprintf('command: %s, output: %s', $command, $output)); + $log = "update count of seeders, leechers, comments for torrents"; do_log($log); if ($printProgress) { @@ -903,23 +891,12 @@ function docleanup($forceAll = 0, $printProgress = false) { // sql_query("UPDATE users SET seedtime = " . intval($arr2['st']) . ", leechtime = " . intval($arr2['lt']) . " WHERE id = " . $arr['id']) or sqlerr(__FILE__, __LINE__); // } - $chunk = 1000; - $beginUid = 0; - $delayBase = $baseDuration * 2; - $chunkCounts = ceil($maxUid / $chunk); - $delay = ceil($baseDuration/$chunkCounts); - $i = 0; - do_log("maxUid: $maxUid, chunk: $chunk, chunkCounts: $chunkCounts, delayBase: $delayBase, delay: $delay"); - do { - $command = sprintf( - 'cleanup --action=seeding_leeching_time --begin_id=%s --end_id=%s --request_id=%s --delay=%s', - $beginUid, $beginUid + $chunk, $requestId, $delayBase + $delay * $i - ); - $output = executeCommand($command, 'string', true); - do_log(sprintf('command: %s, output: %s', $command, $output)); - $beginUid += $chunk; - $i++; - } while ($beginUid < $maxUid); + $command = sprintf( + 'cleanup --action=seeding_leeching_time --begin_id=%s --end_id=%s --request_id=%s --delay=%s', + 0, 0, $requestId, 0 + ); + $output = executeCommand($command, 'string', true); + do_log(sprintf('command: %s, output: %s', $command, $output)); $log = "update total seeding and leeching time of users"; do_log($log); diff --git a/include/constants.php b/include/constants.php index 85dfe66c..aaee0e16 100644 --- a/include/constants.php +++ b/include/constants.php @@ -1,6 +1,6 @@ set($lockKey, TIMENOW, ['nx', 'ex' => $autoclean_interval_one])) { + \App\Repositories\CleanupRepository::recordBatch($redis, $userid, $torrentid); +} do_action('announced', $torrent, $az, $_REQUEST); benc_resp($rep_dict); ?>