管道简单代码如下:
source = env.addSource(kafkaConsumer) .map(func).setParallelism(2).sink()
如何确保出站顺序?
fnatzsnv1#
首先,假设您的示例中的其他所有对象都具有1的并行性,并且只有map函数将并行运行(尽管要真正做到这一点,它必须在某个地方进行配置;默认并行度高于1。)我们还假设您的kafka使用者使用一个分区读取单个主题,并且您正在询问如何实现一个并行转换,以保持输入中存在的顺序。有了这些假设,答案是你能做的不多。map操作符的两个示例之间存在竞争,非并行接收器将以任意方式交错这两个传入流。如果流记录是以某种方式标记的,比如使用递增的时间戳或id,那么您可以假设引入一些缓冲并重新建立原始顺序,可以在自定义接收器中,也可以在map和sink操作符之间的非并行richcomap函数中。另一方面,如果您的源代码以某种方式被分区或设置了密钥,并且您只需要在每个密钥的基础上维护或建立一个顺序,那么就有更好的答案了。
1条答案
按热度按时间fnatzsnv1#
首先,假设您的示例中的其他所有对象都具有1的并行性,并且只有map函数将并行运行(尽管要真正做到这一点,它必须在某个地方进行配置;默认并行度高于1。)
我们还假设您的kafka使用者使用一个分区读取单个主题,并且您正在询问如何实现一个并行转换,以保持输入中存在的顺序。
有了这些假设,答案是你能做的不多。map操作符的两个示例之间存在竞争,非并行接收器将以任意方式交错这两个传入流。
如果流记录是以某种方式标记的,比如使用递增的时间戳或id,那么您可以假设引入一些缓冲并重新建立原始顺序,可以在自定义接收器中,也可以在map和sink操作符之间的非并行richcomap函数中。
另一方面,如果您的源代码以某种方式被分区或设置了密钥,并且您只需要在每个密钥的基础上维护或建立一个顺序,那么就有更好的答案了。