Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

RocketMQ--单主单从

配置

  1. 准备两台服务器,修改hosts文件(vim /etc/hosts),加入映射关系:
192.168.1.201 rocketmq-nameserver1
192.168.1.201 rocketmq-master1
192.168.1.202 rocketmq-nameserver2
192.168.1.202 rocketmq-master1-slave1
  1. 重启网卡:
systemctl restart network
  1. 防火墙命令:
# 查看防火墙
systemctl status firewalld.service
# 关闭防火墙(方便测试)
systemctl stop firewalld.service
  1. 解压安装包,解压文件到/usr/local/目录下:
unzip rocketmq-all-4.4.0-bin-release.zip -d /usr/local
  1. 更改解压后的文件名:
mv rocketmq-all-4.4.0-bin-release rocketmq
  1. 创建RocketMQ存储文件的目录,执行如下命令:
mkdir logs
mkdir store
cd store/
mkdir commitlog
mkdir consumequeue
mkdir index
文件夹说明
logs存储RocketMQ日志目录
store存储RocketMQ数据文件目录
commitlog存储RocketMQ消息信息
consumequeue存储消息的索引数据
index存储消息的索引数据
  1. 配置环境变量(vim /etc/profile),在文件的末尾添加:
ROCKETMQ_HOME=/usr/local/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
export ROCKETMQ_HOME PATH
 ROCKETMQ_HOME PATH
  1. 使环境变量生效:
source /etc/profile
  1. 修改2m-2s-async目录下的broker-a.properties:
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

用以下内容覆盖:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master, > 0 表示slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认是凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageTreadPoolNums=128
#拉消息线程池数量
#pullMessageTreadPoolNums=128

  1. 修改2m-2s-async目录下的broker-a-s.properties:
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties

用以下内容覆盖:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master, > 0 表示slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认是凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageTreadPoolNums=128
#拉消息线程池数量
#pullMessageTreadPoolNums=128

启动

  1. 启动namesrv前,若要修改启动参数:
vim /usr/local/rocketmq/bin/runserver.sh
  1. 分别启动两台服务器上的namesrv
nohup sh mqnamesrv &
  1. 启动broker前,若要修改启动参数:
vim /usr/local/rocketmq/bin/runbroker.sh
  1. 启动192.168.1.201上的broker:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties  > /dev/null 2>&1 &
  1. 启动192.168.1.202上的broker:
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties  > /dev/null 2>&1 &
  1. 输入jps查看进程:
[root@rocketmq-nameserver1 ~]# jps
7206 BrokerStartup
7213 Jps
7183 NamesrvStartup
  1. 配置控制台:
rocketmq.config.namesrvAddr=192.168.1.201:9876;192.168.1.202:9876
  1. 查看控制台(http://localhost:8080/#/cluster):

image.png

测试

Master宕机后,消费者仍然可以从Slave消费,但MQ不能接收新的消息。
  1. 生产者
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        producer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
        producer.start();

        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
  1. 输出结果:
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A6678340000, offsetMsgId=C0A801C900002A9F000000000001507F, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A6678850001, offsetMsgId=C0A801C900002A9F0000000000015131, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A801671A9818B4AAC29A66788E0002, offsetMsgId=C0A801C900002A9F00000000000151E3, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=4]
  1. 停掉192.168.1.201的broker:
sh mqshutdown broker
  1. 再次发送消息,报错:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
	at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:35)
  1. 消费者:
public class Consumer {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        consumer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s 消费消息: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

控制台结果如下:

ConsumeMessageThread_1 消费消息: Hello RocketMQ 0 
ConsumeMessageThread_2 消费消息: Hello RocketMQ 1 
ConsumeMessageThread_3 消费消息: Hello RocketMQ 2 

关闭

  1. 关闭broker命令:
sh mqshutdown broker
  1. 关闭namesrv命令:
sh mqshutdown namesrv