本文最后编辑于 前,其中的内容可能需要更新。
单master模式一般用于本地测试,不建议线上环境使用,一旦broker宕机或者重启,会导致整个服务不可用
1. 环境准备
- Linux centos7
- JDK 1.8
- RocketMQ 4.8.0 安装包:下载地址
2. 安装并启动
1. 解压
将RocketMQ压缩包解压并移动到/usr/local
目录下
1 2 3 4
| unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release /usr/local/rocketmq
|
以下命令都视作在RocketMQ的根目录下执行
2. 启动 NameServer
1 2 3 4
| nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
|
nohup
=>不挂断地运行命令;&
=>在后台运行;一般两个一起用,nohup command &
3. 启动 Broker
1 2 3 4
| nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
|
初次启动问题:RocketMQ默认配置的启动内存较大,启动时很有可能会因为内存不足而启动失败,所以启动Broker之前需要修改以下文件:
修改bin/runbroker.sh
文件
1 2 3 4
| vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
|
修改bin/runserver.sh
文件
1 2 3 4
| vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
|
启动完成后可使用 jps
出命令查看是否启动成功
jps
命令是由jdk提供的,可以显示当前所有java进程
4. 测试
使用官方demo测试 RocketMQ
发送消息:
1 2 3 4
| export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
|
接收消息:
1 2 3 4
| export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
|
执行成功的话会打印出一堆密密麻麻的消息
5. 关闭 RocketMQ
1 2 3 4
| sh bin/mqshutdown namesrv
sh bin/mqshutdown broker
|
3. Java代码测试
1. 修改配置
现在我们需要对RocketMQ的配置文件进行修改
修改文件:conf/broker.conf
1 2 3 4 5 6 7 8 9 10 11 12
| autoCreateTopicEnable = true
brokerIP1 = 192.168.56.112
storePathRootDir = /usr/local/rocketmq/store storePathCommitLog = /usr/local/rocketmq/store/commitlog storePathConsumeQueue = /usr/local/rocketmq/store/consumequeue storePathIndex = /usr/local/rocketmq/store/index storeCheckpoint = /usr/local/rocketmq/store/checkpoint abortFile = /usr/local/rocketmq/store/abort
|
创建消息存储路径(配置了之后文件夹应该会自动创建,如果没有自动创建文件夹的话可以手动创建一下):
1 2 3 4
| mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index
|
2. 修改防火墙
简单粗暴,直接关闭防火墙
1 2 3 4 5 6
| systemctl stop firewalld.service
firewall-cmd --state
systemctl disable firewalld.service
|
或者只开放需要的端口(更安全)
需要开放三个端口:9876(NameServer注册端口) 10909(生产者发送消息的端口)10911(消费者接收消息的端口)
1
| firewall-cmd --zone=public --add-port=9876/tcp --permanent
|
参数说明:
--zone=public
作用域
--add-port=9876/tcp
开放的端口和访问类型,需要关闭端口的话就用:--remove-port=9876/tcp
--permanent
永久生效,指重启防火墙之后配仍然生效。使用此参数需要重新加载防火墙
重置防火墙
查看开放的端口列表
1
| firewall-cmd --zone=public --list-ports
|
3. 重新启动RocketMQ
此处启动 NameServer 时和上面一样,但是启动 Broker 时需要使用刚刚修改的配置文件
1
| nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
|
-n localhost:9876
=> 由于 Broker 需要注册在 NameServer 上,这里指向的是 NameServer 的地址,默认端口为9876
-c conf/broker.conf
=> 这里指向的是上面修改的配置文件路径(相对路径和绝对路径都可以),不带上这个的话rocketMQ启动时不会使用刚刚修改的配置文件
4. 添加pom依赖
终于到了代码环节,依赖似乎并没有什么版本的限制,用最新的4.8.0应该也可以
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
|
5. Consumer 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq_test_group"); consumer.setNamesrvAddr("192.168.56.112:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s consumer接收新消息: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
|
6. Producer 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("rocketmq_test_group"); producer.setNamesrvAddr("192.168.56.112:9876"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
|
7. 测试
消费者和生产者的代码是官方的示例代码,此处只是进行一个简单的测试,生产端发送一个Topic为TopicTest
的同步消息,消费端消费TopicTest
主题的消息
启动main方法进行测试
到此就全部完成了!想要了解其他 RocketMQ 的常见用法可以参考官方示例
4. 命令汇总
启动 NameServer
1
| nohup sh bin/mqnamesrv &
|
查看 NameServer 日志
1
| tail -f ~/logs/rocketmqlogs/namesrv.log
|
启动Broker
1
| nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
|
查看 Broker 日志
1
| tail -f ~/logs/rocketmqlogs/broker.log
|
关闭 NameServer 和 Broker
1 2
| sh bin/mqshutdown namesrv sh bin/mqshutdown broker
|
查看 RocketMQ 服务运行状态