Spring Boot Kafka JsonSerializer和JsonDeserializer示例

x33g5p2x  于2022-10-12 转载在 Spring  
字(7.1k)|赞(0)|评价(0)|浏览(590)

在前面的教程中,我们已经了解了如何在Spring引导项目中在Kafka Producer和Kafka Consumer之间交换String格式消息。
在本教程中,我们将学习如何使用Spring Kafka库提供的JsonSerializerJsonDeserializer类来存储和检索Apache Kafka主题中的JSON并返回Java模型对象。
如果您是Apache Kafka的新手,那么您应该看看我的文章——ApacheKafka核心概念。
基本上,您将学习如何以JSON字节[]的形式向Apache Kafka发送和接收Java对象。
Apache Kafka存储和传输字节[]。有许多内置的序列化器和反序列化器,但不包括任何针对JSON的序列化器。Spring Kafka创建了一个JsonSerializerJsonDeserializer,我们可以使用它们将Java对象转换为JSON或从JSON转换为Java对象。
我们将使用JsonSerializer将Java对象作为JSON字节[]发送到Kafka主题。之后,我们将配置如何接收JSON字节[],并使用JsonDeserializer将其自动转换为Java对象。

1.安装和设置Apache Kafka

1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。

2.在IntelliJ中创建和设置Spring Boot项目

使用https://start.spring.io/创建Spring引导项目
添加依赖项:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>

Import in IntelliJ并运行spring boot应用程序

3.在应用程序中配置Kafka Producer和Consumer。属性文件

在应用程序中。属性文件,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties文件,让我们配置Kafka Producer和Consumer以交换JSON消息:

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

我们正在使用以下Consumer属性将JSON转换为Java对象:

spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

我们正在使用以下Producer属性将Java对象转换为JSON:

spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

让我们了解一下上述属性的含义。
spring.kafka.consumer.bootstrap-servers—用于建立到Kafka集群的初始连接的主机:端口对的逗号分隔列表。覆盖消费者的全局属性。
spring.kafka.consumer.group-id-标识此消费者所属的消费者组的唯一字符串。
spring.kafka.consumer.auto-offset-reset-当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时,该怎么办。
spring.kafka.consumer.key-deserializer-密钥的反序列化程序类。
spring.kafka.consumer.value-deserializer-值的反序列化程序类。
spring.kafka.producer.key-serializer-密钥的序列化程序类。
spring.kafka.producer.value-serializer-值的序列化程序类

4.创建Kafka主题

要在启动时创建主题,请添加NewTopic类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig文件并添加以下内容:

package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5.创建简单POJO以序列化/反序列化

让我们创建一个User类来向Kafka主题发送和接收User对象。
那么,User实例将被JsonSerializer序列化为一个字节数组。Kafka最终将这个字节数组存储到特定主题的给定分区中。
在反序列化期间,JsonDeserializer用于从Kafka接收JSON作为字节数组,将其转换为JSON字节数组到User对象,并将其返回给应用程序。

package net.javaguides.springbootkafka.payload;

public class User {
    private int id;
    private String firstName;
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}

6.创建Kafka Producer以生成JSON消息

让我们创建Kafka Producer以使用Spring Kafka生成JSON消息。

Kafka模板

好吧,Spring引导为Spring的KafkaTemplate提供了一个自动配置,所以您可以直接在自己的bean中自动连接它。

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User data){
        LOGGER.info(String.format("Message sent -> %s", data.toString()));

        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
                .build();

        kafkaTemplate.send(message);
    }
}

让我们从向Kafka Topic发送User对象开始。

**注意:**我们创建了一个KafkaTemplate<String, User>,因为我们将Java对象发送到Kafka主题,该主题将自动转换为JSON字节[]。

在本例中,我们使用1d14d1e创建了ae1d13d。添加我们要发送消息的主题也很重要。

7.创建REST API以发送JSON对象

让我们创建一个简单的POST REST API,将用户信息作为JSON对象发送。
我们将创建一个POST REST API来将完整的User对象作为JSON发布,而不是发送消息字符串,这样Kafka生产者就可以将User对象写入Kafka主题。

package net.javaguides.springbootkafka.controller;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import net.javaguides.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user){
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}

8.创建Kafka消费者以消费JSON消息

让我们创建一个Kafka消费者来接收来自主题的JSON消息。在KafkaConsumer中,我们只需要在方法中添加User Java Object作为参数。

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(User data){
        LOGGER.info(String.format("Message received -> %s", data.toString()));
    }
}

9.演示

让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
让我们使用Postman客户端进行POST REST API调用:

观察控制台日志:

结论

在本教程中,我们学习了如何使用Spring Kafka库提供的JsonSerializerJsonDeserializer类来存储和检索Apache Kafka主题中的JSON以及返回Java模型对象。

GitHub存储库

https://github.com/RameshMF/springboot-kafka-course/tree/main/springboot-kafka-tutorial

相关文章