java—在kafka 0.8.2中是否可以向现有主题添加分区

ru9i0ody  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(332)

我有一个kafka集群运行2个分区。我在想办法把分区数增加到3。但是,我不想丢失主题中现有的消息。我试图阻止Kafka,修改 server.properties 文件将分区数增加到3,然后重新启动kafka。然而,这似乎没有改变任何事情。使用Kafka ConsumerOffsetChecker ,我仍然看到它只使用了2个分区。我使用的Kafka版本是0.8.2.2。在版本0.8.1中,曾经有一个名为 kafka-add-partitions.sh ,我想这可能会起作用。但是,我在0.8.2中没有看到任何这样的脚本。有没有办法做到这一点?我确实尝试过创建一个全新的主题,对于这个主题,它似乎使用了3个分区,根据 server.properties 文件。然而,对于现有的主题,它似乎并不在意。

3hvapo4f

3hvapo4f1#

在我看来 zk_host:port/chroot 对于参数 --zookeeper 引发了以下异常:
错误java.lang.illegalargumentexception:主题我的主题名称在zk路径zk上不存在_host:port/chroot.
所以,我尝试了以下方法,效果很好:

bin/kafka-topics.sh --alter --zookeeper zk_host:port --topic my_topic_name --partitions 10
ehxuflar

ehxuflar2#

如果您在windows中使用kafka,请尝试在主题中为alter或add partition编写以下代码 .\bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --topic TopicName --partitions 20.\bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --topic TopicName --replica-assignment 0:1:2,0:1:2,0:1:2,2:1:0 --partitions 10

sz81bmfz

sz81bmfz3#

看起来您可以改用此脚本:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name 
   --partitions 40

在代码中,看起来他们做了同样的事情:

AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true)
``` `kafka-topics.sh` 执行这段代码以及kafka add partitions脚本使用的addpartitions命令。
但是,在使用键时必须注意重新分区:
请注意,分区的一个用例是对数据进行语义分区,而添加分区并不会更改现有数据的分区,因此如果用户依赖该分区,这可能会干扰他们。如果数据是按 `hash(key) % number_of_partitions` 然后,通过添加分区,这个分区可能会被洗牌,但kafka不会尝试以任何方式自动重新分配数据。
jq6vz3qz

jq6vz3qz4#

我觉得这个问题有点老了,但我还是会回答的。
如果您有一个kafka主题,但希望更改分区或副本的数量,则可以使用流式转换将原始主题中的所有消息自动流式传输到具有所需分区或副本数量的新kafka主题中。

yvt65v4c

yvt65v4c5#

任何人谁想要新的Kafka版本的解决方案。请按照这个方法。
kafka的整个数据保留和传输策略依赖于分区,所以要小心增加分区的影响(kafka的较新版本显示了与此相关的警告)请尝试避免一个代理具有太多前导分区的配置。
有简单的三阶段方法。
步骤1:增加主题中的分区 ./bin/kafka-topics.sh --zookeeper localhost:9092 --alter --topic testKafka_5 --partitions 6 步骤2:为给定主题创建分区json文件
{“version”:1,“partitions”:[{“topic”:“testkafka\u 5”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“testkafka\u 5”,“partition”:1,“replicas”:[2,1,0]},{“topic”:“testkafka\u 5”,“partition”:2,“replicas”:[1,2,0]},{“topic”:“testkafka\u 5”,“partition”:4,“replicas”:[2,1,0]},{“topic”:“testkafka\u 5”,“partition”:5,“replicas”:[1,2,0]}
使用较新的分区和副本创建文件。最好将副本扩展到不同的代理,但它们应该存在于同一集群中。考虑到远程副本的延迟。将给定的文件传输到您的Kafka。
步骤3:重新分配分区并验证

./bin/kafka-reassign-partitions.sh --zookeeper localhost:9092 --reassignment-json-file bin/increase-replication-factor.json  --execute

./bin/kafka-reassign-partitions.sh --zookeeper localhost:9092 --reassignment-json-file bin/increase-replication-factor.json --verify

您可以使用--descripe命令检查更改的效果。

相关问题