【RocketMq实战第二篇】-SpringBoot集成RocketMq

x33g5p2x  于2021-12-19 转载在 其他  
字(19.1k)|赞(0)|评价(0)|浏览(229)

前言

本文笔者是一波三折啊,很多人像我一样第一次在springboot里搞rocketmq的,遇到各种麻烦,我也是一样,就比如:

踩坑:

  • nameserver启动成功了,broker启动失败,然后broker日志文件没有生成,错误原因也找不到。
  • 在java里发送消息时发送失败,幸好有报错信息[Send [3] times, still failed],就是重发三次也没发出去的意思。网上资料说是broker启动方式不对,然而并没有解决
  • ……

本文是简单的使用发送消息和消费消息,让大家了解一下流程。

正文

在此之前我们要启动一个nameserver和一个broker

上文我已经介绍了,链接如下:启动消息服务

第一步

创建一个maven项目,pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>rocketmq_demo</groupId>
    <artifactId>rocketmq_demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>4.2.0</version>
        </dependency>
    </dependencies>
</project>

第二步

创建启动类,RocketMQApp,这个启动类当作生产者

package com;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:43 PM
 * @since v8.0
 **/
@SpringBootApplication
public class RocketMQApp {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        SpringApplication.run(RocketMQApp.class, args);
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 20; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

第三步

创建消费者 RocketMQConsumer,代码如下

package com.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:24 PM
 * @since v8.0
 **/
public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

最后一步

右键启动启动类-RocketMQApp,观察控制台:

主要发送消息的日志如下

SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94180000, offsetMsgId=0A08406F00002A9F0000000000058B56, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94270001, offsetMsgId=0A08406F00002A9F0000000000058C08, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942A0002, offsetMsgId=0A08406F00002A9F0000000000058CBA, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942D0003, offsetMsgId=0A08406F00002A9F0000000000058D6C, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942F0004, offsetMsgId=0A08406F00002A9F0000000000058E1E, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94310005, offsetMsgId=0A08406F00002A9F0000000000058ED0, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D943C0006, offsetMsgId=0A08406F00002A9F0000000000058F82, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D943E0007, offsetMsgId=0A08406F00002A9F0000000000059034, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=506]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94410008, offsetMsgId=0A08406F00002A9F00000000000590E6, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94440009, offsetMsgId=0A08406F00002A9F0000000000059198, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D944C000A, offsetMsgId=0A08406F00002A9F000000000005924A, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D944E000B, offsetMsgId=0A08406F00002A9F00000000000592FD, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=507]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D9452000C, offsetMsgId=0A08406F00002A9F00000000000593B0, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D9455000D, offsetMsgId=0A08406F00002A9F0000000000059463, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D945A000E, offsetMsgId=0A08406F00002A9F0000000000059516, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D945E000F, offsetMsgId=0A08406F00002A9F00000000000595C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=508]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94600010, offsetMsgId=0A08406F00002A9F000000000005967C, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94650011, offsetMsgId=0A08406F00002A9F000000000005972F, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94670012, offsetMsgId=0A08406F00002A9F00000000000597E2, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=509]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D946C0013, offsetMsgId=0A08406F00002A9F0000000000059895, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=3], queueOffset=509]

右键启动消费者-RocketMQConsumer,观察控制台如下

Receive New Messages 后面的就是消息主体

ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807386, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807396, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058B56, commitLogOffset=363350, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94180000, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807400, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807400, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058C08, commitLogOffset=363528, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94270001, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807407, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807408, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058E1E, commitLogOffset=364062, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807413, UNIQ_KEY=0A08406F070318B4AAC27A2D942F0004, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807405, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807406, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058D6C, commitLogOffset=363884, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807412, UNIQ_KEY=0A08406F070318B4AAC27A2D942D0003, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807402, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807403, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058CBA, commitLogOffset=363706, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807413, UNIQ_KEY=0A08406F070318B4AAC27A2D942A0002, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807409, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807415, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058ED0, commitLogOffset=364240, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807418, UNIQ_KEY=0A08406F070318B4AAC27A2D94310005, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807420, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807421, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058F82, commitLogOffset=364418, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807423, UNIQ_KEY=0A08406F070318B4AAC27A2D943C0006, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=506, sysFlag=0, bornTimestamp=1548321807422, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807423, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059034, commitLogOffset=364596, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=507, CONSUME_START_TIME=1548321807426, UNIQ_KEY=0A08406F070318B4AAC27A2D943E0007, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=507, sysFlag=0, bornTimestamp=1548321807425, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807427, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000590E6, commitLogOffset=364774, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807430, UNIQ_KEY=0A08406F070318B4AAC27A2D94410008, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=507, sysFlag=0, bornTimestamp=1548321807428, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807430, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059198, commitLogOffset=364952, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807432, UNIQ_KEY=0A08406F070318B4AAC27A2D94440009, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=507, sysFlag=0, bornTimestamp=1548321807436, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807437, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005924A, commitLogOffset=365130, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807439, UNIQ_KEY=0A08406F070318B4AAC27A2D944C000A, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_12 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=507, sysFlag=0, bornTimestamp=1548321807438, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807439, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000592FD, commitLogOffset=365309, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=508, CONSUME_START_TIME=1548321807442, UNIQ_KEY=0A08406F070318B4AAC27A2D944E000B, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_13 Receive New Messages: [MessageExt [queueId=0, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807442, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807443, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000593B0, commitLogOffset=365488, bodyCRC=1703501626, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807447, UNIQ_KEY=0A08406F070318B4AAC27A2D9452000C, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_14 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807445, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807446, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059463, commitLogOffset=365667, bodyCRC=311324588, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807448, UNIQ_KEY=0A08406F070318B4AAC27A2D9455000D, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_15 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807450, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807451, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059516, commitLogOffset=365846, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807453, UNIQ_KEY=0A08406F070318B4AAC27A2D945A000E, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=508, sysFlag=0, bornTimestamp=1548321807454, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807454, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000595C9, commitLogOffset=366025, bodyCRC=2079181465, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=509, CONSUME_START_TIME=1548321807457, UNIQ_KEY=0A08406F070318B4AAC27A2D945E000F, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_17 Receive New Messages: [MessageExt [queueId=0, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807456, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807458, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005967C, commitLogOffset=366204, bodyCRC=1659149091, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807460, UNIQ_KEY=0A08406F070318B4AAC27A2D94600010, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807461, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807462, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F000000000005972F, commitLogOffset=366383, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807464, UNIQ_KEY=0A08406F070318B4AAC27A2D94650011, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807463, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807467, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F00000000000597E2, commitLogOffset=366562, bodyCRC=89962020, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807469, UNIQ_KEY=0A08406F070318B4AAC27A2D94670012, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179, queueOffset=509, sysFlag=0, bornTimestamp=1548321807468, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807469, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000059895, commitLogOffset=366741, bodyCRC=1918600882, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=510, CONSUME_START_TIME=1548321807471, UNIQ_KEY=0A08406F070318B4AAC27A2D946C0013, WAIT=true, TAGS=TagA}, body=17]]]

我们发送了20条,然后成功消费20条,这就是rocketmq最简单的应用案例了。以后的篇幅我就会讲解更细致的内容,让大家能适用任何场景下,让大家了解各个对象的原理。

相关文章