房间 ID 数组 */ public function getUserRooms(string $username): array { $rooms = []; $cursor = '0'; do { [$cursor, $keys] = Redis::scan($cursor, ['match' => 'room:*:users', 'count' => 100]); foreach ($keys ?? [] as $key) { if (Redis::hexists($key, $username)) { // 从 key "room:123:users" 中提取房间 ID preg_match('/room:(\d+):users/', $key, $matches); if (isset($matches[1])) { $rooms[] = (int) $matches[1]; } } } } while ($cursor !== '0'); return $rooms; } /** * 获取指定房间的所有在线用户列表。 * * @param int $roomId 房间ID */ public function getRoomUsers(int $roomId): array { $key = "room:{$roomId}:users"; $users = Redis::hgetall($key); $result = []; foreach ($users as $username => $jsonInfo) { $result[$username] = json_decode($jsonInfo, true); } return $result; } /** * 将一条新发言推入 Redis 列表,并限制最大保留数量,防止内存泄漏。 * * @param int $roomId 房间ID * @param array $message 发言数据包 * @param int $maxKeep 最大保留条数 */ public function pushMessage(int $roomId, array $message, int $maxKeep = 100): void { $key = "room:{$roomId}:messages"; Redis::rpush($key, json_encode($message, JSON_UNESCAPED_UNICODE)); // 仅保留最新的 $maxKeep 条,旧的自动截断弹弃 (-$maxKeep 到 -1 的区间保留) Redis::ltrim($key, -$maxKeep, -1); } /** * 获取指定房间的新发言记录。 * 在高频长轮询或前端断线重连拉取时使用。 * * @param int $roomId 房间ID * @param int $lastId 客户端收到的最后一条发言的ID */ public function getNewMessages(int $roomId, int $lastId): array { $key = "room:{$roomId}:messages"; $messages = Redis::lrange($key, 0, -1); // 获取当前缓存的全部 $newMessages = []; foreach ($messages as $msgJson) { $msg = json_decode($msgJson, true); if (isset($msg['id']) && $msg['id'] > $lastId) { $newMessages[] = $msg; } } return $newMessages; } /** * 分布式发号器:为房间内的新消息生成绝对递增的 ID。 * 解决了 MySQL 自增在极致并发下依赖读锁的问题。 * * @param int $roomId 房间ID */ public function nextMessageId(int $roomId): int { $key = "room:{$roomId}:message_seq"; return Redis::incr($key); } /** * 读取系统参数并设置合理的缓存。 * 替代每次都去 MySQL query 的性能损耗。 * * @param string $alias 别名 */ public function getSysParam(string $alias): ?string { return Cache::remember("sys_param:{$alias}", 60, function () use ($alias) { return SysParam::where('alias', $alias)->value('body'); }); } /** * 更新系统参数并刷新 Cache 缓存。 * * @param string $alias 别名 * @param mixed $value 参数值 */ public function setSysParam(string $alias, mixed $value): void { Cache::put("sys_param:{$alias}", $value); } /** * 提供一个分布式的 Redis 互斥锁包围。 * 防止并发抢占资源(如同时创建重名房间,同时修改某一敏感属性)。 * * @param string $key 锁名称 * @param callable $callback 获得锁后执行的闭包 * @param int $timeout 锁超时时间(秒) * @return mixed */ public function withLock(string $key, callable $callback, int $timeout = 5) { $lockKey = "lock:{$key}"; // 尝试获取锁,set nx ex $isLocked = Redis::set($lockKey, 1, 'EX', $timeout, 'NX'); if (! $isLocked) { // 获取锁失败,业务可自行决定抛异常或重试 throw new \Exception("The lock {$key} is currently held by another process."); } try { return $callback(); } finally { Redis::del($lockKey); } } }