Merge pull request #755 from socksprox/feat/server-id-stat-user

feat: Track user traffic per node (server_id)
This commit is contained in:
Xboard
2026-03-30 13:55:11 +08:00
committed by GitHub
7 changed files with 345 additions and 10 deletions

View File

@@ -0,0 +1,189 @@
# Per-Node User Traffic Tracking - Implementation Summary
## Changes Made
### 1. Database Migration
**File:** `database/migrations/2025_11_29_000000_add_server_id_to_stat_user.php`
- Added `server_id` column to `v2_stat_user` table
- Column is **nullable** for backward compatibility with existing data
- Added composite index `user_server_record_idx` for efficient per-node queries
### 2. StatUserJob Updates
**File:** `app/Jobs/StatUserJob.php`
Updated all three database-specific methods to include `server_id`:
- `processUserStatForSqlite()` - Added `server_id` to WHERE clause and CREATE
- `processUserStatForOtherDatabases()` - Added `server_id` to upsert unique key
- `processUserStatForPostgres()` - Added `server_id` to ON CONFLICT clause
### 3. StatUser Model
**File:** `app/Models/StatUser.php`
- Added `@property int|null $server_id` documentation
- Added `server()` relationship method to Server model
## How It Works
### Node Identification Flow
1. **Node sends traffic report:**
```http
POST /api/v1/server/UniProxy/push?node_type=vmess&node_id=5&token=xxx
```
2. **Middleware extracts node info:**
- `Server` middleware validates `node_id` and `token`
- Loads full server object from database
- Injects into `$request->attributes->set('node_info', $serverInfo)`
3. **Controller passes to service:**
- `$server = $request->attributes->get('node_info')`
- Contains `$server->id`, `$server->rate`, etc.
4. **StatUserJob now uses `server_id`:**
- Creates/updates records with composite key: `(user_id, server_id, server_rate, record_at, record_type)`
- Traffic from different nodes is now stored separately
### Record Creation Logic
**NEW behavior with `server_id`:**
| Scenario | Result |
|----------|--------|
| Same user, same node, same day | **UPDATE** existing record (accumulate traffic) |
| Same user, different node, same day | **CREATE** separate records per node |
| Same user, same node, next day | **CREATE** new record (new day) |
**OLD behavior (without `server_id`):**
- Same user, same rate, same day → Single aggregated record (couldn't differentiate nodes)
## Backward Compatibility
### ✅ Existing Queries Continue to Work
All existing queries that aggregate user traffic will work unchanged:
```php
// Example: User consumption ranking (aggregates across all nodes)
StatUser::select([
'user_id',
DB::raw('sum(u) as u'),
DB::raw('sum(d) as d'),
DB::raw('sum(u) + sum(d) as total')
])
->where('record_at', '>=', $startAt)
->where('record_at', '<', $endAt)
->groupBy('user_id')
->orderBy('total', 'DESC')
->get();
```
This query:
- Works with old records (where `server_id` is NULL)
- Works with new records (where `server_id` is populated)
- Correctly sums traffic across all nodes per user
### ✅ API Endpoints Unchanged
- **No changes** to admin API endpoints
- **No changes** to user API endpoints
- **No changes** to node API endpoints (they already send `node_id`)
### ✅ Legacy Data Preserved
- Old records without `server_id` remain valid
- Represent aggregated historical data
- New records will have `server_id` populated
## New Capabilities
### Per-Node User Traffic Analysis
You can now query traffic per user per node:
```php
// Get user's traffic breakdown by node
StatUser::where('user_id', $userId)
->where('record_at', '>=', $startDate)
->whereNotNull('server_id') // Only new records
->groupBy('server_id')
->selectRaw('server_id, SUM(u) as upload, SUM(d) as download')
->get();
```
### Example Use Cases
1. **Identify which nodes a user uses most**
2. **Detect unusual traffic patterns per node**
3. **Analyze node-specific user behavior**
4. **Generate per-node billing reports**
## Migration Instructions
1. **Run the migration:**
```bash
php artisan migrate
```
2. **Deploy code changes** - No downtime required
3. **Verify:**
- Old data remains queryable
- New traffic reports populate `server_id`
- Existing dashboards continue to work
## Database Schema
### Before
```sql
CREATE TABLE v2_stat_user (
id INT PRIMARY KEY,
user_id INT,
server_rate DECIMAL(10),
u BIGINT,
d BIGINT,
record_type CHAR(2),
record_at INT,
created_at INT,
updated_at INT,
UNIQUE KEY (server_rate, user_id, record_at)
);
```
### After
```sql
CREATE TABLE v2_stat_user (
id INT PRIMARY KEY,
user_id INT,
server_id INT NULL, -- NEW
server_rate DECIMAL(10),
u BIGINT,
d BIGINT,
record_type CHAR(2),
record_at INT,
created_at INT,
updated_at INT,
UNIQUE KEY (server_rate, user_id, record_at), -- Old unique key still exists
INDEX user_server_record_idx (user_id, server_id, record_at) -- NEW
);
```
## Testing Checklist
- [ ] Run migration successfully
- [ ] Node reports traffic → `server_id` is populated
- [ ] Same user on different nodes → separate records created
- [ ] Same user on same node → traffic accumulates in single record
- [ ] Existing admin dashboards show correct totals
- [ ] User traffic logs display correctly
- [ ] Old records (server_id=NULL) are still queryable
- [ ] SUM queries aggregate correctly across nodes
## Notes
- The `server_id` is sourced from the `node_id` parameter that nodes already send
- No changes needed to node software - they already provide this information
- The composite unique key now effectively includes `server_id` in the WHERE clauses
- PostgreSQL ON CONFLICT clause updated to match new unique constraint

View File

@@ -14,13 +14,29 @@ class StatController extends Controller
public function getTrafficLog(Request $request)
{
$startDate = now()->startOfMonth()->timestamp;
// Aggregate per-node data into per-day entries for backward compatibility
$records = StatUser::query()
->select([
'user_id',
'server_rate',
'record_at',
'record_type',
DB::raw('SUM(u) as u'),
DB::raw('SUM(d) as d'),
])
->where('user_id', $request->user()->id)
->where('record_at', '>=', $startDate)
->groupBy(['user_id', 'server_rate', 'record_at', 'record_type'])
->orderBy('record_at', 'DESC')
->get();
->get()
->map(function ($item) {
$item->u = (int) $item->u;
$item->d = (int) $item->d;
return $item;
});
$data = TrafficLogResource::collection(collect($records));
$data = TrafficLogResource::collection($records);
return $this->success($data);
}
}

