Files
chatroom/app/Services/WechatBot/KafkaConsumerService.php

134 lines
4.0 KiB
PHP
Raw Normal View History

<?php
/**
* File purpose: Kafka consumer service for the WeChat bot.
*
* @author ChatRoom Laravel
*
* @version 1.0.0
*/
namespace App\Services\WechatBot;
use App\Models\SysParam;
use Illuminate\Support\Facades\Log;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
class KafkaConsumerService
{
    /**
     * Consumer group ID used when the stored configuration does not provide
     * a non-empty one.
     */
    protected const DEFAULT_GROUP_ID = 'chatroom_wechat_bot';

    /**
     * Kafka broker address(es).
     */
    protected string $brokers = '';

    /**
     * Topic to consume.
     */
    protected string $topic = '';

    /**
     * Consumer group ID.
     */
    protected string $groupId = self::DEFAULT_GROUP_ID;

    /**
     * Load the Kafka settings from the SysParam row aliased "wechat_bot_config".
     *
     * The row body is expected to be JSON with a "kafka" object holding
     * "brokers", "topic" and (optionally) "group_id". Missing, malformed or
     * wrongly-typed settings leave the corresponding property at its default
     * instead of raising, so createConsumer() can report the problem.
     */
    public function __construct()
    {
        $param = SysParam::where('alias', 'wechat_bot_config')->first();
        if (! $param || empty($param->body)) {
            return;
        }

        // The body is normally a JSON string; tolerate an already-decoded value.
        $body = $param->body;
        $config = is_string($body) ? json_decode($body, true) : $body;

        $kafka = is_array($config) ? ($config['kafka'] ?? null) : null;
        if (! is_array($kafka)) {
            return;
        }

        $this->brokers = self::configString($kafka['brokers'] ?? null);
        $this->topic = self::configString($kafka['topic'] ?? null);
        $this->groupId = self::configString($kafka['group_id'] ?? null, self::DEFAULT_GROUP_ID);
    }

    /**
     * Create a Kafka consumer instance.
     *
     * The client ID and group instance ID both embed the current process ID
     * and a uniqid() value, so every call yields distinct identifiers.
     *
     * @return Consumer|null The consumer, or null (after logging a warning)
     *                       when brokers or topic are not configured.
     */
    public function createConsumer(): ?Consumer
    {
        // Compare against '' explicitly: empty() would also reject the value "0".
        if ($this->brokers === '' || $this->topic === '') {
            Log::warning('WechatBot Kafka: brokers or topic is empty. Consumer not started.');

            return null;
        }

        $config = new ConsumerConfig;
        $config->setBroker($this->brokers);
        $config->setTopic($this->topic);
        $config->setGroupId($this->groupId);
        $config->setClientId('chatroom_wechat_bot_'.getmypid().'_'.uniqid());
        $config->setGroupInstanceId('chatroom_wechat_bot_instance_'.getmypid().'_'.uniqid());
        $config->setInterval(0.5); // poll interval (seconds)
        $config->setGroupRetry(5); // group coordination retry count
        $config->setGroupRetrySleep(1); // group coordination retry delay (seconds)

        return new Consumer($config);
    }

    /**
     * Parse a raw Kafka payload and extract the chat messages it carries.
     *
     * Only payloads whose decoded JSON is an object with an "AddMsgs" list are
     * processed; anything else yields an empty array. Entries of "AddMsgs"
     * that are not JSON objects are skipped. Invalid JSON is logged and also
     * yields an empty array.
     *
     * @param string $rawJson Kafka message body as a JSON string
     * @return array<int, array{
     *     msg_id: mixed,
     *     from_user: string,
     *     to_user: string,
     *     chatroom_id: string|null,
     *     msg_type: mixed,
     *     content: string,
     *     raw_content: string,
     *     is_chatroom: bool
     * }> Normalised messages, in payload order
     */
    public function parseKafkaMessage(string $rawJson): array
    {
        try {
            $data = json_decode($rawJson, true, 512, JSON_THROW_ON_ERROR);
        } catch (\JsonException $e) {
            Log::warning('微信机器人 Kafka 消息 JSON 解析失败', ['error' => $e->getMessage()]);

            return [];
        }

        // Valid JSON can still be a scalar, and "AddMsgs" can be a non-list.
        $addMsgs = is_array($data) ? ($data['AddMsgs'] ?? null) : null;
        if (! is_array($addMsgs)) {
            return [];
        }

        $messages = [];
        foreach ($addMsgs as $msg) {
            if (is_array($msg)) {
                $messages[] = $this->normalizeMessage($msg);
            }
        }

        return $messages;
    }

    /**
     * Convert one "AddMsgs" entry into the normalised message shape.
     *
     * For group chats the entry's from_user_name is the chatroom ID and the
     * real sender is embedded in the content as "sender:\nactual content".
     *
     * @param array $msg One decoded "AddMsgs" entry
     * @return array Normalised message (see parseKafkaMessage() for the keys)
     */
    private function normalizeMessage(array $msg): array
    {
        $fromUser = self::wrappedString($msg['from_user_name'] ?? null);
        $toUser = self::wrappedString($msg['to_user_name'] ?? null);
        $content = self::wrappedString($msg['content'] ?? null);

        $isChatroom = str_contains($fromUser, '@chatroom');

        $actualSender = $fromUser;
        $actualContent = $content;
        $chatroomId = null;

        if ($isChatroom) {
            $chatroomId = $fromUser;

            // The sender prefix must be the whole first line. Restricting the
            // sender to non-newline characters keeps un-prefixed multi-line
            // text (e.g. "Note\nAgenda:\n...") from being split by mistake.
            if (preg_match('/\A([^\n]+):\n(.*)\z/s', $content, $matches) === 1) {
                $actualSender = $matches[1];
                $actualContent = $matches[2];
            }
        }

        return [
            'msg_id' => $msg['new_msg_id'] ?? $msg['msg_id'] ?? 0,
            'from_user' => $actualSender,
            'to_user' => $toUser,
            'chatroom_id' => $chatroomId,
            'msg_type' => $msg['msg_type'] ?? 0,
            'content' => $actualContent,
            'raw_content' => $content,
            'is_chatroom' => $isChatroom,
        ];
    }

    /**
     * Read the "str" member of a {"str": ...} wrapper as a string.
     *
     * @param mixed $field Decoded wrapper value, possibly missing or malformed
     * @return string The wrapped text; '' when the wrapper or its "str" member
     *                is absent or not a string/number
     */
    private static function wrappedString(mixed $field): string
    {
        $value = is_array($field) ? ($field['str'] ?? null) : null;

        if (is_string($value)) {
            return $value;
        }

        return is_int($value) || is_float($value) ? (string) $value : '';
    }

    /**
     * Coerce a decoded configuration value to a trimmed string.
     *
     * Arrays are flattened into a comma-separated list of their non-empty
     * members (NOTE(review): assumes the Kafka client accepts a
     * comma-separated broker list - confirm against the client docs).
     *
     * @param mixed  $value   Decoded configuration value
     * @param string $default Value returned when the result would be empty
     * @return string The normalised value, or $default when it is empty
     */
    private static function configString(mixed $value, string $default = ''): string
    {
        if (is_array($value)) {
            $parts = [];
            foreach ($value as $item) {
                $part = self::configString($item);
                if ($part !== '') {
                    $parts[] = $part;
                }
            }
            $result = implode(',', $parts);
        } elseif (is_string($value) || is_int($value) || is_float($value)) {
            $result = trim((string) $value);
        } else {
            $result = '';
        }

        return $result !== '' ? $result : $default;
    }
}