触发时间,maxoffsetspertrigger-为spark结构化流媒体,同时读取来自kafka的消息?

dz6r00yl  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(792)

我有一个结构化的流媒体应用程序,读取Kafka的消息。每天的消息总数约为180亿条,每分钟的峰值消息数为12500000条。最大邮件大小为2 kb。
如何确保我的结构化流媒体应用程序能够处理如此大的数据量和速度?基本上,我只想知道如何设置最佳触发时间,maxoffsetspertrigger,或任何其他配置,使工作顺利进行,并能够处理故障和重新启动。

u2nhd7ah

u2nhd7ah1#

您可以在固定间隔的微批量或连续批量中运行spark结构化流应用程序。下面是一些可用于调整流应用程序的选项。
Kafka配置:
Kafka的分区数:
你可以增加Kafka的分区数。因此,更多的消费者可以同时读取数据。根据输入速率和引导服务器的数量将其设置为适当的数字。
spark流配置:
驱动程序和执行程序内存配置:
计算每批数据的大小(#记录*每条消息的大小),并相应地设置内存。
执行人人数:
在kafka主题中将executors的数量设置为number of partitions。这增加了并行性。同时读取数据的任务数。
限制偏移量:
每个触发间隔处理的最大偏移量的速率限制。指定的偏移总数将按比例分布在不同卷的主题分区中。

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topicName")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "1000000")
    .load()

使用检查点从故障中恢复:
如果出现故障或有意关闭,可以恢复上一个查询的上一个进度和状态,并在其停止的位置继续。这是通过使用检查点和预写日志来完成的。

finalDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()

触发:
流式查询的触发器设置定义了流式数据处理的时间,该查询是作为具有固定批处理间隔的微批处理查询执行,还是作为连续处理查询执行。

相关问题