动态启动并关闭kafkalistener,以便在会话开始时加载以前的消息

neekobn8  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(353)

我有一个Kafka利斯泰纳从一开始就阅读信息的工作代码 (offset=0) 一个主题(总是在运行)。对于我的用例(消息传递),我需要两件事:
总是捕获特定主题/分区的新消息(此使用者总是在运行)并发送到前端websocket+stomp(我已经有这个部分了)
启动新使用者以获取特定主题/分区从开始到当前的消息,仅当前端发出信号时,然后停止,以便前端websocket+stomp(在会话开始时)可以获取这些数据(为已故用户或以后的用户加载以前的消息)
如果我能动态地(在从前端得到信号之后)添加/删除带有参数的kafkalistener(post方法的数据),它将同时为这两种方法服务
实际上,我该如何实现这一点?我是否应该考虑使用post方法通知后端,我需要立即加载此主题/分区的以前的消息,并将这些消息发送到此“.”url?但是,我怎样才能动态地启动和关闭消费者(kafkalistener),而不必一直运行并在那里传递参数呢?

idv4meu8

idv4meu81#

下面是一个快速的spring引导应用程序,展示了如何动态创建容器。

@SpringBootApplication
public class So61950229Application {

    public static void main(String[] args) {
        SpringApplication.run(So61950229Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(DynamicListener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so61950229", "foo" + i));
            System.out.println("Hit enter to start a listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
            System.out.println("Hit enter to start another listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61950229").partitions(1).replicas(1).build();
    }

}

@Component
class DynamicListener {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicListener.class);

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers
            = new ConcurrentHashMap<>();

    DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    void newContainer(String topic, int partition) {
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topic, partition));
        String groupId = UUID.randomUUID().toString();
        container.getContainerProperties().setGroupId(groupId);
        container.setupMessageListener((MessageListener) record -> {
            System.out.println(record);
        });
        this.containers.put(groupId, container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        AbstractMessageListenerContainer<String, String> container = this.containers.remove(
                event.getContainer(ConcurrentMessageListenerContainer.class).getContainerProperties().getGroupId());
        if (container != null) {
            LOG.info("Stopping idle container");
            container.stop(() -> LOG.info("Stopped"));
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000

相关问题