从kafka读取并在parquet中写入hdfs

t1rydlwq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(312)

我对bigdata生态系统还很陌生,有点开始了。
我读过几篇关于使用spark流媒体阅读Kafka主题的文章,但我想知道是否可以使用spark作业而不是流媒体来阅读Kafka的文章?如果是,你们能帮我指出一些文章或代码片段,可以让我开始。
我的问题的第二部分是以Parquet格式向hdfs写信。一旦我读了Kafka的书,我想我会有rdd。将此rdd转换为dataframe,然后将dataframe作为Parquet文件写入。这是正确的方法吗。
谢谢你的帮助。
谢谢

zbdgwd5y

zbdgwd5y1#

使用Kafka流。sparkstreaming是一个误称(它的小批量引擎盖下,至少高达2.2)。
https://eng.verizondigitalmedia.com/2017/04/28/kafka-to-hdfs-parquetserializer/

p4rjhz4m

p4rjhz4m2#

对于从kafka读取数据并以parquet格式将其写入hdfs,可以使用spark batch作业而不是流式处理,您可以使用spark结构化流式处理。
结构化流是构建在sparksql引擎上的可伸缩且容错的流处理引擎。您可以用在静态数据上表示批处理计算的方式来表示流计算。sparksql引擎将负责以增量和连续的方式运行它,并在流数据不断到达时更新最终结果。您可以使用scala、java、python或r中的dataset/dataframe api来表示流聚合、事件时间窗口、流到批连接等。计算是在同一个优化的spark sql引擎上执行的。最后,系统通过检查点和预写日志来保证端到端的精确一次容错。简言之,结构化流提供了快速、可扩展、容错、端到端的精确一次流处理,而用户无需对流进行推理。
它与Kafka一起作为一个内置的源,也就是说,我们可以从Kafka投票数据。它与kafka broker版本0.10.0或更高版本兼容。
为了以批处理模式从kafka中提取数据,可以为定义的偏移范围创建数据集/Dataframe。

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

源中的每一行都有以下架构:

| Column           | Type          |
|:-----------------|--------------:|
| key              |        binary |
| value            |        binary |
| topic            |        string |
| partition        |           int |
| offset           |          long |
| timestamp        |          long |
| timestampType    |           int |

现在,要以Parquet格式将数据写入hdfs,可以编写以下代码:

df.write.parquet("hdfs://data.parquet")

有关spark structured streaming+kafka的更多信息,请参阅以下指南-kafka集成指南
希望有帮助!

y3bcpkx1

y3bcpkx13#

关于这个主题,你已经有好几个答案了。
只是想强调一下-小心直接流到Parquet地板桌上。当Parquet地板的行组大小足够大(为简单起见,您可以说文件大小应该在64-256mb左右)以利用字典压缩、bloom过滤器等(一个Parquet地板文件中可以有多个行块,通常每个文件中都有多个行块;尽管行块不能跨越多个Parquet文件)
如果你直接流到一个Parquet表,那么你很可能会得到一堆很小的Parquet文件(取决于spark流的小批量大小和数据量)。查询这样的文件可能非常慢。例如,parquet可能需要读取所有文件的头来协调模式,这是一个很大的开销。如果是这种情况,您将需要有一个单独的进程,例如,作为一个解决方案,它将读取旧文件,并将它们写入“合并”(这不是一个简单的文件级合并,一个进程实际上需要读入所有Parquet数据并溢出较大的Parquet文件)。
这种解决方法可能会扼杀数据“流”的最初目的。你也可以看看其他技术,比如apache kudu、apache kafka、apache druid、kinesis等等。
更新:自从我发布这个答案,现在这里有一个新的强大的球员-三角洲湖。https://delta.io/ 如果你习惯了Parquet,你会发现delta非常有吸引力(实际上,delta是建立在Parquet层+元数据之上的)。三角洲湖提供:
Spark酸处理:
可序列化的隔离级别确保读者永远不会看到不一致的数据。
可扩展的元数据处理:利用spark的分布式处理能力,轻松处理PB级表和数十亿个文件的所有元数据。
流与批统一:三角洲湖中的一个表既是一个批表,也是一个流源和流汇。流式数据接收、批历史回填、交互式查询都是现成的。
模式强制:自动处理模式变化,以防止在接收过程中插入错误记录。
时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可复制的机器学习实验。
upserts和deletes:支持合并、更新和删除操作,以支持复杂的用例,如更改数据捕获、缓慢更改维度(scd)操作、流式upserts等等。

相关问题