RocketMQ单机部署(单Master模式)

  1. 1. 1. 环境准备
  2. 2. 2. 安装并启动
    1. 2.1. 1. 解压
    2. 2.2. 2. 启动 NameServer
    3. 2.3. 3. 启动 Broker
    4. 2.4. 4. 测试
    5. 2.5. 5. 关闭 RocketMQ
  3. 3. 3. Java代码测试
    1. 3.1. 1. 修改配置
    2. 3.2. 2. 修改防火墙
    3. 3.3. 3. 重新启动RocketMQ
    4. 3.4. 4. 添加pom依赖
    5. 3.5. 5. Consumer 类
    6. 3.6. 6. Producer 类
    7. 3.7. 7. 测试
  4. 4. 4. 命令汇总

单master模式一般用于本地测试,不建议线上环境使用,一旦broker宕机或者重启,会导致整个服务不可用

1. 环境准备

  • Linux centos7
  • JDK 1.8
  • RocketMQ 4.8.0 安装包:下载地址

2. 安装并启动

1. 解压

将RocketMQ压缩包解压并移动到/usr/local目录下

1
2
3
4
# 解压 unzip命令无法识别时需要先执行 yum install -y unzip 进行安装
unzip rocketmq-all-4.8.0-bin-release.zip
# 移动并重命名
mv rocketmq-all-4.8.0-bin-release /usr/local/rocketmq

以下命令都视作在RocketMQ的根目录下执行

2. 启动 NameServer

1
2
3
4
# 启动 NameServer 命令
nohup sh bin/mqnamesrv &
# 启动成功后会自动在根目录生成日志,查看日志命令
tail -f ~/logs/rocketmqlogs/namesrv.log

nohup=>不挂断地运行命令;&=>在后台运行;一般两个一起用,nohup command &

3. 启动 Broker

1
2
3
4
# 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log

初次启动问题:RocketMQ默认配置的启动内存较大,启动时很有可能会因为内存不足而启动失败,所以启动Broker之前需要修改以下文件:

修改bin/runbroker.sh文件

1
2
3
4
# 打开 runbroker.sh 文件,没有vim编辑器可以先下载,或者使用 vi 命令打开文件
vim bin/runbroker.sh
# 找到jvm内存配置,修改为以下配置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改bin/runserver.sh文件

1
2
3
4
# 打开 runserver.sh 文件
vim bin/runserver.sh
# 修改为以下配置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动完成后可使用 jps 出命令查看是否启动成功

jps 命令是由jdk提供的,可以显示当前所有java进程

4. 测试

使用官方demo测试 RocketMQ

发送消息:

1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息:

1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行成功的话会打印出一堆密密麻麻的消息

5. 关闭 RocketMQ

1
2
3
4
# 关闭 NameServer
sh bin/mqshutdown namesrv
# 关闭 Broker
sh bin/mqshutdown broker

3. Java代码测试

1. 修改配置

现在我们需要对RocketMQ的配置文件进行修改

修改文件:conf/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
# 将 autoCreateTopicEnable 属性值修改为 true,开启自动创建Topic
autoCreateTopicEnable = true
# 增加IP的配置
brokerIP1 = 192.168.56.112

# 消息储存路径配置,不添加的话默认消息存储目录就是 ~/store/...
storePathRootDir = /usr/local/rocketmq/store
storePathCommitLog = /usr/local/rocketmq/store/commitlog
storePathConsumeQueue = /usr/local/rocketmq/store/consumequeue
storePathIndex = /usr/local/rocketmq/store/index
storeCheckpoint = /usr/local/rocketmq/store/checkpoint
abortFile = /usr/local/rocketmq/store/abort

创建消息存储路径(配置了之后文件夹应该会自动创建,如果没有自动创建文件夹的话可以手动创建一下):

1
2
3
4
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

2. 修改防火墙

简单粗暴,直接关闭防火墙

1
2
3
4
5
6
# 关闭
systemctl stop firewalld.service
# 查看防火墙状态
firewall-cmd --state
# 禁止开机自启
systemctl disable firewalld.service

或者只开放需要的端口(更安全)

需要开放三个端口:9876(NameServer注册端口) 10909(生产者发送消息的端口)10911(消费者接收消息的端口)

1
firewall-cmd --zone=public --add-port=9876/tcp --permanent

参数说明:

--zone=public 作用域

--add-port=9876/tcp 开放的端口和访问类型,需要关闭端口的话就用:--remove-port=9876/tcp

--permanent 永久生效,指重启防火墙之后配仍然生效。使用此参数需要重新加载防火墙

重置防火墙

1
firewall-cmd --reload

查看开放的端口列表

1
firewall-cmd --zone=public --list-ports

3. 重新启动RocketMQ

此处启动 NameServer 时和上面一样,但是启动 Broker 时需要使用刚刚修改的配置文件

1
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf  &

-n localhost:9876 => 由于 Broker 需要注册在 NameServer 上,这里指向的是 NameServer 的地址,默认端口为9876

-c conf/broker.conf => 这里指向的是上面修改的配置文件路径(相对路径和绝对路径都可以),不带上这个的话rocketMQ启动时不会使用刚刚修改的配置文件

4. 添加pom依赖

终于到了代码环节,依赖似乎并没有什么版本的限制,用最新的4.8.0应该也可以

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

5. Consumer 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 消费者,消费消息
* @author hcf
*/
public class Consumer {

public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq_test_group");
// 设置NameServer的地址,改成自己的ip
consumer.setNamesrvAddr("192.168.56.112:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s consumer接收新消息: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

6. Producer 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* 生产者,发送同步消息
* @author hcf
*/
public class SyncProducer {

public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("rocketmq_test_group");
// 设置NameServer的地址,改成自己的ip
producer.setNamesrvAddr("192.168.56.112:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

7. 测试

消费者和生产者的代码是官方的示例代码,此处只是进行一个简单的测试,生产端发送一个Topic为TopicTest的同步消息,消费端消费TopicTest主题的消息

启动main方法进行测试

Producer生产者打印

Consumer消费者打印

到此就全部完成了!想要了解其他 RocketMQ 的常见用法可以参考官方示例

4. 命令汇总

启动 NameServer

1
nohup sh bin/mqnamesrv &

查看 NameServer 日志

1
tail -f ~/logs/rocketmqlogs/namesrv.log

启动Broker

1
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf  &

查看 Broker 日志

1
tail -f ~/logs/rocketmqlogs/broker.log

关闭 NameServer 和 Broker

1
2
sh bin/mqshutdown namesrv
sh bin/mqshutdown broker

查看 RocketMQ 服务运行状态

1
jps