mirror of
https://github.com/lkddi/nexusphp.git
synced 2026-04-24 03:57:22 +08:00
Refactoring to clean up asynchronous tasks
This commit is contained in:
@@ -59,7 +59,42 @@ class UpdateTorrentSeedersEtc implements ShouldQueue
|
|||||||
*/
|
*/
|
||||||
public function handle()
|
public function handle()
|
||||||
{
|
{
|
||||||
CleanupRepository::runBatchJob(CleanupRepository::TORRENT_SEEDERS_ETC_BATCH_KEY, $this->requestId);
|
$beginTimestamp = time();
|
||||||
|
$logPrefix = sprintf("[CLEANUP_CLI_UPDATE_TORRENT_SEEDERS_ETC_HANDLE_JOB], commonRequestId: %s, beginTorrentId: %s, endTorrentId: %s", $this->requestId, $this->beginTorrentId, $this->endTorrentId);
|
||||||
|
|
||||||
|
$torrentIdArr = explode(",", $this->idStr);
|
||||||
|
foreach ($torrentIdArr as $torrentId) {
|
||||||
|
$peerResult = NexusDB::table('peers')
|
||||||
|
->where('torrent', $torrentId)
|
||||||
|
->selectRaw("count(*) as count, seeder")
|
||||||
|
->groupBy('seeder')
|
||||||
|
->get()
|
||||||
|
;
|
||||||
|
$commentResult = NexusDB::table('comments')
|
||||||
|
->where('torrent',$torrentId)
|
||||||
|
->selectRaw("count(*) as count")
|
||||||
|
->first()
|
||||||
|
;
|
||||||
|
$update = [
|
||||||
|
'comments' => $commentResult && $commentResult->count !== null ? $commentResult->count : 0,
|
||||||
|
'seeders' => 0,
|
||||||
|
'leechers' => 0,
|
||||||
|
];
|
||||||
|
foreach ($peerResult as $item) {
|
||||||
|
if ($item->seeder == 'yes') {
|
||||||
|
$update['seeders'] = $item->count;
|
||||||
|
} elseif ($item->seeder == 'no') {
|
||||||
|
$update['leechers'] = $item->count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NexusDB::table('torrents')->where('id', $torrentId)->update($update);
|
||||||
|
do_log("[CLEANUP_CLI_UPDATE_TORRENT_SEEDERS_ETC_HANDLE_TORRENT], [SUCCESS]: $torrentId => " . json_encode($update));
|
||||||
|
}
|
||||||
|
$costTime = time() - $beginTimestamp;
|
||||||
|
do_log(sprintf(
|
||||||
|
"$logPrefix, [DONE], update torrent count: %s, cost time: %s seconds",
|
||||||
|
count($torrentIdArr), $costTime
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -59,7 +59,31 @@ class UpdateUserSeedingLeechingTime implements ShouldQueue
|
|||||||
*/
|
*/
|
||||||
public function handle()
|
public function handle()
|
||||||
{
|
{
|
||||||
CleanupRepository::runBatchJob(CleanupRepository::USER_SEEDING_LEECHING_TIME_BATCH_KEY, $this->requestId);
|
$beginTimestamp = time();
|
||||||
|
$logPrefix = sprintf("[CLEANUP_CLI_UPDATE_SEEDING_LEECHING_TIME_HANDLE_JOB], commonRequestId: %s, beginUid: %s, endUid: %s", $this->requestId, $this->beginUid, $this->endUid);
|
||||||
|
|
||||||
|
$count = 0;
|
||||||
|
$uidArr = explode(",", $this->idStr);
|
||||||
|
foreach ($uidArr as $uid) {
|
||||||
|
$sumInfo = NexusDB::table('snatched')
|
||||||
|
->selectRaw('sum(seedtime) as seedtime_sum, sum(leechtime) as leechtime_sum')
|
||||||
|
->where('userid', $uid)
|
||||||
|
->first();
|
||||||
|
if ($sumInfo && $sumInfo->seedtime_sum !== null) {
|
||||||
|
$update = [
|
||||||
|
'seedtime' => $sumInfo->seedtime_sum ?? 0,
|
||||||
|
'leechtime' => $sumInfo->leechtime_sum ?? 0,
|
||||||
|
'seed_time_updated_at' => Carbon::now()->toDateTimeString(),
|
||||||
|
];
|
||||||
|
NexusDB::table('users')
|
||||||
|
->where('id', $uid)
|
||||||
|
->update($update);
|
||||||
|
do_log("[CLEANUP_CLI_UPDATE_SEEDING_LEECHING_TIME_HANDLE_USER], [SUCCESS]: $uid => " . json_encode($update));
|
||||||
|
$count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$costTime = time() - $beginTimestamp;
|
||||||
|
do_log("$logPrefix, [DONE], user total count: " . count($uidArr) . ", success update count: $count, cost time: $costTime seconds");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -12,6 +12,21 @@ class CleanupRepository extends BaseRepository
|
|||||||
const USER_SEEDING_LEECHING_TIME_BATCH_KEY = "batch_key:user_seeding_leeching_time";
|
const USER_SEEDING_LEECHING_TIME_BATCH_KEY = "batch_key:user_seeding_leeching_time";
|
||||||
const TORRENT_SEEDERS_ETC_BATCH_KEY = "batch_key:torrent_seeders_etc";
|
const TORRENT_SEEDERS_ETC_BATCH_KEY = "batch_key:torrent_seeders_etc";
|
||||||
|
|
||||||
|
private static array $batchKeyActionsMap = [
|
||||||
|
self::USER_SEED_BONUS_BATCH_KEY => [
|
||||||
|
'action' => 'seed_bonus',
|
||||||
|
'task_index' => 0,
|
||||||
|
],
|
||||||
|
self::TORRENT_SEEDERS_ETC_BATCH_KEY => [
|
||||||
|
'action' => 'seeders_etc',
|
||||||
|
'task_index' => 1,
|
||||||
|
],
|
||||||
|
self::USER_SEEDING_LEECHING_TIME_BATCH_KEY => [
|
||||||
|
'action' => 'seeding_leeching_time',
|
||||||
|
'task_index' => 2,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
private static int $totalTask = 3;
|
private static int $totalTask = 3;
|
||||||
|
|
||||||
private static int $oneTaskSeconds = 0;
|
private static int $oneTaskSeconds = 0;
|
||||||
@@ -37,11 +52,26 @@ class CleanupRepository extends BaseRepository
|
|||||||
self::runBatchJob(self::USER_SEED_BONUS_BATCH_KEY, $requestId);
|
self::runBatchJob(self::USER_SEED_BONUS_BATCH_KEY, $requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function runBatchJob($batchKey, $requestId)
|
public static function runBatchJobUpdateUserSeedingLeechingTime(string $requestId)
|
||||||
|
{
|
||||||
|
self::runBatchJob(self::USER_SEEDING_LEECHING_TIME_BATCH_KEY, $requestId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function runBatchJobUpdateTorrentSeedersEtc(string $requestId)
|
||||||
|
{
|
||||||
|
self::runBatchJob(self::TORRENT_SEEDERS_ETC_BATCH_KEY, $requestId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static function runBatchJob($batchKey, $requestId)
|
||||||
{
|
{
|
||||||
$redis = NexusDB::redis();
|
$redis = NexusDB::redis();
|
||||||
$logPrefix = sprintf("[$batchKey], commonRequestId: %s", $requestId);
|
$logPrefix = sprintf("[$batchKey], commonRequestId: %s", $requestId);
|
||||||
$beginTimestamp = time();
|
$beginTimestamp = time();
|
||||||
|
if (!isset(self::$batchKeyActionsMap[$batchKey])) {
|
||||||
|
do_log("$logPrefix, batchKey: $batchKey invalid", 'error');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$batchKeyInfo = self::$batchKeyActionsMap[$batchKey];
|
||||||
|
|
||||||
$batch = self::getBatch($redis, $batchKey);
|
$batch = self::getBatch($redis, $batchKey);
|
||||||
if (!$batch) {
|
if (!$batch) {
|
||||||
@@ -50,12 +80,28 @@ class CleanupRepository extends BaseRepository
|
|||||||
}
|
}
|
||||||
//update the batch key
|
//update the batch key
|
||||||
$redis->set($batchKey, $batchKey . ":" . self::getHashKeySuffix());
|
$redis->set($batchKey, $batchKey . ":" . self::getHashKeySuffix());
|
||||||
$count = match ($batchKey) {
|
|
||||||
self::USER_SEEDING_LEECHING_TIME_BATCH_KEY => self::updateUserLeechingSeedingTime($redis, $batch, $requestId),
|
$count = 0;
|
||||||
self::TORRENT_SEEDERS_ETC_BATCH_KEY => self::updateTorrentSeedersEtc($redis, $batch, $requestId),
|
$it = NULL;
|
||||||
self::USER_SEED_BONUS_BATCH_KEY => self::calculateUserSeedBonus($redis, $batch, $requestId),
|
$length = $redis->hLen($batch);
|
||||||
default => throw new \InvalidArgumentException("Invalid batchKey: $batchKey")
|
$page = 0;
|
||||||
};
|
/* Don't ever return an empty array until we're done iterating */
|
||||||
|
$redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY);
|
||||||
|
while($arr_keys = $redis->hScan($batch, $it, "*", self::$scanSize)) {
|
||||||
|
foreach($arr_keys as $k => $v) {
|
||||||
|
$delay = self::getDelay($batchKeyInfo['task_index'], $length, $page);
|
||||||
|
$idStr = implode(",", array_keys($arr_keys));
|
||||||
|
$command = sprintf(
|
||||||
|
'cleanup --action=%s --begin_id=%s --end_id=%s --id_str=%s --request_id=%s --delay=%s',
|
||||||
|
$batchKeyInfo['action'], 0, 0, $idStr, $requestId, $delay
|
||||||
|
);
|
||||||
|
$output = executeCommand($command, 'string', true);
|
||||||
|
do_log(sprintf('command: %s, output: %s', $command, $output));
|
||||||
|
$page++;
|
||||||
|
$count += count($arr_keys);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//remove this batch
|
//remove this batch
|
||||||
$redis->del($batch);
|
$redis->del($batch);
|
||||||
$endTimestamp = time();
|
$endTimestamp = time();
|
||||||
@@ -63,79 +109,6 @@ class CleanupRepository extends BaseRepository
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static function updateUserLeechingSeedingTime(\Redis $redis, $batch, $logPrefix): int
|
|
||||||
{
|
|
||||||
$count = 0;
|
|
||||||
$size = 1000;
|
|
||||||
$it = NULL;
|
|
||||||
/* Don't ever return an empty array until we're done iterating */
|
|
||||||
$redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY);
|
|
||||||
while($arr_keys = $redis->hScan($batch, $it, "*", $size)) {
|
|
||||||
foreach($arr_keys as $uid => $timestamp) {
|
|
||||||
do_log("$logPrefix $uid => $timestamp"); /* Print the hash member and value */
|
|
||||||
$sumInfo = NexusDB::table('snatched')
|
|
||||||
->selectRaw('sum(seedtime) as seedtime_sum, sum(leechtime) as leechtime_sum')
|
|
||||||
->where('userid', $uid)
|
|
||||||
->first();
|
|
||||||
if ($sumInfo && $sumInfo->seedtime_sum !== null) {
|
|
||||||
$update = [
|
|
||||||
'seedtime' => $sumInfo->seedtime_sum ?? 0,
|
|
||||||
'leechtime' => $sumInfo->leechtime_sum ?? 0,
|
|
||||||
'seed_time_updated_at' => Carbon::now()->toDateTimeString(),
|
|
||||||
];
|
|
||||||
NexusDB::table('users')
|
|
||||||
->where('id', $uid)
|
|
||||||
->update($update);
|
|
||||||
do_log("$logPrefix, [SUCCESS]: $uid => " . json_encode($update));
|
|
||||||
$count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sleep(rand(1, 10));
|
|
||||||
}
|
|
||||||
return $count;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static function updateTorrentSeedersEtc(\Redis $redis, $batch, $logPrefix)
|
|
||||||
{
|
|
||||||
$count = 0;
|
|
||||||
$size = 1000;
|
|
||||||
$it = NULL;
|
|
||||||
/* Don't ever return an empty array until we're done iterating */
|
|
||||||
$redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY);
|
|
||||||
while($arr_keys = $redis->hScan($batch, $it, "*", $size)) {
|
|
||||||
foreach($arr_keys as $torrentId => $timestamp) {
|
|
||||||
do_log("$logPrefix $torrentId => $timestamp"); /* Print the hash member and value */
|
|
||||||
$peerResult = NexusDB::table('peers')
|
|
||||||
->where('torrent', $torrentId)
|
|
||||||
->selectRaw("count(*) as count, seeder")
|
|
||||||
->groupBy('seeder')
|
|
||||||
->get()
|
|
||||||
;
|
|
||||||
$commentResult = NexusDB::table('comments')
|
|
||||||
->where('torrent', $torrentId)
|
|
||||||
->selectRaw("count(*) as count")
|
|
||||||
->first()
|
|
||||||
;
|
|
||||||
$update = [
|
|
||||||
'comments' => $commentResult && $commentResult->count !== null ? $commentResult->count : 0,
|
|
||||||
'seeders' => 0,
|
|
||||||
'leechers' => 0,
|
|
||||||
];
|
|
||||||
foreach ($peerResult as $item) {
|
|
||||||
if ($item->seeder == 'yes') {
|
|
||||||
$update['seeders'] = $item->count;
|
|
||||||
} elseif ($item->seeder == 'no') {
|
|
||||||
$update['leechers'] = $item->count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NexusDB::table('torrents')->where('id', $torrentId)->update($update);
|
|
||||||
do_log("$logPrefix, [SUCCESS]: $torrentId => " . json_encode($update));
|
|
||||||
$count++;
|
|
||||||
}
|
|
||||||
sleep(rand(1, 10));
|
|
||||||
}
|
|
||||||
return $count;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static function getBatch(\Redis $redis, $batchKey)
|
private static function getBatch(\Redis $redis, $batchKey)
|
||||||
{
|
{
|
||||||
@@ -191,29 +164,6 @@ LUA;
|
|||||||
return date('Ymd_His');
|
return date('Ymd_His');
|
||||||
}
|
}
|
||||||
|
|
||||||
private static function calculateUserSeedBonus(\Redis $redis, $batch, $requestId): int
|
|
||||||
{
|
|
||||||
$count = 0;
|
|
||||||
$it = NULL;
|
|
||||||
$length = $redis->hLen($batch);
|
|
||||||
$page = 0;
|
|
||||||
/* Don't ever return an empty array until we're done iterating */
|
|
||||||
$redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY);
|
|
||||||
while($arr_keys = $redis->hScan($batch, $it, "*", self::$scanSize)) {
|
|
||||||
$delay = self::getDelay(0, $length, $page);
|
|
||||||
$idStr = implode(",", array_keys($arr_keys));
|
|
||||||
$command = sprintf(
|
|
||||||
'cleanup --action=seed_bonus --begin_id=%s --end_id=%s --id_str=%s --request_id=%s --delay=%s',
|
|
||||||
0, 0, $idStr, $requestId, $delay
|
|
||||||
);
|
|
||||||
$output = executeCommand($command, 'string', true);
|
|
||||||
do_log(sprintf('command: %s, output: %s', $command, $output));
|
|
||||||
$page++;
|
|
||||||
$count += count($arr_keys);
|
|
||||||
}
|
|
||||||
return $count;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static function getOneTaskSeconds(): float|int
|
private static function getOneTaskSeconds(): float|int
|
||||||
{
|
{
|
||||||
if (self::$oneTaskSeconds == 0) {
|
if (self::$oneTaskSeconds == 0) {
|
||||||
|
|||||||
+2
-12
@@ -387,12 +387,7 @@ function docleanup($forceAll = 0, $printProgress = false) {
|
|||||||
// sql_query("UPDATE torrents SET " . implode(",", $update) . " WHERE id = $id") or sqlerr(__FILE__, __LINE__);
|
// sql_query("UPDATE torrents SET " . implode(",", $update) . " WHERE id = $id") or sqlerr(__FILE__, __LINE__);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
$command = sprintf(
|
\App\Repositories\CleanupRepository::runBatchJobUpdateTorrentSeedersEtc($requestId);
|
||||||
'cleanup --action=seeders_etc --begin_id=%s --end_id=%s --request_id=%s --delay=%s',
|
|
||||||
0, 0, $requestId, 0
|
|
||||||
);
|
|
||||||
$output = executeCommand($command, 'string', true);
|
|
||||||
do_log(sprintf('command: %s, output: %s', $command, $output));
|
|
||||||
|
|
||||||
$log = "update count of seeders, leechers, comments for torrents";
|
$log = "update count of seeders, leechers, comments for torrents";
|
||||||
do_log($log);
|
do_log($log);
|
||||||
@@ -870,12 +865,7 @@ function docleanup($forceAll = 0, $printProgress = false) {
|
|||||||
// sql_query("UPDATE users SET seedtime = " . intval($arr2['st']) . ", leechtime = " . intval($arr2['lt']) . " WHERE id = " . $arr['id']) or sqlerr(__FILE__, __LINE__);
|
// sql_query("UPDATE users SET seedtime = " . intval($arr2['st']) . ", leechtime = " . intval($arr2['lt']) . " WHERE id = " . $arr['id']) or sqlerr(__FILE__, __LINE__);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
$command = sprintf(
|
\App\Repositories\CleanupRepository::runBatchJobUpdateUserSeedingLeechingTime($requestId);
|
||||||
'cleanup --action=seeding_leeching_time --begin_id=%s --end_id=%s --request_id=%s --delay=%s',
|
|
||||||
0, 0, $requestId, 0
|
|
||||||
);
|
|
||||||
$output = executeCommand($command, 'string', true);
|
|
||||||
do_log(sprintf('command: %s, output: %s', $command, $output));
|
|
||||||
|
|
||||||
$log = "update total seeding and leeching time of users";
|
$log = "update total seeding and leeching time of users";
|
||||||
do_log($log);
|
do_log($log);
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
<?php
|
<?php
|
||||||
defined('VERSION_NUMBER') || define('VERSION_NUMBER', '1.8.5');
|
defined('VERSION_NUMBER') || define('VERSION_NUMBER', '1.8.5');
|
||||||
defined('RELEASE_DATE') || define('RELEASE_DATE', '2023-07-19');
|
defined('RELEASE_DATE') || define('RELEASE_DATE', '2023-07-22');
|
||||||
defined('IN_TRACKER') || define('IN_TRACKER', false);
|
defined('IN_TRACKER') || define('IN_TRACKER', false);
|
||||||
defined('PROJECTNAME') || define("PROJECTNAME","NexusPHP");
|
defined('PROJECTNAME') || define("PROJECTNAME","NexusPHP");
|
||||||
defined('NEXUSPHPURL') || define("NEXUSPHPURL","https://nexusphp.org");
|
defined('NEXUSPHPURL') || define("NEXUSPHPURL","https://nexusphp.org");
|
||||||
|
|||||||
Reference in New Issue
Block a user