配置
- 准备两台服务器,修改hosts文件(
vim /etc/hosts
),加入映射关系:
192.168.1.201 rocketmq-nameserver1
192.168.1.201 rocketmq-master1
192.168.1.202 rocketmq-nameserver2
192.168.1.202 rocketmq-master1-slave1
- 重启网卡:
systemctl restart network
- 防火墙命令:
# 查看防火墙
systemctl status firewalld.service
# 关闭防火墙(方便测试)
systemctl stop firewalld.service
- 解压安装包,解压文件到/usr/local/目录下:
unzip rocketmq-all-4.4.0-bin-release.zip -d /usr/local
- 更改解压后的文件名:
mv rocketmq-all-4.4.0-bin-release rocketmq
- 创建RocketMQ存储文件的目录,执行如下命令:
mkdir logs
mkdir store
cd store/
mkdir commitlog
mkdir consumequeue
mkdir index
文件夹 | 说明 |
---|---|
logs | 存储RocketMQ日志目录 |
store | 存储RocketMQ数据文件目录 |
commitlog | 存储RocketMQ消息信息 |
consumequeue | 存储消息的索引数据 |
index | 存储消息的索引数据 |
- 配置环境变量(
vim /etc/profile
),在文件的末尾添加:
ROCKETMQ_HOME=/usr/local/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
export ROCKETMQ_HOME PATH
ROCKETMQ_HOME PATH
- 使环境变量生效:
source /etc/profile
- 修改2m-2s-async目录下的broker-a.properties:
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
用以下内容覆盖:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master, > 0 表示slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认是凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageTreadPoolNums=128
#拉消息线程池数量
#pullMessageTreadPoolNums=128
- 修改2m-2s-async目录下的broker-a-s.properties:
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties
用以下内容覆盖:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master, > 0 表示slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认是凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageTreadPoolNums=128
#拉消息线程池数量
#pullMessageTreadPoolNums=128
启动
- 启动namesrv前,若要修改启动参数:
vim /usr/local/rocketmq/bin/runserver.sh
- 分别启动两台服务器上的namesrv
nohup sh mqnamesrv &
- 启动broker前,若要修改启动参数:
vim /usr/local/rocketmq/bin/runbroker.sh
- 启动192.168.1.201上的broker:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
- 启动192.168.1.202上的broker:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties > /dev/null 2>&1 &
- 输入jps查看进程:
[root@rocketmq-nameserver1 ~]# jps
7206 BrokerStartup
7213 Jps
7183 NamesrvStartup
- 配置控制台:
rocketmq.config.namesrvAddr=192.168.1.201:9876;192.168.1.202:9876
测试
Master宕机后,消费者仍然可以从Slave消费,但MQ不能接收新的消息。
- 生产者
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer1");
producer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
- 输出结果:
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A6678340000, offsetMsgId=C0A801C900002A9F000000000001507F, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A6678850001, offsetMsgId=C0A801C900002A9F0000000000015131, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A66788E0002, offsetMsgId=C0A801C900002A9F00000000000151E3, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=4]
- 停掉192.168.1.201的broker:
sh mqshutdown broker
- 再次发送消息,报错:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:35)
- 消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
consumer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s 消费消息: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
控制台结果如下:
ConsumeMessageThread_1 消费消息: Hello RocketMQ 0
ConsumeMessageThread_2 消费消息: Hello RocketMQ 1
ConsumeMessageThread_3 消费消息: Hello RocketMQ 2
关闭
- 关闭broker命令:
sh mqshutdown broker
- 关闭namesrv命令:
sh mqshutdown namesrv