如何使用samza在kafka主题上创建分区?

txu3uszq  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我有一些samza的工作,负责阅读Kafka主题的所有信息,并为新主题编写新信息。为了发送新消息,我使用samza的内置outgoingmessageenvelope。还可以使用messagecollector发送新消息。它看起来像这样:

collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))

有没有一种方法可以用它来为Kafka主题添加分区?比如在用户id上分区之类的。
或者如果有更好的方式,我会很乐意听到它!

qgelzfjb

qgelzfjb1#

您应该能够使用分区键发送消息,

public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

使用此方法将对数据进行分区。但是,我认为如果您正在以编程方式控制分区的数量,那么应该使用kafkaapi来创建/更改这里提到的主题

相关问题