How do I read data from Delta Lake using SparkR?

pftdvrlh · asked 2021-05-16 · in Spark
Follow (0) | Answers (1) | Views (442)

I couldn't find any reference on reading data from Delta with SparkR, so I tried it out myself. First, I created a dummy dataset in Python:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data2 = [
    ("James", "", "Smith", "36636", "M", 2000),
    ("Robert", "", "Williams", "42114", "M", 5000),
    ("Maria", "Anne", "Jones", "39192", "F", 5000),
    ("Jen", "Mary", "Brown", "", "F", -1)
]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df = spark.createDataFrame(data=data2, schema=schema)

# Write as a Delta table; userMetadata tags this commit in the table history
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "first-version") \
  .save("/temp/customers")

If you change the data and run the write again, you can simulate changes over time. I can verify the time-travel read in Python:

# Read the table as it was at a past timestamp (Delta time travel)
df3 = spark.read \
  .format("delta") \
  .option("timestampAsOf", "2020-11-30 22:03:00") \
  .load("/temp/customers")
df3.show(truncate=False)

But I can't figure out how to pass this option through SparkR. Can you help me?

%r
library(SparkR)
teste_r <- read.df("/temp/customers", source="delta")
head(teste_r)

It works, but it only returns the current version.

xfb7svmp · answer #1

timestampAsOf can be passed as an argument to SparkR::read.df:

SparkR::read.df("/temp/customers", source = "delta", timestampAsOf = "2020-11-30 22:03:00")

This can also be achieved through SparkR::sql:

SparkR::sql('
SELECT *
FROM delta.`/temp/customers`
TIMESTAMP AS OF "2020-11-30 22:03:00"
')
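
To discover which timestamps and versions are valid time-travel targets, you can inspect the table history (a sketch using Delta's DESCRIBE HISTORY command; the userMetadata column should show the "first-version" tag set at write time):

# List commits with their version, timestamp and userMetadata
history <- SparkR::sql('DESCRIBE HISTORY delta.`/temp/customers`')
head(history)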

Alternatively, in sparklyr, use the timestamp argument of spark_read_delta:

library(sparklyr)

# Connect to the cluster's Spark session (Databricks-hosted here)
sc <- spark_connect(method = "databricks")

# Read the Delta table as of a past timestamp
spark_read_delta(sc, "/temp/customers", timestamp = "2020-11-30 22:03:00")
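
spark_read_delta appears to also accept a version argument for version-based time travel (a sketch; version 0 is illustrative):

# Read a pinned version of the Delta table
spark_read_delta(sc, "/temp/customers", version = 0)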
