Spring Cloud Alibaba 04: Producing and Consuming Messages with RocketMQ

Reposted by x33g5p2x on 2021-12-18 under Other


Downloading, Installing, and Configuring RocketMQ

Official download page: http://rocketmq.apache.org/dowloading/releases/

Create a rocket folder under /usr/local, upload rocketmq-all-4.7.1-bin-release.zip into it, and unzip it:

unzip rocketmq-all-4.7.1-bin-release.zip

If the shell reports that unzip is not installed, install it with:

yum install -y unzip zip

After unzipping, change into /usr/local/rocket/rocketmq-all-4.7.1-bin-release/bin and start the NameServer. Use the server's public IP here, not localhost:

nohup ./mqnamesrv -n 118.31.106.51:9876 &

Check that it started successfully:

netstat -an | grep 9876
or
jps
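
The same reachability check can be done from Java, which is handy on a client machine that lacks netstat. The following is a minimal sketch that only tests whether a TCP connection to the NameServer port can be opened. The class name PortCheck, the isOpen helper, and the 3-second timeout are illustrative assumptions, and an open port does not by itself prove that the NameServer is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to localhost; pass the server's public IP as the first argument (assumption).
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("NameServer port 9876 open: " + isOpen(host, 9876, 3000));
    }
}
```

Running it from the machine that will host the producer and consumer also shows whether a firewall or security group is blocking port 9876.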

Before starting the Broker, adjust the JVM memory settings in runserver.sh, runbroker.sh, and tools.sh:

vim runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

-----------------------------------------------------------------------------------

vim runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"

-----------------------------------------------------------------------------------
vim tools.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
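
When editing these flags by hand it is easy to end up with a young generation (-Xmn) that is as large as the whole heap (-Xmx). The sketch below is a small, hypothetical helper (JvmFlagCheck is not part of RocketMQ) that parses a JAVA_OPT string and reports whether -Xmn is strictly smaller than -Xmx. It returns false for the runserver.sh and runbroker.sh lines above, where the two values are equal; HotSpot typically warns and shrinks the young generation in that situation rather than refusing to start, so treat the result as a hint only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JvmFlagCheck {

    /** Extracts the size in megabytes for a flag such as -Xmx256m or -Xmx1g; returns -1 if absent. */
    public static long sizeInMb(String javaOpt, String flag) {
        Matcher m = Pattern.compile(Pattern.quote(flag) + "(\\d+)([kKmMgG])").matcher(javaOpt);
        if (!m.find()) {
            return -1;
        }
        long value = Long.parseLong(m.group(1));
        switch (Character.toLowerCase(m.group(2).charAt(0))) {
            case 'k': return value / 1024;
            case 'g': return value * 1024;
            default:  return value; // 'm'
        }
    }

    /** True when the young generation (-Xmn) is strictly smaller than the maximum heap (-Xmx). */
    public static boolean youngGenFitsInHeap(String javaOpt) {
        long xmx = sizeInMb(javaOpt, "-Xmx");
        long xmn = sizeInMb(javaOpt, "-Xmn");
        return xmx > 0 && xmn > 0 && xmn < xmx;
    }

    public static void main(String[] args) {
        // Hypothetical values chosen for illustration, not taken from the scripts above.
        String opts = "-server -Xms256m -Xmx256m -Xmn128m";
        System.out.println("Xmx=" + sizeInMb(opts, "-Xmx") + "m, Xmn=" + sizeInMb(opts, "-Xmn") + "m");
        System.out.println("young generation fits in heap: " + youngGenFitsInHeap(opts));
    }
}
```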

Start the Broker (use the public IP, not localhost, and do not leave out any of the parameters):

nohup sh ./mqbroker -n 118.31.106.51:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &

Check the log:

tail -f ~/logs/rocketmqlogs/broker.log

# A log line like the following means the Broker started successfully
2020-10-26 11:07:01 INFO main - The broker[izbp10tup89om84qulgxbsz, 172.20.171.240:10911] boot success. serializeType=JSON and name server is localhost:9876

From the bin directory, test sending and receiving messages with RocketMQ:

# Set the NameServer IP and port
export NAMESRV_ADDR=localhost:9876

# Send messages
./tools.sh org.apache.rocketmq.example.quickstart.Producer

# Receive messages
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

Installing the RocketMQ Console

Download: https://github.com/apache/rocketmq-externals

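First edit broker.conf in RocketMQ's conf directory and add the following two settings, so that the Broker advertises its public IP and remote clients can connect: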

brokerIP1=118.31.106.51
namesrvAddr=118.31.106.51:9876

Edit the application.properties file in the downloaded rocketmq-console:

server.port=9877
rocketmq.config.namesrvAddr=118.31.106.51:9876

Package the project from the rocketmq-console root directory:

mvn clean package -Dmaven.test.skip=true

Run the jar generated in the target directory:

java -jar rocketmq-console-ng-1.0.0.jar

Then open http://localhost:9877 to reach the console.

Testing Message Production and Consumption in Java

Add the rocketmq dependency:

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.0</version>
</dependency>

Test producing a message:

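import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the message producer with its producer group
        DefaultMQProducer producer = new DefaultMQProducer("blu-producer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("118.31.106.51:9876");
        // Start the producer
        producer.start();
        // Build the message: topic, tag, body
        Message message = new Message("bluTopic", "bluTag", "Test MQ".getBytes(StandardCharsets.UTF_8));
        // Send the message with a 5000 ms timeout
        SendResult result = producer.send(message, 5000);
        System.out.println(result);
        // Shut down the producer
        producer.shutdown();
    }
}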

Output:

SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2841C69D40000, offsetMsgId=761F6A3300002A9F00000000000C6301, messageQueue=MessageQueue [topic=bluTopic, brokerName=broker-a, queueId=3], queueOffset=0]
15:40:55.690 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[118.31.106.51:9876] result: true
15:40:55.702 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[118.31.106.51:10911] result: true
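
The offsetMsgId in the SendResult is not random. For a Broker with an IPv4 address, the 4.x client encodes it as 16 bytes in hexadecimal: 4 bytes of Broker IP, 4 bytes of Broker port, and 8 bytes of commit log offset. The sketch below decodes the value printed above using plain Java; the OffsetMsgIdDecoder class is a hypothetical helper written for this article, not a RocketMQ API.

```java
public class OffsetMsgIdDecoder {

    /** Decodes a 32-hex-character offsetMsgId (IPv4 Broker) into "ip:port@commitLogOffset". */
    public static String decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters, got " + offsetMsgId.length());
        }
        // First 4 bytes: Broker IPv4 address, one byte per octet.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        // Next 4 bytes: Broker port; last 8 bytes: physical offset in the commit log.
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        long commitLogOffset = Long.parseLong(offsetMsgId.substring(16, 32), 16);
        return ip + ":" + port + "@" + commitLogOffset;
    }

    public static void main(String[] args) {
        // offsetMsgId copied from the SendResult above.
        System.out.println(decode("761F6A3300002A9F00000000000C6301"));
        // prints 118.31.106.51:10911@811777
    }
}
```

The decoded address and offset agree with the storeHost and commitLogOffset fields that appear in the consumer output later in this article.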

Test consuming messages:

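import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerTest {

    public static void main(String[] args) throws MQClientException {
        // Create the message consumer with its consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("blu-consumer-group");
        // Set the NameServer address
        consumer.setNamesrvAddr("118.31.106.51:9876");
        // Subscribe to the topic; "*" accepts every tag
        consumer.subscribe("bluTopic", "*");
        // Callback invoked for each batch of received messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("message ==>" + list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer; it keeps running and listening
        consumer.start();
    }

}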
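
The second argument of subscribe is a tag expression: "*" accepts every tag, and several tags can be joined with "||". The sketch below is a simplified plain-Java model of that matching rule, meant only to make the semantics concrete. TagExpression is a hypothetical class, otherTag is an invented tag name, and the real filtering is performed by the Broker and the RocketMQ client rather than by code like this.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagExpression {

    /** Returns true if a message tag is accepted by an expression such as "*" or "a || b". */
    public static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true; // subscribe to every tag
        }
        // Split on "||", trim each tag, and ignore empty pieces.
        Set<String> wanted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "bluTag"));                  // true
        System.out.println(matches("bluTag || otherTag", "bluTag")); // true
        System.out.println(matches("otherTag", "bluTag"));           // false
    }
}
```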

Once started, the consumer keeps listening and consumes each message as soon as it is produced:

message ==>[MessageExt [queueId=3, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1603698054612, bornHost=/122.96.140.226:34668, storeTimestamp=1603698054317, storeHost=/118.31.106.51:10911, msgId=761F6A3300002A9F00000000000C6301, commitLogOffset=811777, bodyCRC=689321773, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bluTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1603871119123, UNIQ_KEY=00000000000000000000000000000001000018B4AAC2841C69D40000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bluTag}, body=[84, 101, 115, 116, 32, 77, 81], transactionId='null'}]]
message ==>[MessageExt [queueId=2, storeSize=217, queueOffset=0, sysFlag=0, bornTimestamp=1603697450377, bornHost=/112.86.129.72:18631, storeTimestamp=1603697450096, storeHost=/118.31.106.51:10911, msgId=761F6A3300002A9F00000000000C6228, commitLogOffset=811560, bodyCRC=689321773, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bluTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1603871119123, UNIQ_KEY=00000000000000000000000000000001000018B4AAC2841331850000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bluTag}, body=[84, 101, 115, 116, 32, 77, 81], transactionId='null'}]]
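
Two fields in this output are easier to read once decoded: body is the raw byte array of the message text, and bornTimestamp is the send time in milliseconds since the Unix epoch. The sketch below converts both using only the JDK. MessageExtFields is a hypothetical helper, and Asia/Shanghai is an assumed time zone chosen because the other timestamps in this article are in CST.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class MessageExtFields {

    /** Turns the raw body bytes back into text, assuming the producer encoded it as UTF-8. */
    public static String bodyAsText(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Converts a millisecond timestamp such as bornTimestamp into a date-time in the given zone. */
    public static ZonedDateTime toDateTime(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        byte[] body = {84, 101, 115, 116, 32, 77, 81}; // body from the output above
        System.out.println(bodyAsText(body)); // prints Test MQ

        // bornTimestamp of the first message; the zone is an assumption.
        System.out.println(toDateTime(1603698054612L, "Asia/Shanghai"));
        // prints 2020-10-26T15:40:54.612+08:00[Asia/Shanghai]
    }
}
```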

Integrating RocketMQ with Spring Boot

Service provider (Provider):

  • Add the rocketmq dependencies to the Provider:
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>
  • Add the following to application.yml:
# RocketMQ configuration
rocketmq:
  name-server: 118.31.106.51:9876
  producer:
    group: blu-provider
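  • The Order entity class:
import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Integer id;
    private String buyerName;
    private String buyerTel;
    private String address;
    private Date createDate;
}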
  • Add the following method to ProviderController to produce messages:
@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping("create")
public Order create(){
	Order order = new Order(1,"BLU","15651776666","江苏南京",new Date());
	this.rocketMQTemplate.convertAndSend("blu-order-topic",order);
	return order;
}
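
The first argument of convertAndSend is a destination, which rocketmq-spring documents in the form topicName:tags, so a tag can be appended to the topic after a colon. The sketch below models only that split in plain Java to show how such a string is read. The Destination class and the paid tag are hypothetical and are not part of the library.

```java
public class Destination {

    public final String topic;
    public final String tags;

    private Destination(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Splits "topic" or "topic:tags" at the first colon; tags is empty when no colon is present. */
    public static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        return new Destination(parts[0], parts.length > 1 ? parts[1] : "");
    }

    public static void main(String[] args) {
        Destination plain = parse("blu-order-topic");
        Destination tagged = parse("blu-order-topic:paid"); // "paid" is an invented tag
        System.out.println(plain.topic + " / [" + plain.tags + "]");   // blu-order-topic / []
        System.out.println(tagged.topic + " / [" + tagged.tags + "]"); // blu-order-topic / [paid]
    }
}
```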

Service consumer (Consumer):

  • Add the rocketmq dependencies to the Consumer:
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>
  • Add the following to application.yml:
rocketmq:
  name-server: 118.31.106.51:9876
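  • The Order entity class:
package com.blu.entity;

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Integer id;
    private String buyerName;
    private String buyerTel;
    private String address;
    private Date createDate;
}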
  • MessageService, which receives the messages:
package com.blu.service;

import com.blu.entity.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(consumerGroup = "blu-consumer",topic = "blu-order-topic")
public class MessageService implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        System.out.println("新的订单:"+order);
    }
}

Start both applications. Each time the Provider's /create endpoint is called and a message is produced, the Consumer receives it right away:

新的订单:Order(id=1, buyerName=BLU, buyerTel=15651776666, address=江苏南京, createDate=Wed Oct 28 17:08:13 CST 2020)
新的订单:Order(id=1, buyerName=BLU, buyerTel=15651776666, address=江苏南京, createDate=Wed Oct 28 17:09:44 CST 2020)
