kafkaService = $kafkaService; } public function handle(): int { $this->info('正在启动微信机器人 Kafka 消费者...'); $consumer = $this->kafkaService->createConsumer(); if (! $consumer) { $this->error('Kafka 配置不完整或加载失败,请在后台检查机器人设置。'); return self::FAILURE; } $this->info('消费者已启动,等待消息...'); $apiService = new WechatBotApiService; while (true) { try { $messageJson = $consumer->consume(); if ($messageJson) { $rawJson = $messageJson->getValue(); $this->info('--> 收到新的 Kafka 消息 (Raw Length: '.strlen($rawJson).')'); $messages = $this->kafkaService->parseKafkaMessage($rawJson); if (empty($messages)) { $this->info('--> 解析后:无匹配的 AddMsgs 内容'); } foreach ($messages as $msg) { try { $this->processMessage($msg, $apiService); } catch (\Exception $e) { Log::error('处理单条微信消息失败', [ 'error' => $e->getMessage(), 'msg' => $msg, ]); } } $consumer->ack($messageJson); } } catch (\Exception $e) { Log::error('Kafka 消费异常', ['error' => $e->getMessage()]); // 延迟重试避免死循环 CPU 空转 sleep(2); } } return self::SUCCESS; } /** * 处理单条消息逻辑 */ protected function processMessage(array $msg, WechatBotApiService $apiService): void { // 仅处理文本消息 (msg_type = 1) if ($msg['msg_type'] != 1) { return; } $content = trim($msg['content']); $fromUser = $msg['from_user']; $isChatroom = $msg['is_chatroom']; // 绑定逻辑:支持私聊和被授权的微信群。只要内容格式为 BD-xxxxxx if (preg_match('/^BD-\d{6}$/i', $content)) { // 如果是群聊,则仅允许在后台设定的目标通知群里进行扫码绑定 if ($isChatroom) { $sysParam = \App\Models\SysParam::where('alias', 'wechat_bot_config')->first(); $config = $sysParam && ! empty($sysParam->body) ? json_decode($sysParam->body, true) : []; $allowedGroupWxid = $config['group_notify']['target_wxid'] ?? ''; if ($msg['chatroom_id'] !== $allowedGroupWxid) { $this->info("拒绝绑定:来自非授权群聊 {$msg['chatroom_id']}"); return; } } $replyTarget = $isChatroom ? $msg['chatroom_id'] : $fromUser; $this->info("收到潜在绑定请求: {$content} from {$fromUser} (Reply to: {$replyTarget})"); $this->handleBindRequest(strtoupper($content), $fromUser, $replyTarget, $apiService); } } /** * 处理账号绑定请求 */ protected function handleBindRequest(string $code, string $wxid, string $replyTarget, WechatBotApiService $apiService): void { $cacheKey = 'wechat_bind_code:'.$code; $username = Cache::get($cacheKey); if (! $username) { $apiService->sendTextMessage($replyTarget, '❌ 绑定失败:该验证码无效或已过有效期(5分钟)。请在个人中心重新生成。'); return; } $user = User::where('username', $username)->first(); if (! $user) { $apiService->sendTextMessage($replyTarget, '❌ 绑定失败:找不到对应的用户账号。'); return; } // 判断该微信号是否已经被其他用户绑定(防止碰撞或安全隐患) $existing = User::where('wxid', $wxid)->where('id', '!=', $user->id)->first(); if ($existing) { $apiService->sendTextMessage($replyTarget, "❌ 绑定失败:当前微信号已经被其他账号 [{$existing->username}] 绑定。请先解绑后再试。"); return; } $user->wxid = $wxid; $user->save(); // 验证成功后立即销毁验证码 Cache::forget($cacheKey); $this->info("用户 [{$username}] 成功绑定微信: {$wxid}"); $successMsg = "🎉 绑定成功!\n" ."您已成功绑定聊天室账号:[{$username}]。\n" .'现在您可以接收重要系统通知了。'; $apiService->sendTextMessage($replyTarget, $successMsg); } }