onQueue('user_alive_sync'); }

    /**
     * Merge this node's alive-IP report into each user's cached per-node state
     * and persist the resulting device count on the users table.
     *
     * For every user id in $this->data:
     *  - read the cached per-node entries and drop those that are not arrays or
     *    whose 'lastupdateAt' is more than NODE_DATA_EXPIRY seconds old;
     *  - store this node's IPs under the key nodeType . nodeId;
     *  - recompute the device count through UserOnlineService::calculateDeviceCount()
     *    and write the entry back to the cache for CACHE_TTL seconds.
     *
     * Afterwards 'online_count' and 'last_online_at' are written to the users
     * table. Any Throwable is logged and the job is marked failed via fail().
     *
     * NOTE(review): the cache read-modify-write is not atomic; two jobs handling
     * the same user concurrently could overwrite each other's node entry —
     * confirm whether that is acceptable for this workload.
     */
    public function handle(): void
    {
        try {
            $updateAt = time();
            $now = now();
            $nodeKey = $this->nodeType . $this->nodeId;

            // user id => device count; a later entry for the same id wins, which
            // matches the outcome of applying per-row updates in order.
            $countById = [];

            foreach ($this->data as $uid => $ips) {
                $cacheKey = self::CACHE_PREFIX . $uid;

                // Keep only node entries that are still fresh. The scalar
                // 'alive_ip' value stored below is discarded here by is_array().
                $ipsArray = collect(Cache::get($cacheKey, []))
                    ->filter(
                        fn (mixed $value): bool => is_array($value)
                            && ($updateAt - ($value['lastupdateAt'] ?? 0) <= self::NODE_DATA_EXPIRY)
                    )
                    ->all();

                // Plain key assignment instead of spreading the collection:
                // it keeps existing keys as they are (spread renumbers integer
                // keys) and does not require string-key unpacking support.
                $ipsArray[$nodeKey] = [
                    'aliveips' => $ips,
                    'lastupdateAt' => $updateAt,
                ];

                $count = UserOnlineService::calculateDeviceCount($ipsArray);
                $ipsArray['alive_ip'] = $count;

                Cache::put($cacheKey, $ipsArray, now()->addSeconds(self::CACHE_TTL));

                $id = (int) $uid;
                if ($id > 0) {
                    $countById[$id] = (int) $count;
                }
            }

            // Group ids sharing the same count so each group needs one UPDATE
            // instead of one UPDATE per user.
            $idsByCount = [];
            foreach ($countById as $id => $count) {
                $idsByCount[$count][] = $id;
            }

            // No existence pre-check is needed: an UPDATE constrained by id
            // simply matches nothing for ids that are not in the table.
            foreach ($idsByCount as $count => $ids) {
                // Bound the size of every IN (...) list.
                foreach (array_chunk($ids, 1000) as $chunk) {
                    User::query()
                        ->whereIn('id', $chunk)
                        ->update([
                            'online_count' => $count,
                            'last_online_at' => $now,
                        ]);
                }
            }
        } catch (\Throwable $e) {
            Log::error('UserAliveSyncJob failed', [
                'error' => $e->getMessage(),
            ]);

            $this->fail($e);
        }
    }
}