「工程实践」RocketMQ安装并整合SpringBoot
RocketMQ 是阿里巴巴团队使用Java语言开发的一个分布式、队列模型的消息中间件,后开源给Apache基金会成为了Apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
RocketMQ 主要由Producer、Broker、Consumer、NameServer组成;其中Producer负责生产消息;Consumer负责消费消息;Broker是MQ服务,负责接收、分发消息;NameServer是路由中心,负责MQ服务之间的协调。
1. Centos安装RocketMQ
- 官网下载
RocketMQ安装包
# 进入自定义软件安装目录
cd /mnt/newdatadrive/apps
# wget下载RocketMQ安装包
wget -c "https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip"
- 解压安装(环境基于
JDK1.8或以上)
# 解压
unzip rocketmq-all-4.8.0-bin-release.zip
# 重命名为rocketmq
mv rocketmq-all-4.8.0-bin-release rocketmq
# 进入安装目录
cd rocketmq
- 修改配置
# RocketMQ的默认内存占用非常高,是4×4g的,将4g调整为512m
# 编辑runserver.sh
vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 编辑runbroker.sh
vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
- 配置
RocketMQ的环境变量
# 编辑/etc/profile
vim /etc/profile
# 添加:
export ROCKETMQ_HOME=/mnt/newdatadrive/apps/rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
# 使rocketmq的配置生效
source /etc/profile
- 启动
RockerMQ顺序
# 先启动 NameServer,然后启动 Broker
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &
- 关闭
RockerMQ顺序
# 先关闭Broker,再关闭NameServer
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
- 启动日志
# 查看 Name Server 启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
# 查看 Broker Server 启动日志
tail -f ~/logs/rocketmqlogs/broker.log
# 若出现如下报错
file doesn't exist on this path: /root/store/commitlog
file doesn't exist on this path: /root/store/consumequeue
# 对应创建即可:
cd ~/store
mkdir commitlog consumequeue
- 防火墙
- 若外网
IP调试,关闭防火墙 或 开放防火墙端口9876,10911
- 若外网
# NameServer默认端口:9876
firewall-cmd --add-port=9876/tcp --permanent
# Broker对外服务的监听端口
firewall-cmd --add-port=10911/tcp --permanent
# 更新防火墙规则
firewall-cmd --reload
2. SpringBoot 整合 RocketMQ
pom.xml引入组件rocketmq-spring-boot-starter依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
- 修改
application.yml,添加RocketMQ相关配置
# 多个name-server(集群)使用英文;分割
rocketmq:
name-server: 192.168.2.100:9876
producer:
group: test-group
- 消息生产者
@Component
public class MessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 生产者发送消息
* @param topic 主题
* @param message 消息体
*/
public void sendMessage(String topic, String message){
this.rocketMQTemplate.convertAndSend(topic, message);
}
}
- 消费者
/**
* 消费者监听消息
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "test-topic", // 指定topic
consumerGroup = "test-group", // 指定消费组
selectorExpression = "*"
)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到的消息是: {}", message);
}
}
- 测试类
@Slf4j
@SpringBootTest
class DemoApplicationTests {
@Autowired
private MessageProducer messageProducer;
@Test
void testMQ() {
String message = "Hello RocketMQ!";
messageProducer.sendMessage("test-topic",message);
log.info("生产一条消息:" + message);
}
}
- 运行结果
2021-06-10 14:56:25.180 INFO 17720 --- [ main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='test-group', nameServer='192.168.2.100:9876', topic='test-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
2021-06-10 14:56:25.188 INFO 17720 --- [ main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:messageConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2021-06-10 14:56:25.230 INFO 17720 --- [ main] c.e.fastdfsdemo.DemoApplicationTests : Started DemoApplicationTests in 10.371 seconds (JVM running for 13.888)
2021-06-10 14:56:26.410 INFO 17720 --- [ main] c.e.fastdfsdemo.DemoApplicationTests : 生产一条消息:Hello RocketMQ!
2021-06-10 14:56:26.426 INFO 17720 --- [MessageThread_1] c.e.f.rocketmq.MessageConsumer : 收到的消息是: Hello RocketMQ!
2021-06-10 14:56:26.496 INFO 17720 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[192.168.2.100:10911] result: true
2021-06-10 14:56:26.496 INFO 17720 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[192.168.2.100:9876] result: true
END .
相关系列文章
- 「学习笔记」SpringCloud(五)OpenFeign整合Sentinel实现熔断降级
- 「学习笔记」SpringCloud(四)OpenFeign服务间调用
- 「学习笔记」SpringCloud(三)Config配置中心
- 「学习笔记」SpringCloud(二)Gateway网关
- 「学习笔记」SpringCloud(一)Eureka注册中心
- 「学习笔记」Spring Reactive Stack(六)响应式 HTTP 请求客户端 WebClient
- 「学习笔记」Spring Reactive Stack(五)服务端事件推送Server-Sent Events
- 「学习笔记」Spring Reactive Stack(四)响应式方式访问Redis
- 「学习笔记」Spring Reactive Stack(三)使用R2DBC访问MySQL
- 「学习笔记」Spring Reactive Stack(二)Reactor异常处理
- 「学习笔记」Spring Reactive Stack(一)Spring WebFlux响应式Web框架入门
- 「工程实践」Spring Boot - 使用RocketMQ实战样例
- 「工程实践」RocketMQ安装并整合SpringBoot
- 「工程实践」Spring Boot + Thymeleaf页面静态化实现
- 「工程实践」Spring Boot - Java接口幂等性设计
- 「工程实践」Spring Boot 整合 FastDFS
- 「学习笔记」Spring Boot MVC 应用
- 「学习笔记」Spring Boot 数据库访问
- 「学习笔记」Spring Boot 入门
- 「学习笔记」Spring--SSM框架整合(Spring+SpringMVC+MyBatis)
- 「学习笔记」Spring--持久层框架Mybatis
- 「学习笔记」Spring--MVC框架
- 「学习笔记」Spring--面向切面编程(AOP模块)
- 「学习笔记」Spring--事务管理
- 「学习笔记」Spring--JDBC详解
- 「学习笔记」Spring--IoC注解实现
- 「学习笔记」Spring--IoC控制反转