View File

@@ -13,6 +13,7 @@ use App\Models\Ticket;
use App\Models\User;
use App\Services\StatisticalService;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;
class StatController extends Controller
{
@@ -234,14 +235,38 @@ class StatController extends Controller
]);
$pageSize = $request->input('pageSize', 10);
$records = StatUser::orderBy('record_at', 'DESC')
$page = $request->input('page', 1);
// Aggregate per-node data into per-day entries for backward compatibility
$query = StatUser::query()
->select([
'user_id',
'server_rate',
'record_at',
'record_type',
DB::raw('SUM(u) as u'),
DB::raw('SUM(d) as d'),
DB::raw('MAX(created_at) as created_at'),
DB::raw('MAX(updated_at) as updated_at'),
])
->where('user_id', $request->input('user_id'))
->paginate($pageSize);
->groupBy(['user_id', 'server_rate', 'record_at', 'record_type'])
->orderBy('record_at', 'DESC');
// Manual pagination for grouped query
$total = (clone $query)->get()->count();
$data = $query->skip(($page - 1) * $pageSize)->take($pageSize)->get()
->map(function ($item) {
$item->u = (int) $item->u;
$item->d = (int) $item->d;
$item->created_at = (int) $item->created_at;
$item->updated_at = (int) $item->updated_at;
return $item;
});
$data = $records->items();
return [
'data' => $data,
'total' => $records->total(),
'total' => $total,
];
}

View File

