feat: new xboard

This commit is contained in:
xboard
2025-01-21 14:57:54 +08:00
parent de18cfe596
commit 0f43fff242
373 changed files with 17923 additions and 20264 deletions
-52
View File
@@ -1,52 +0,0 @@
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BatchTrafficFetchJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $data;
protected $server;
protected $childServer;
protected $protocol;
protected $timestamp;
public $tries = 1;
public $timeout = 20;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(array $server, array $data, $protocol, int $timestamp, $childServer = null)
{
$this->onQueue('batch_traffic_fetch');
$this->server = $server;
$this->data = $data;
$this->protocol = $protocol;
$this->timestamp = $timestamp;
$this->childServer = $childServer;
}
public function handle(): void
{
$targetServer = $this->childServer ?? $this->server;
foreach ($this->data as $uid => $v) {
User::where('id', $uid)
->incrementEach(
[
'u' => $v[0] * $targetServer['rate'],
'd' => $v[1] * $targetServer['rate'],
],
['t' => time()]
);
}
}
}
+78
View File
@@ -0,0 +1,78 @@
<?php
namespace App\Jobs;
use App\Models\StatServer;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
class StatServerJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected array $data;
protected array $server;
protected string $protocol;
protected string $recordType;
public $tries = 3;
public $timeout = 60;
/**
* Create a new job instance.
*/
public function __construct(array $server, array $data, $protocol, string $recordType = 'd')
{
$this->onQueue('stat');
$this->data = $data;
$this->server = $server;
$this->protocol = $protocol;
$this->recordType = $recordType;
}
/**
* Execute the job.
*/
public function handle(): void
{
// Calculate record timestamp
$recordAt = $this->recordType === 'm'
? strtotime(date('Y-m-01'))
: strtotime(date('Y-m-d'));
// Aggregate traffic data
$u = $d = 0;
foreach ($this->data as $traffic) {
$u += $traffic[0];
$d += $traffic[1];
}
DB::transaction(function () use ($u, $d, $recordAt) {
$stat = StatServer::lockForUpdate()
->where('record_at', $recordAt)
->where('server_id', $this->server['id'])
->where('server_type', $this->protocol)
->where('record_type', $this->recordType)
->first();
if ($stat) {
$stat->u += $u;
$stat->d += $d;
$stat->save();
} else {
StatServer::create([
'record_at' => $recordAt,
'server_id' => $this->server['id'],
'server_type' => $this->protocol,
'record_type' => $this->recordType,
'u' => $u,
'd' => $d,
]);
}
});
}
}
+73
View File
@@ -0,0 +1,73 @@
<?php
namespace App\Jobs;
use App\Models\StatUser;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
class StatUserJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected array $data;
protected array $server;
protected string $protocol;
protected string $recordType;
public $tries = 3;
public $timeout = 60;
/**
* Create a new job instance.
*/
public function __construct(array $server, array $data, string $protocol, string $recordType = 'd')
{
$this->onQueue('stat');
$this->data = $data;
$this->server = $server;
$this->protocol = $protocol;
$this->recordType = $recordType;
}
/**
* Execute the job.
*/
public function handle(): void
{
// Calculate record timestamp
$recordAt = $this->recordType === 'm'
? strtotime(date('Y-m-01'))
: strtotime(date('Y-m-d'));
foreach ($this->data as $uid => $v) {
DB::transaction(function () use ($uid, $v, $recordAt) {
$stat = StatUser::lockForUpdate()
->where('user_id', $uid)
->where('server_rate', $this->server['rate'])
->where('record_at', $recordAt)
->where('record_type', $this->recordType)
->first();
if ($stat) {
$stat->u += ($v[0] * $this->server['rate']);
$stat->d += ($v[1] * $this->server['rate']);
$stat->save();
} else {
StatUser::create([
'user_id' => $uid,
'server_rate' => $this->server['rate'],
'record_at' => $recordAt,
'record_type' => $this->recordType,
'u' => ($v[0] * $this->server['rate']),
'd' => ($v[1] * $this->server['rate']),
]);
}
});
}
}
}
+69
View File
@@ -0,0 +1,69 @@
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Collection;
class SyncUserOnlineStatusJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 任务最大尝试次数
*/
public int $tries = 3;
/**
* 任务可以运行的最大秒数
*/
public int $timeout = 30;
public function __construct(
private readonly array $updates
) {
}
/**
* 执行任务
*/
public function handle(): void
{
if (empty($this->updates)) {
return;
}
collect($this->updates)
->chunk(1000)
->each(function (Collection $chunk) {
$userIds = $chunk->pluck('id')->all();
User::query()
->whereIn('id', $userIds)
->each(function (User $user) use ($chunk) {
$update = $chunk->firstWhere('id', $user->id);
if ($update) {
$user->update([
'online_count' => $update['count'],
'last_online_at' => now(),
]);
}
});
});
}
/**
* 任务失败的处理
*/
public function failed(\Throwable $exception): void
{
\Log::error('Failed to sync user online status', [
'error' => $exception->getMessage(),
'updates_count' => count($this->updates)
]);
}
}
+19 -28
View File
@@ -12,47 +12,38 @@ use Illuminate\Queue\SerializesModels;
class TrafficFetchJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $u;
protected $d;
protected $userId;
protected $data;
protected $server;
protected $protocol;
public $tries = 3;
public $timeout = 10;
protected $timestamp;
public $tries = 1;
public $timeout = 20;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($u, $d, $userId, array $server, $protocol)
public function __construct(array $server, array $data, $protocol, int $timestamp)
{
$this->onQueue('traffic_fetch');
$this->u = $u;
$this->d = $d;
$this->userId = $userId;
$this->server = $server;
$this->data = $data;
$this->protocol = $protocol;
$this->timestamp = $timestamp;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
public function handle(): void
{
\DB::transaction(function () {
$user = User::lockForUpdate()->find($this->userId);
if (!$user)
return;
$user->t = time();
$user->u = $user->u + ($this->u * $this->server['rate']);
$user->d = $user->d + ($this->d * $this->server['rate']);
if (!$user->save()) {
info("流量更新失败\n未记录用户ID:{$this->userId}\n未记录上行:{$user->u}\n未记录下行:{$user->d}");
}
});
foreach ($this->data as $uid => $v) {
User::where('id', $uid)
->incrementEach(
[
'u' => $v[0] * $this->server['rate'],
'd' => $v[1] * $this->server['rate'],
],
['t' => time()]
);
}
}
}
}