Spring Boot集成消息队列
Apache RocketMQ是由开源的轻量级消息队列,于2017年正式成为Apache顶级项目。
在分布式消息队列中间件领域,最热门的项目是Kafka和RocketMQ:
-
Kafka是较早开源的"消息处理平台",在写吞吐量上,有明显优势,更适合处理日志类消息。
-
RocketMQ借鉴了部分Kafka的设计思路,并对实时性、大分区数等方面进行了优化,较适合做为业务类的消息。
因此,本书选用RocketMQ做为业务类的消息队列。
安装并运行RocketMQ
RocketMQ的容器化比较落后,基本没有可用的镜像版本,我们采用手工单机部署的方式。
首先,下载最新版二进制文件,当前是4.9.1:
wget https://dlcdn.apache.org/rocketmq/4.9.1/rocketmq-all-4.9.1-bin-release.zip
完成后,解压缩:
unizp rocketmq-all-4.9.1-bin-release.zip
启动Name Server:
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
最后启动Broker:
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
如果启动成功,在上述两个日志中,会有如下的日志:
2021-10-12 4:30:02 INFO main - tls.client.keyPassword = null
2021-10-12 4:30:02 INFO main - tls.client.certPath = null
2021-10-12 4:30:02 INFO main - tls.client.authServer = false
2021-10-12 4:30:02 INFO main - tls.client.trustCertPath = null
2021-10-12 4:30:02 INFO main - Using JDK SSL provider
2021-10-12 4:30:03 INFO main - SSLContext created for server
2021-10-12 4:30:03 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2021-10-12 4:30:03 INFO NettyEventExecutor - NettyEventExecutor service started
2021-10-12 4:30:03 INFO FileWatchService - FileWatchService service started
2021-10-12 4:30:03 INFO main - The Name Server boot success. serializeType=JSON
2021-10-12 14:36:09 INFO brokerOutApi_thread_3 - register broker[0]to name server 127.0.0.1:9876 OK
2021-10-12 14:36:09 ERROR DiskCheckScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog
2021-10-12 14:36:18 ERROR StoreScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog
2021-10-12 14:36:19 ERROR DiskCheckScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog
可以发现,NameServer是没有问题的,Broker报了一个"Error when measuring disk space usage"的错,这个是当前版本的Bug,不影响使用。
如果想退出服务,可以直接kill,或者执行:
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
RocketMQ架构简介
在集成RocketMQ之前,先介绍一下RocketMQ的基本架构:
-
NameServer:轻量级元信息服务,管理路由信息并提供对应的读写服务
-
Broker:支撑TOPIC和QUEUE的存储,支持Push和Pull两种协议,有容错、副本、故障恢复机制。
-
Producer:发布端服务,支持分布式部署,并向Broker集群发送
-
Consumer:消费端服务,同时支持Push和Pull协议。支持消费、广播、顺序消息等特性。
-
Topic:队列,用于区分不同消息。
-
Tag:同一个Topic下,可以设定不同Tag(例如前缀),通过Tag来过滤消息,只保留自己感兴趣的。
在使用Producer和Consumer时,需要指定消费组(Consumer Group),这是从Kafka中借鉴过来的机制。相同Consumer Group下的实例会共享同一个GroupId,会被认为是对等的、可负载均衡的。事件会随机分发给相同GroupId下的多个实例中。
在Spring Boot中集成RocketMQ
首先引入依赖:
implementation 'org.apache.rocketmq:rocketmq-client:4.9.1'
接着,我们创建生产者的抽象基类:
package com.coder4.homs.demo.server.mq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/**
* @author coder4
*/
public abstract class BaseProducer<T> implements DisposableBean {
private final Logger LOG = LoggerFactory.getLogger(getClass());
abstract String getNamesrvAddr();
abstract String getProducerGroup();
abstract String getTopic();
abstract String getTag();
protected DefaultMQProducer producer;
private ObjectMapper objectMapper = new ObjectMapper();
public BaseProducer() {
producer = new
DefaultMQProducer(getProducerGroup());
}
@PostConstruct
public void postConstruct() {
producer.setNamesrvAddr(getNamesrvAddr());
try {
producer.start();
} catch (MQClientException e) {
LOG.error("producer start exception", e);
throw new RuntimeException(e);
}
}
@Override
public void destroy() throws Exception {
producer.shutdown();
}
protected Message buildMessage(String payload) {
return new Message(getTopic(),
getTag(),
payload.getBytes(StandardCharsets.UTF_8)
);
}
public void publish(T payload) {
try {
String val = objectMapper.writeValueAsString(payload);
producer.send(buildMessage(val));
LOG.info("publish success, topic = {}, tag = {}, msg = {}", getTopic(), getTag(), val);
} catch (Exception e) {
LOG.error("publish exception", e);
}
}
public void publishAsync(T payload) {
try {
String val = objectMapper.writeValueAsString(payload);
producer.send(buildMessage(val), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.info("publishAsync success, topic = {}, tag = {}, msg = {}", getTopic(), getTag(), val);
}
@Override
public void onException(Throwable e) {
LOG.error("publish async exception", e);
}
});
} catch (Exception e) {
LOG.error("publishAsync exception", e);
}
}
}
如上所示:
-
nameServr、topic、tag由子类组成
-
我们在构造函数中,创建了Producer对象
-
postConstruct中:设定了NameServer地址,并启动producer
-
publish / publishAsync:发送消息,先根据topic和tag构造消息,然后调用同步 / 异步的接口发送。
-
destroy时,停止producer
接下来我们看下Consumer的基类:
/**
* @(#)BaseConsumer.java, 10月 12, 2021.
* <p>
* Copyright 2021 coder4.com. All rights reserved.
* CODER4.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.coder4.homs.demo.server.mq;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
/**
* @author coder4
*/
public abstract class BaseConsumer<T> implements DisposableBean {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
private static final int DEFAULT_BATCH_SIZE = 1;
private static final int MAX_RETRY = 1024;
abstract String getNamesrvAddr();
abstract String getConsumerGroup();
abstract String getTopic();
abstract String getTag();
abstract Class<T> getClassT();
abstract boolean process(T msg);
private ObjectMapper objectMapper = new ObjectMapper();
protected DefaultMQPushConsumer consumer;
public BaseConsumer() {
consumer = new
DefaultMQPushConsumer(getConsumerGroup());
}
@PostConstruct
public void postConstruct() {
consumer.setNamesrvAddr(getNamesrvAddr());
try {
consumer.subscribe(getTopic(), getTag());
} catch (MQClientException e) {
LOG.error("consumer subscribe exception", e);
throw new RuntimeException(e);
}
consumer.setConsumeMessageBatchMaxSize(DEFAULT_BATCH_SIZE);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if (msgs.size() != DEFAULT_BATCH_SIZE) {
LOG.error("MessageListenerConcurrently callback msgs.size() != 1");
}
MessageExt msg = msgs.get(0);
if (msg.getReconsumeTimes() >= MAX_RETRY) {
LOG.error("reconsume exceed max retry times");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
if (process(objectMapper.readValue(new String(msg.getBody()), getClassT()))) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} catch (Exception e) {
LOG.error("process exception", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
try {
consumer.start();
} catch (MQClientException e) {
LOG.error("consumer start exception", e);
throw new RuntimeException(e);
}
}
@Override
public void destroy() throws Exception {
consumer.shutdown();
}
}
与Producer类似,topic、tag、namesrv由子类指定。
-
postConstruct:订阅了对应topic和tag的消息,并设定回掉函数,这里设定每批次最多拉取1个消息,以最简化处理失败的情况,你可以根据实际情况做出调整。
-
接受消息时,会调用子类的process进行处理,同时进行json的反序列化操作
接下来,我们来写一个Demo的生产者、消费者:
首先配置nameSrv:
# rocketmq
rocketmq.namesrv: 127.0.0.1:9876
接着,定义消息:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DemoMessage {
private String msg;
private long ts;
}
然后是具体的Consumer和Producer:
package com.coder4.homs.demo.server.mq;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author coder4
*/
@Service
public class DemoConsumer extends BaseConsumer<DemoMessage> {
@Value("${rocketmq.namesrv}")
private String namesrv;
@Override
String getNamesrvAddr() {
return namesrv;
}
@Override
String getConsumerGroup() {
return "demo-consumer";
}
@Override
String getTopic() {
return "demo";
}
@Override
String getTag() {
return "*";
}
@Override
Class<DemoMessage> getClassT() {
return DemoMessage.class;
}
@Override
boolean process(DemoMessage msg) {
LOG.info("process msg = {}", msg);
return true;
}
}
package com.coder4.homs.demo.server.mq;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author coder4
*/
@Service
public class DemoProducer extends BaseProducer<DemoMessage> {
@Value("${rocketmq.namesrv}")
private String namesrv;
@Override
String getNamesrvAddr() {
return namesrv;
}
@Override
String getProducerGroup() {
return "demo-producer";
}
@Override
String getTopic() {
return "demo";
}
@Override
String getTag() {
return "*";
}
}
我们可以调用Producer发送一个消息,然后会收到如下的日志,说明消息已经被成功处理!
2021-10-12 8:01:37.340 INFO 6270 --- [MessageThread_1] c.c.homs.demo.server.mq.DemoConsumer : process msg = DemoMessage(msg=123, ts=1634032897315)
由于篇幅所限,我们只实战了基础的消息收发,推荐你根据文档继续探索其他内容,包括:[集群部署](Deployment - Apache RocketMQ)、[顺序消息](Order Message - Apache RocketMQ)、[广播消息](Broadcasting - Apache RocketMQ)等内容。