1. Environment
NameServer nodes:

| Node | Port |
| --- | --- |
| nameserver-1 | 8876 |
| nameserver-2 | 8878 |
| nameserver-3 | 8879 |

Broker nodes:

| Node | Port |
| --- | --- |
| broker-a | 10911 |
| broker-a-s | 10921 |
| broker-b | 10931 |
| broker-b-s | 10941 |

2. Configuration
2.1 Installation

```shell
wget -c https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
```
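2.2 NameServer configuration
Each NameServer instance gets its own properties file with a distinct listenPort.

nameserver-1, edit namesrv.properties:

```properties
listenPort=8876
```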
nameserver-2, edit namesrv-b.properties:

```properties
listenPort=8878
```
nameserver-3, edit namesrv-c.properties:

```properties
listenPort=8879
```
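The three files differ only in their port, so they can also be generated in one pass. The following is a minimal sketch, not part of the original setup: `CONF_DIR` is an assumed variable that should point at the conf/ directory of the unpacked release, and any existing files with the same names are overwritten.

```shell
#!/bin/sh
# Sketch: write one properties file per NameServer instance.
# CONF_DIR is an assumption; adjust it to the conf/ directory of your install.
CONF_DIR="${CONF_DIR:-conf}"
mkdir -p "$CONF_DIR"

# file-name:port pairs taken from the configuration listed above
for pair in namesrv.properties:8876 namesrv-b.properties:8878 namesrv-c.properties:8879; do
  file="${pair%%:*}"   # text before the colon
  port="${pair##*:}"   # text after the colon
  printf 'listenPort=%s\n' "$port" > "$CONF_DIR/$file"
done
```

Run it from the root of the unpacked release so that the files land where the start commands expect them.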
2.3 Broker configuration
broker-a:

```properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.66.70:8876
diskMaxUsedSpaceRatio=88
storePathRootDir=/app/rocketmq/store-a
storePathCommitLog=/app/rocketmq/store-a/commitlog
storePathConsumeQueue=/app/rocketmq/store-a/consumequeue
storePathIndex=/app/rocketmq/store-a/index
storeCheckpoint=/app/rocketmq/store-a/checkpoint
abortFile=/app/rocketmq/store-a/abort
maxMessageSize=65536
aclEnable=true
listenPort=10911
```
broker-a-s:

```properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.66.70:8876
diskMaxUsedSpaceRatio=88
storePathRootDir=/app/rocketmq/store-a-s
storePathCommitLog=/app/rocketmq/store-a-s/commitlog
storePathConsumeQueue=/app/rocketmq/store-a-s/consumequeue
storePathIndex=/app/rocketmq/store-a-s/index
storeCheckpoint=/app/rocketmq/store-a-s/checkpoint
abortFile=/app/rocketmq/store-a-s/abort
maxMessageSize=65536
aclEnable=true
listenPort=10921
```
broker-b:

```properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.66.70:8876
diskMaxUsedSpaceRatio=88
storePathRootDir=/app/rocketmq/store-b
storePathCommitLog=/app/rocketmq/store-b/commitlog
storePathConsumeQueue=/app/rocketmq/store-b/consumequeue
storePathIndex=/app/rocketmq/store-b/index
storeCheckpoint=/app/rocketmq/store-b/checkpoint
abortFile=/app/rocketmq/store-b/abort
maxMessageSize=65536
aclEnable=true
listenPort=10931
```
broker-b-s:

```properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.66.70:8876
diskMaxUsedSpaceRatio=88
storePathRootDir=/app/rocketmq/store-b-s
storePathCommitLog=/app/rocketmq/store-b-s/commitlog
storePathConsumeQueue=/app/rocketmq/store-b-s/consumequeue
storePathIndex=/app/rocketmq/store-b-s/index
storeCheckpoint=/app/rocketmq/store-b-s/checkpoint
abortFile=/app/rocketmq/store-b-s/abort
maxMessageSize=65536
aclEnable=true
listenPort=10941
```
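The four broker files share every setting except brokerName, brokerId, brokerRole, the store directory, and listenPort. A master and its slave share a brokerName, with brokerId 0 for the master and 1 for the slave. The sketch below generates all four files from one template to make that pattern explicit. `OUT_DIR` and `write_broker` are hypothetical names introduced for illustration, and the script overwrites files of the same name in `OUT_DIR`.

```shell
#!/bin/sh
# Sketch: generate the four broker property files from one template.
# OUT_DIR and write_broker are illustrative names, not part of RocketMQ.
OUT_DIR="${OUT_DIR:-conf/2m-2s-async}"
mkdir -p "$OUT_DIR"

# write_broker FILE BROKER_NAME BROKER_ID BROKER_ROLE STORE_DIR LISTEN_PORT
write_broker() {
  cat > "$OUT_DIR/$1.properties" <<EOF
brokerClusterName=DefaultCluster
brokerName=$2
brokerId=$3
deleteWhen=04
fileReservedTime=48
brokerRole=$4
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.66.70:8876
diskMaxUsedSpaceRatio=88
storePathRootDir=$5
storePathCommitLog=$5/commitlog
storePathConsumeQueue=$5/consumequeue
storePathIndex=$5/index
storeCheckpoint=$5/checkpoint
abortFile=$5/abort
maxMessageSize=65536
aclEnable=true
listenPort=$6
EOF
}

# Values copied from the four configurations above
write_broker broker-a   broker-a 0 ASYNC_MASTER /app/rocketmq/store-a   10911
write_broker broker-a-s broker-a 1 SLAVE        /app/rocketmq/store-a-s 10921
write_broker broker-b   broker-b 0 ASYNC_MASTER /app/rocketmq/store-b   10931
write_broker broker-b-s broker-b 1 SLAVE        /app/rocketmq/store-b-s 10941
```

Because all four brokers run on one host here, each needs its own store directory and its own listenPort; the template keeps those two values as explicit arguments for that reason.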
3. Starting the services
Start the NameServers:

```shell
nohup sh bin/mqnamesrv -c conf/namesrv.properties &
nohup sh bin/mqnamesrv -c conf/namesrv-b.properties &
nohup sh bin/mqnamesrv -c conf/namesrv-c.properties &
```
Start the brokers:

```shell
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
```
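Once the processes are started, one quick sanity check is to probe each listen port. This is a rough sketch under assumptions: it relies on bash's /dev/tcp feature and the coreutils `timeout` command being available, and `check_port` and `check_cluster` are hypothetical helper names, not RocketMQ tools. An open port only shows that something is listening, not that the broker has registered correctly.

```shell
#!/bin/sh
# Sketch: probe the NameServer and broker ports listed in section 1.
# check_port and check_cluster are illustrative helpers, not part of RocketMQ.

# check_port HOST PORT: succeeds if a TCP connection opens within 2 seconds
check_port() {
  timeout 2 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# check_cluster HOST: report the state of every port used in this setup
check_cluster() {
  for port in 8876 8878 8879 10911 10921 10931 10941; do
    if check_port "$1" "$port"; then
      echo "$port open"
    else
      echo "$port closed"
    fi
  done
}
```

Call it as `check_cluster 192.168.66.70`. To see which brokers have actually registered, `sh bin/mqadmin clusterList -n 192.168.66.70:8876` queries the NameServer directly.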
4. Testing
Producer:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    // All three NameServers, separated by semicolons
    public static final String NAMESRVADDR = "192.168.66.70:8876;192.168.66.70:8878;192.168.66.70:8879";
    // Used here as the producer group name
    private static final String CONSUMER_GROUP1 = "please_rename_unique_group_name";
    private static final String TopicTest = "TopicTest";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(CONSUMER_GROUP1);
        producer.setNamesrvAddr(NAMESRVADDR);
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message(TopicTest,
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Synchronous send: blocks until the broker returns a result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
```
Consumer:

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static final String NAMESRVADDR = "192.168.66.70:8876;192.168.66.70:8878;192.168.66.70:8879";
    private static final String CONSUMER_GROUP1 = "please_rename_unique_group_name";
    private static final String TopicTest = "TopicTest";

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP1);
        consumer.setNamesrvAddr(NAMESRVADDR);
        // Subscribe to every tag of the topic
        consumer.subscribe(TopicTest, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
```
More source code:
https://github.com/leellun/rocketmq-learn.git