简介
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
领域模型
Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。
Apache RocketMQ 产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。

如上图所示,Apache RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。
生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题中,消费者通过订阅主题消费消息。
发布订阅模型

发布订阅模型具有如下特点:
消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
准备环境
在使用PHP SDK收发消息前,您需按照本文提供的内容来准备环境。
环境要求
安装PHP 5.5.0或以上版本
重要: 对于PHP版本小于7.2.5的运行环境,需要将Composer依赖降低到2.2.x或以下版本。
安装完成后,您可以执行php -v
命令查看PHP语言版本。
安装SDK
执行以下步骤安装PHP SDK。
-
在您PHP安装目录下的composer.json文件中加入以下依赖
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.4"
}
}
-
执行以下命令,通过Composer安装依赖。
composer install
代码实现
发送普通消息
发送普通消息的示例代码如下
<?php
require "vendor/autoload.php";
use MQModelTopicMessage;
use MQMQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
"${HTTP_ENDPOINT}",
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID,阿里云身份验证标识。
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey Secret,阿里云身份验证密钥。
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
$topic = "${TOPIC}";
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
$instanceId = "${INSTANCE_ID}";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
// 消息内容。
"hello mq!"
);
// 设置消息的自定义属性。
$publishMessage->putProperty("a", $i);
// 设置消息的Key。
$publishMessage->setMessageKey("MessageKey");
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "n";
}
} catch (Exception $e) {
print_r($e->getMessage() . "n");
}
}
}
$instance = new ProducerTest();
$instance->run();
订阅普通消息
订阅普通消息的示例代码如下 consumer.php
<?php
use MQMQClient;
require "vendor/autoload.php";
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
"${HTTP_ENDPOINT}",
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID,阿里云身份验证标识。
getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// AccessKey Secret,阿里云身份验证密钥。
getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
$topic = "${TOPIC}";
// 您在消息队列RocketMQ版控制台创建的Group ID。
$groupId = "${GROUP_ID}";
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
$instanceId = "${INSTANCE_ID}";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function ackMessages($receiptHandles)
{
try {
$this->consumer->ackMessage($receiptHandles);
} catch (Exception $e) {
if ($e instanceof MQExceptionAckMessageException) {
// 某些消息的句柄可能超时,会导致消费确认失败。
printf("Ack Error, RequestId:%sn", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%sn", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
}
public function run()
{
// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
while (True) {
try {
// 长轮询消费消息。
// 若Topic内没有消息,请求会在服务端挂起一段时间(长轮询时间),期间如果有消息可以消费则立即返回客户端。
$messages = $this->consumer->consumeMessage(
3, // 一次最多消费3条(最多可设置为16条)。
3 // 长轮询时间3秒(最多可设置为30秒)。
);
} catch (MQExceptionMessageResolveException $e) {
// 当出现消息Body存在不合法字符,无法解析的时候,会抛出此异常。
// 可以正常解析的消息列表。
$messages = $e->getPartialResult()->getMessages();
// 无法正常解析的消息列表。
$failMessages = $e->getPartialResult()->getFailResolveMessages();
$receiptHandles = array();
foreach ($messages as $message) {
// 处理业务逻辑。
$receiptHandles[] = $message->getReceiptHandle();
printf("MsgID %sn", $message->getMessageId());
}
foreach ($failMessages as $failMessage) {
// 处理存在不合法字符,无法解析的消息。
$receiptHandles[] = $failMessage->getReceiptHandle();
printf("Fail To Resolve Message. MsgID %sn", $failMessage->getMessageId());
}
$this->ackMessages($receiptHandles);
continue;
} catch (Exception $e) {
if ($e instanceof MQExceptionMessageNotExistException) {
// 没有消息可以消费,继续轮询。
printf("No message, contine long polling!RequestId:%sn", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "n");
sleep(3);
continue;
}
print "consume finish, messages:n";
// 处理业务逻辑。
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s nPublishTime:%d, FirstConsumeTime:%d, nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%sn",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// $message->getNextConsumeTime()前若不确认消息消费成功,则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
print_r($receiptHandles);
$this->ackMessages($receiptHandles);
print "ack finishn";
}
}
}
$instance = new ConsumerTest();
$instance->run();
消费端进程
CLI 和 CGI 的区别
首先来看一下 CLI 和 CGI 的区别。我们都知道,Nginx 使用的是 FastCgi 来调用 PHP 的服务。CGI 是通用编程接口,也就是给调用者提供的一种使用本程序的接口。Nginx 这种类型的服务器并不是直接运行 PHP 程序的,而是通过 FastCgi 来执行 PHP 程序并获得返回结果。
CLI 则是 Command Line Interface
,即命令行接口。主要用作 PHP 的开发外壳应用。也就是用 PHP 来进行 shell 脚本的开发。相比 linux 原生的 shell 来说,当然是方便了许多。在命令行状态下,直接使用 php 命令就可以运行某段 PHP 代码或某个 PHP 文件了。
Supervisor 进程管理
模式启动的脚本无法常驻内存中,或者说无法以守护进程的模式在后台启动。这里使用Supervisor 进程管理
来启动消费端的脚本。
简介
Supervisor是用Python开发的一套通用的进程管理程序,能将一个普通的命令行进程变为后台daemon,并监控进程状态,异常退出时能自动重启。它是通过fork/exec的方式把这些被管理的进程当作supervisor的子进程来启动,这样只要在supervisor的配置文件中,把要管理的进程的可执行文件的路径写进去即可。
也实现当子进程挂掉的时候,父进程可以准确获取子进程挂掉的信息的,可以选择是否自己启动和报警。supervisor还提供了一个功能,可以为supervisord或者每个子进程,设置一个非root的user,这个user就可以管理它对应的进程。
安装
步骤 1 更新依赖库
apt-get update -y
步骤 2 安装 supervisor
默认情况下,Supervisor 包在 Ubuntu 20.04 默认存储库中可用。您可以使用以下命令安装它:
apt-get install supervisor -y
安装 Supervisor 后,您可以使用以下命令验证已安装的 Supervisor 版本:
supervisord -v
接下来,使用以下命令验证 Supervisor 服务的状态:
sudo systemctl status supervisor
[sudo] password for www:
● supervisor.service - Supervisor process control system for UNIX
Loaded: loaded (/lib/systemd/system/supervisor.service; enabled; vendor preset: enabled)
Active: active (running) since Wed 2021-07-14 15:36:03 CST; 23min ago
Docs: http://supervisord.org
Main PID: 104478 (supervisord)
Tasks: 2 (limit: 2315)
Memory: 25.4M
CGroup: /system.slice/supervisor.service
├─104478 /usr/bin/python3 /usr/bin/supervisord -n -c /etc/supervisor/supervisord.conf
使用Supervisor管理订阅消息进程
您需要为要管理的每个服务创建一个独立的配置文件。您可以使用以下命令创建 rocket-mq-queue
配置文件:
sudo vim /etc/supervisor/conf.d/rocket-mq-queue.conf
添加以下几行:
[program:rocket-mq-queue]
command=sudo -u www /usr/local/php-8.3/bin/php /home/www/website/consumer.php
autostart=true
autorestart=true
startretries=5
numprocs=1
startsecs=0
process_name=%(program_name)s_%(process_num)02d
stderr_logfile=/var/log/supervisor/%(program_name)s_stderr.log
stderr_logfile_maxbytes=10MB
stdout_logfile=/var/log/supervisor/%(program_name)s_stdout.log
stdout_logfile_maxbytes=10MB
命令行推荐使用绝对路径
/usr/local/php-8.3/bin/php /home/www/website/consumer.php
完成后保存并关闭文件。接下来,告诉主管了解新配置:
sudo supervisorctl reread
你应该得到以下输出:
rocket-mq-queue: available
查看任务队列状态 supervisorctl status
rocket-mq-queue:rocket-mq-queue_00 RUNNING pid 104745, uptime 0:23:52
常用命令
supervisorctl status //查看所有进程的状态
supervisorctl stop es //停止es
supervisorctl start es //启动es
supervisorctl restart //重启es
supervisorctl update //配置文件修改后使用该命令加载新的配置
supervisorctl reload //重新启动配置中的所有程序
问题清单
1. 守护进程脚本执行停止执行
解决方案一
supervisord
用户组问题。配置文件增加 user=www
指示supervisord使用此 UNIX 用户帐户作为运行程序的帐户。如果supervisord以 root 用户身份运行,则只能切换 用户。如果supervisord 不能切换到指定的用户,程序将不会启动。
由于supervisord
是以 root
用户组启动,所以会导致普通用户www
业务写入日志失败。
$ sudo systemctl status supervisor.service
● supervisor.service - Supervisor process control system for UNIX
Loaded: loaded (/lib/systemd/system/supervisor.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-08-24 16:45:41 CST; 6s ago
Docs: http://supervisord.org
Process: 8608 ExecStop=/usr/bin/supervisorctl $OPTIONS shutdown (code=exited, status=0/SUCCESS)
Main PID: 8609 (supervisord)
Tasks: 6 (limit: 4915)
CGroup: /system.slice/supervisor.service
├─ 8609 /usr/bin/python /usr/bin/supervisord -n -c /etc/supervisor/supervisord.conf
├─ 8613 sudo -u www /usr/local/php-8.3/bin/php /home/www/website/consumer.php
└─ 21598 /usr/local/php-8.3/bin/php /home/www/website/consumer.php
可以看到怎么启动了两个进程。修改配置文件,重新加载服务
sudo vim /etc/supervisor/conf.d/rocket-mq-queue.conf
解决方案二
给阿里云提交工单

【问题小结】服务端排查在这个时间段内 运行正常,没办法判断客户端断开原因,建议看下运行环境等,从现象来看脚本内容以及这种方式使用上应该是没问题的,中途断掉 可能跟环境有关了,需要再从客户端看下,有需要协助确认的随时联系我们
解决方案三
阿里云MYSQL数据库使用代理地址在常驻内存守护经常,会自动断开问题,于是乎,是不是因为使用数据库链接池导致,关闭连接池

解决方案四
查看代码代码是否有问题,于是乎捕捉异常
try {
// `consumer.php`
} catch (Throwable $throwable) {
$errorMsg = '[MQ] [ROCKET-MQ_队列异常退出] ' . $throwable->getMessage() . '|' . $throwable->getFile() . '|' . $throwable->getLine();
Log::error($errorMsg);
}
钉钉异常捕捉
响应错误: [MQ] [ROCKET-MQ_TCP_FIN-RST用户服务队列异常退出] [MQ]用户创建添加异
原文始发于微信公众号(开源技术小栈):RocketMQ PHP生产端和消费端代码优雅实现
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/248340.html