spark.read.does加载缓存数据和元数据?

sczxawaw  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(217)

context我在customer文件夹中有两个json文件。customer0.json和customer1.json customer0.json的内容是

[
  {
    "Customer_id": "103",
    "Customer_name": "abc",
    "email_address": "abc@gmail.com"
  },
  {
    "Customer_id": "104",
    "Customer_name": "xyz",
    "email_address": "xyz@gmail.com"
  }
]

customer1.json的内容是

[
  {
    "Customer_id": "105",
    "Customer_name": "efg",
    "email_address": "efg@gmail.com"
  },
  {
    "Customer_id": "106",
    "Customer_name": "uvw",
    "email_address": "uvw@gmail.com"
  }
]

当我执行下面的代码时,缓存状态的结果为false,结果df显示在show上(即action call)

val customerSchema =
      new StructType()
        .add("Customer_id", "string")
        .add("Customer_name", "string")
        .add("email_address", "string")

    val customerStreamingDf = spark
      .read
      .format("json")
      .option("multiLine", value = true)
      .schema(customerSchema)
      .load("src/test/resources/input/data/customer")

    println (customerStreamingDf.storageLevel.useMemory)
    customerStreamingDf.show()

印象:spark在调用spark.read.load时不缓存任何内容,并且所有的内容都在操作调用(show)上延迟执行。
案例1我想检查在加载之后和显示之前(操作调用)删除一个源文件(比如customer0.json)会发生什么。

val customerSchema =
      new StructType()
        .add("Customer_id", "string")
        .add("Customer_name", "string")
        .add("email_address", "string")

    val customerStreamingDf = spark
      .read
      .format("json")
      .option("multiLine", value = true)
      .schema(customerSchema)
      .load("src/test/resources/input/data/customer")

    println (customerStreamingDf.storageLevel.useMemory)

    println("sleeping")
    Thread.sleep(20000)
    println("awake")
    customerStreamingDf.show()

如果我在这个线程休眠时删除这个文件,我会在show action调用中得到一个filenotfound错误。

java.io.FileNotFoundException: File file:.....customer1.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

当我在show调用之前添加下面的行时,它只从customer1.json数据生成dataframe,没有任何错误。

customerStreamingDf.createTempView("cust")
  spark.catalog.refreshTable("cust")

印象:spark在调用spark.read.load时缓存文件元数据,如果源文件被删除,则需要刷新该元数据。
案例2我想检查在加载之后和显示之前(操作调用)修改一个源文件(比如customer0.json中的id 103到130)会发生什么。
如果在线程休眠时将customer0.json文件中的id 103修改为130,则生成的dataframe不会选择此修改,并且仍然在输出dataframe中显示103 id。
无论是否使用spark目录刷新,此行为都保持不变(如案例1所述)。
印象:spark在调用spark.read.load时缓存文件数据,修改被忽略。
问spark.read.load上到底发生了什么?
是缓存数据,为什么df.storagelevel.usemory为false?为什么源文件中的修改没有得到反映?
它是否缓存元数据,是否有任何选项也可以懒洋洋地这样做?
请帮帮我!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题