11-RocketMQ
概念
broker
- Broker面向producer和consumer接受和发送消息
- 向nameserver提交自己的信息
- 是消息中间件的消息存储、转发服务器。
- 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
broker集群
- Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
- Master多机负载,可以部署多个broker
- 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
producer
- 消息的生产者
- 通过集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
consumer
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。
注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
- 两种消费形式:
- 拉取式消费:Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
- 推动式消费:Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
nameserver
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时想nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,
NameServer集群间互不通信,没有主备的概念
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
为什么不用zookeeper?:rocketmq希望为了提高性能,CAP定理,客户端负载均衡
ProducerGroup
- 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
ConsumerGroup
- 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
- 请确保同一组内的每个消费者订阅信息保持一致。
消息
字段名 | 默认值 | 说明 |
---|---|---|
Topic | null | 必填,消息所属topic的名称 |
Body | null | 必填,消息体 |
Tags | null | 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 |
传输内容格式
- 可见传输内容主要可以分为以下4部分:
(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容;
消息消费模式
集群消息
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
特点- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时,不能保证路由到同一台机器上
- 消费状态由broker维护
广播消息
当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
特点- 消费进度由consumer维护
- 保证每个消费者消费一次消息
- 消费失败的消息不会重投
发送方式
同步消息
- 消息发送中进入同步等待状态,可以保证消息投递一定到达
异步方式
- 想要快速发送消息,又不想丢失的时候可以使用异步消息
1
2
3
4
5
6
7
8
9
10producer.send(message,new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("ok");
}
public void onException(Throwable e) {
e.printStackTrace();
System.out.println("err");
}
});
单向消息
- 只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- producer.sendOneway(message);
批量发送
- 可以多条消息打包一起发送,减少网络传输次数提高效率。
producer.send(Collection c)
方法可以接受一个集合 实现批量发送1
2
3public SendResult send(Collection<Message> msgs) {
return this.defaultMQProducerImpl.send(batch(msgs));
}- 批量消息要求必要具有同一topic、相同消息配置
- 不支持延时消息
- 建议一个批量消息最好不要超过1MB大小
- 如果不确定是否超过限制,可以手动计算大小分批发送
消息查询
按照MessageId查询消息
+
按照Message Key查询消息
- 主要是基于RocketMQ的IndexFile索引文件来实现的
消息过滤
- RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的
TAG
- 为每条消息打上tag,可以使用tag来过滤消费
- 在Producer中使用Tag:
Message msg = new Message("TopicTest","TagA" ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET));
- 在consumer中通过tag过滤
- consumer.subscribe(“TopicTest”, “TagA||TagB”); // * 代表订阅Topic下的所有消息
SQL表达式过滤消息
- 消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,您可以使用SQL表达式筛选出消息.
需要的配置
- 在
broker.conf
中添加配置enablePropertyFilter=true
- ./mqbroker -n 192.168.150.113:9876 -c broker.conf
使用
- 创建selector来使用
1
2MessageSelector selector = MessageSelector.bySql("order > 5");
consumer.subscribe("xxoo3", selector);
语法
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.
- 数字比较, 像
>
,>=
,<
,<=
,BETWEEN
,=
; - 字符比较, 像
=
,<>
,IN
; IS NULL
或者IS NOT NULL
;- 逻辑运算
AND
,OR
,NOT
;
常量类型是: - 数字, 像123, 3.1415;
- 字符串, 像‘abc’,必须使用单引号;
NULL
, 特殊常数;- 布尔常量,
TRUE
或FALSE
;
延迟消息
- RocketMQ使用messageDelayLevel可以设置延迟投递
- 默认配置为 : messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 在
broker.conf
中添加配置- messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
- 发送消息时设置:
- message.setDelayTimeLevel(1); 消息会暂存在SCHEDULE_TOPIC_XXXX的topic中
顺序消费
- 队列先天支持FIFO模型,单一生产和消费者下只要保证使用
MessageListenerOrderly
监听器即可 - 顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。
- 并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
- 跟普通消息相比,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类,并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。
全局顺序消费
- 全局顺序是指某个Topic下的所有消息都要保证顺序
- 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
分区顺序消费
- 部分顺序消息只要保证每一组消息被顺序消费即可。
- 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
如何保证顺序消费
- 同一topic
- 同一个QUEUE
- 发消息的时候一个线程去发送消息
- 消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly
- 多个queue 只能保证单个queue里的顺序
消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证
producer设置发送超时时间
1
2
3
4
5
6// 异步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendAsyncFailed(1);
// 同步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendFailed(1);
// 是否向其他broker发送请求 默认false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);Consumer
- 消费超时设置
consumer.setConsumeTimeout()
- 消费超时设置
broker投递
- 只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
- 重投使用
messageDelayLevel
- 默认值: messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
死信队列
- 死信队列默认只有写权限。
事务消息
- RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
RocketMQ事务
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,半消息不是说一半消息,而是这个消息对消费者来说不可见,发送成功后发送方再执行本地事务,本地食物执行成功,该消息对消费者变更为可见状态。
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:如果超过回查次数,默认回滚消息
第一步先给 Broker 发送事务消息即半消息,再根据本地事务的结果向 Broker 发送 Commit 或者 RollBack 命令。
并且 RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功,然后执行 Commit 或者 RollBack 命令。
如果是 Commit 那么订阅方就能收到这条消息,然后再做对应的操作,做完了之后再消费这条消息即可。
如果是 RollBack 那么订阅方收不到这条消息,等于事务就没执行过。
可以看到通过 RocketMQ 还是比较容易实现的,RocketMQ 提供了事务消息的功能,我们只需要定义好事务反查接口即可
TransactionListener的两个方法
executeLocalTransaction
半消息发送成功触发此方法来执行本地事务,返回提交或者回退结果
checkLocalTransaction
broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
本地事务执行状态
LocalTransactionState.COMMIT_MESSAGE
执行事务成功,确认提交
LocalTransactionState.ROLLBACK_MESSAGE
回滚消息,broker端会删除半消息
LocalTransactionState.UNKNOW
暂时为未知状态,等待broker回查