背景
一些机器生成事件。这些事件被发送到我们的kafka集群,在那里每台机器都有自己的主题(app.machine events.machine name)。因为顺序在每台机器上都很重要,分区大小目前不是问题,所以所有主题都由单个分区组成。因此,n个主题目前也意味着n个分区。
消费/处理应用程序使用了kafka流,我们已经给出了 StreamsConfig.APPLICATION_ID_CONFIG
/ "application.id"
“machine event processor”,对于每个示例都保持不变,这意味着它们被放入kafka的同一消费者组。此消费者已订阅该模式 app.machine-events.*
,至于处理器,它处理哪台机器的事件并不重要。这一点可以通过 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group machine-event-processor --members --verbose
显示与正在运行的所有处理服务的IP数匹配的列表(&I)。
预期
给定20台机器和5个处理器示例,我们希望每个处理器能处理~4个分区(因此也就是~4个主题)。
事实上
有一个处理器处理20个分区(因此有20个主题),另外4个处理器什么都不做/空闲。杀死“幸运”处理器后,所有20个分区都被重新平衡到另一个处理器上,导致新处理器处理20个分区/主题,3个处理器空闲。
我已经试过了
查看partition.grouper。我不觉得我完全理解它,但就我所能找到的,反正只有defaultpartitioner选项,编写一个自定义的应该没有必要,因为(根据文档)这个设置应该可以工作。它确实提到了分区根据分区键加入到任务中(对于我们来说都是0,因为每个主题只有一个分区),但是我不能完全理解这部分。
消费者使用的圆机器人: settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), new RoundRobinAssignor().getClass.getName)
(尝试了几个值,因为似乎没有任何变化。)
查看其他配置属性,看看我是否遗漏了什么:我想没有。
代码,简化
val streamConfig = new Properties
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // The event is delegated to a processor, doing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()
笔记
正在使用kafka streams 2.0.0: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.0.0</version> </dependency>
Kafka正在一个容器中运行,使用 wurstmeister/kafka:2.11-2.0.0
版本。docker-stack.yml服务: kafka: image: wurstmeister/kafka:2.11-2.0.0 ports: - target: 9094 published: 9094 protocol: tcp mode: host volumes: - /var/run/docker.sock:/var/run/docker.sock healthcheck: test: ["CMD-SHELL", "$$(netstat -ltn | grep -q 9092)"] interval: 15s timeout: 10s retries: 5 environment: HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 36000 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094 KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094 KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_DEFAULT_REPLICATION_FACTOR: 2 deploy: replicas: 2 restart_policy: condition: on-failure delay: 5s max_attempts: 3 window: 120s
Kafka是在一个双节点设置,形成一个集群。通过docker环境变量,我们将复制因子设置为 2
,因此每个分区在每个节点上都应该有一个复制。
我发现并检查的相关主题/问题/讨论
基普-49
https://faust.readthedocs.io/en/latest/developerguide/partition_assignor.html
查看了Kafka的邮件档案,但没有发现任何东西
checkout 流示例应用程序
全方位的寻找别人遇到这个问题,却没有给我需要的答案。也发现了Kafka-7144,但这应该不是我们的问题,因为我们正在运行2.0.0
如果有人遇到过类似的问题,或者能够指出我可能非常愚蠢的错误,请开导我!
1条答案
按热度按时间mcdcgff01#
对于将来遇到同样问题的读者,解决方案是不使用n个主题(每个主题有1个分区),而是使用1个主题(每个主题有n个分区)。即使有120个分区和400多个机器/事件源,多个事件类型也会被放在同一个分区中,但这不会影响事件的顺序。
实现是将record键设置为机器名,并让底层逻辑负责哪个值到哪个分区。因为我们现在有一个用户组,其中有x个用户订阅了这个主题,所以分区被均匀地分配给用户,每个用户使用120/x分区。
这正如马蒂亚斯所说,这一点在2018年比利时devoxx大会上得到了confluent其他乐于助人人士的进一步证实。谢谢您!
提示
使用wurstmeister/kafka docker图像时,请考虑使用环境属性:
Kafka创建主题:“app.machine”-events:120:2"
意义
主题-name:number-of-partitions:复制因子