buy torrent use queue job

This commit is contained in:
xiaomlove
2024-11-03 15:29:21 +08:00
parent 42cda65027
commit 70dab3e8c5
11 changed files with 157 additions and 38 deletions
+2
View File
@@ -4,6 +4,7 @@ namespace App\Console;
use App\Jobs\CheckCleanup;
use App\Jobs\CheckQueueFailedJobs;
use App\Utils\ThirdPartyJob;
use Carbon\Carbon;
use Illuminate\Console\Scheduling\Event;
use Illuminate\Console\Scheduling\Schedule;
@@ -41,6 +42,7 @@ class Kernel extends ConsoleKernel
$schedule->command('meilisearch:import')->weeklyOn(1, "03:00")->withoutOverlapping();
$schedule->command('torrent:load_pieces_hash')->dailyAt("01:00")->withoutOverlapping();
$schedule->job(new CheckQueueFailedJobs())->everySixHours()->withoutOverlapping();
$schedule->job(new ThirdPartyJob())->everyMinute()->withoutOverlapping();
$this->registerScheduleCleanup($schedule);
}
+68
View File
@@ -0,0 +1,68 @@
<?php
namespace App\Jobs;
use App\Models\TorrentBuyLog;
use App\Repositories\BonusRepository;
use App\Repositories\TorrentRepository;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BuyTorrent implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $userId;
public int $torrentId;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(int $userId, int $torrentId)
{
$this->userId = $userId;
$this->torrentId = $torrentId;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$logPrefix = sprintf("user: %s, torrent: %s", $this->userId, $this->torrentId);
$torrentRep = new TorrentRepository();
$userId = $this->userId;
$torrentId = $this->torrentId;
$hasBuy = TorrentBuyLog::query()
->where("uid", $userId)
->where("torrent_id", $torrentId)
->exists()
;
if ($hasBuy) {
//标记购买成功
do_log("$logPrefix, already bought");
$torrentRep->addBuySuccessCache($userId, $torrentId);
return;
}
try {
$bonusRep = new BonusRepository();
$bonusRep->consumeToBuyTorrent($this->userId, $this->torrentId);
//标记购买成功
do_log("$logPrefix, buy torrent success");
$torrentRep->addBuySuccessCache($userId, $torrentId);
} catch (\Throwable $throwable) {
//标记购买失败,缓存 3600 秒,这个时间内不能再次购买
do_log("$logPrefix, buy torrent fail: " . $throwable->getMessage(), "error");
$torrentRep->addBuyFailCache($userId, $torrentId);
}
}
}
+3 -10
View File
@@ -49,6 +49,7 @@ class TorrentRepository extends BaseRepository
const BUY_STATUS_SUCCESS = 0;
const BUY_STATUS_NOT_YET = -1;
const BUY_STATUS_UNKNOWN = -2;
@@ -807,16 +808,8 @@ HTML;
//根据失败次数,禁用下载权限并做提示等
return $buyFailCount;
}
//购买失败缓存失效后,再重新查询数据库确定最终状态
$hasBuyFromDB = TorrentBuyLog::query()->where("uid", $uid)->where("torrent_id", $torrentId)->exists();
if ($hasBuyFromDB) {
//标记购买成功, 返回已购买
$this->addBuySuccessCache($uid, $torrentId);
return self::BUY_STATUS_SUCCESS;
} else {
//返回未购买,前端可执行购买逻辑
return self::BUY_STATUS_NOT_YET;
}
//不是成功或失败,直接返回未知
return self::BUY_STATUS_UNKNOWN;
}
/**
+70
View File
@@ -0,0 +1,70 @@
<?php
namespace App\Utils;
use App\Jobs\BuyTorrent;
use http\Exception\InvalidArgumentException;
use Illuminate\Support\Facades\Queue;
use Nexus\Database\NexusDB;
use Nexus\Database\NexusLock;
final class ThirdPartyJob {
private static string $queueKey = "nexus_third_party_job";
private static int $size = 20;
const JOB_BUY_TORRENT = "buyTorrent";
public function __invoke(): void
{
$lockName = convertNamespaceToSnake(__METHOD__);
$lock = new NexusLock($lockName, 600);
if (!$lock->get()) {
do_log("can not get lock: $lockName, return ...");
return;
}
$list = NexusDB::redis()->lRange(self::$queueKey, 0, self::$size);
$successCount = 0;
foreach ($list as $item) {
$data = json_decode($item, true);
if (!empty($data['name'])) {
$successCount++;
match ($data['name']) {
self::JOB_BUY_TORRENT => self::enqueueJobBuyTorrent($data),
default => throw new InvalidArgumentException("invalid name: {$data['name']}")
};
} else {
do_log(sprintf("%s no name, skip", $item), "error");
}
NexusDB::redis()->lRem(self::$queueKey, $item);
}
do_log(sprintf("success dispatch %s jobs", $successCount));
$lock->release();
}
public static function addBuyTorrent(int $userId, int $torrentId): void
{
$key = sprintf("%s:%s_%s_%s", self::$queueKey, convertNamespaceToSnake(__METHOD__), $userId, $torrentId);
if (NexusDB::redis()->set($key, now()->toDateTimeString(), ['nx', 'ex' => 3600])) {
$value = [
'name' => self::JOB_BUY_TORRENT,
'userId' => $userId,
'torrentId' => $torrentId,
];
NexusDB::redis()->rPush(self::$queueKey, json_encode($value));
do_log("success addBuyTorrent: $key", "debug");
} else {
do_log("no need to addBuyTorrent: $key", "debug");
}
}
private static function enqueueJobBuyTorrent(array $params): void
{
if (!empty($params['userId']) && !empty($params['torrentId'])) {
$job = new BuyTorrent($params['userId'], $params['torrentId']);
Queue::push($job);
} else {
do_log("no userId or torrentId: " . json_encode($params), "error");
}
}
}