删除spark structured streaming编写的损坏的Parquet文件时会丢失数据吗?

ntjbwcob  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(564)

我使用spark structured streaming作为消费者从kafka获取数据,遵循参考的指南https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
然后将数据作为parquet文件保存到hdfs。
这里是我的问题:程序运行良好,但一些容器很少失败(但它确实发生)导致一些损坏的Parquet文件。它将导致错误,如[不是Parquet文件(长度太小:4)]或[.parquet不是Parquet文件。在尾部预期的幻数[80,65,82,49],但在阅读时发现[56,52,53,51]]。我必须将它们移动到其他目录,并确保来自hive的查询工作正常。但我不确定这是否会导致数据丢失。
我知道spark structured streaming使用checkpoint来重新测量,但是由于一些数据已写入parquet,我不确定偏移量是否标记为committed。

w3nuxt5m

w3nuxt5m1#

我做了一个非常基本的练习,将一个txt文件加载到spark structured streaming读取的文件目录中。结构化流的writestream正在写入Parquet文件。加载两个文件后,我看到spark生成的元数据提到了这两个文件。因此,如果删除其中一个(包括使用文件接收器创建的元数据文件),那么从hdfs读取parquet文件会失败,只有一个例外(找不到文件)。

scala> val ParquetDF1 = spark.read.parquet("/user/root/sink2")
19/05/29 09:57:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19, quickstart.cloudera, executor 2): org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:537)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:610)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:602)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/sink2/part-00000-454836ef-f7bc-444e-9a6b-e81e640a196d-c000.snappy.parquet
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2092)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2062)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1975)

这里唯一的区别是-您使用的是hive,而我直接从hdfs构建ParquetDataframe。

相关问题