@@ -78,6 +78,7 @@ class StatUserJob implements ShouldQueue
DB::transaction(function () use ($uid, $v, $recordAt) {
$existingRecord = StatUser::where([
'user_id' => $uid,
'server_id' => $this->server['id'],
'server_rate' => $this->server['rate'],
'record_at' => $recordAt,
'record_type' => $this->recordType,
@@ -92,6 +93,7 @@ class StatUserJob implements ShouldQueue
} else {
StatUser::create([
'user_id' => $uid,
'server_id' => $this->server['id'],
'server_rate' => $this->server['rate'],
'record_at' => $recordAt,
'record_type' => $this->recordType,
@@ -109,6 +111,7 @@ class StatUserJob implements ShouldQueue
StatUser::upsert(
[
'user_id' => $uid,
'server_id' => $this->server['id'],
'server_rate' => $this->server['rate'],
'record_at' => $recordAt,
'record_type' => $this->recordType,
@@ -117,7 +120,7 @@ class StatUserJob implements ShouldQueue
'created_at' => time(),
'updated_at' => time(),
],
['user_id', 'server_rate', 'record_at', 'record_type'],
['user_id', 'server_id', 'server_rate', 'record_at', 'record_type'],
[
'u' => DB::raw("u + VALUES(u)"),
'd' => DB::raw("d + VALUES(d)"),
@@ -136,9 +139,9 @@ class StatUserJob implements ShouldQueue
$u = intval($v[0] * $this->server['rate']);
$d = intval($v[1] * $this->server['rate']);
$sql = "INSERT INTO {$table} (user_id, server_rate, record_at, record_type, u, d, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (user_id, server_rate, record_at)
$sql = "INSERT INTO {$table} (user_id, server_id, server_rate, record_at, record_type, u, d, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (user_id, server_id, server_rate, record_at)
DO UPDATE SET
u = {$table}.u + EXCLUDED.u,
d = {$table}.d + EXCLUDED.d,
@@ -146,6 +149,7 @@ class StatUserJob implements ShouldQueue
DB::statement($sql, [
$uid,
$this->server['id'],
$this->server['rate'],
$recordAt,
$this->recordType,

View File

@@ -9,6 +9,7 @@ use Illuminate\Database\Eloquent\Model;
*
* @property int $id
* @property int $user_id 用户ID
* @property int|null $server_id 节点ID (nullable for legacy data)
* @property int $u 上行流量
* @property int $d 下行流量
* @property int $record_at 记录时间
@@ -25,4 +26,12 @@ class StatUser extends Model
'created_at' => 'timestamp',
'updated_at' => 'timestamp'
];
/**
* Get the server that this traffic stat belongs to
*/
public function server()
{
return $this->belongsTo(Server::class, 'server_id');
}
}

View File

@@ -0,0 +1,37 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
Schema::table('v2_stat_user', function (Blueprint $table) {
// Add server_id column as nullable for backward compatibility
$table->integer('server_id')->nullable()->after('user_id')->comment('节点ID (nullable for legacy data)');
// Add index for per-node queries
if (config('database.default') !== 'sqlite') {
$table->index(['user_id', 'server_id', 'record_at'], 'user_server_record_idx');
}
});
}
/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::table('v2_stat_user', function (Blueprint $table) {
if (config('database.default') !== 'sqlite') {
$table->dropIndex('user_server_record_idx');
}
$table->dropColumn('server_id');
});
}
};

View File

@@ -0,0 +1,55 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration
{
/**
* Run the migrations.
*
* Updates the unique constraint on v2_stat_user to include server_id,
* allowing per-node user traffic tracking.
*/
public function up(): void
{
if (config('database.default') === 'sqlite') {
// SQLite uses explicit WHERE queries in code, no constraint changes needed
return;
}
Schema::table('v2_stat_user', function (Blueprint $table) {
// Drop the old unique constraint that doesn't include server_id
$table->dropUnique('server_rate_user_id_record_at');
// Add new unique constraint including server_id
// Note: NULL server_id values (legacy) are treated as distinct in MySQL
$table->unique(
['user_id', 'server_id', 'server_rate', 'record_at', 'record_type'],
'stat_user_unique_idx'
);
});
}
/**
* Reverse the migrations.
*/
public function down(): void
{
if (config('database.default') === 'sqlite') {
return;
}
Schema::table('v2_stat_user', function (Blueprint $table) {
// Drop new constraint
$table->dropUnique('stat_user_unique_idx');
// Restore original constraint
$table->unique(
['server_rate', 'user_id', 'record_at'],
'server_rate_user_id_record_at'
);
});
}
};