如何使用sparkr获取最新数据?

swvgeqrz  于 2021-06-28  发布在  Hive
关注(0)|答案(2)|浏览(266)

使用 SparkR 嵌套数组如何“分解”?我试过用 explode 像这样:

dat <- nested_spark_df %>% 
     mutate(a=explode(metadata)) %>%
     head()

但是,尽管上述操作不会导致抛出异常,但不会提升中的嵌套字段 metadata 到最高层。基本上我是在寻找类似Hive的行为 LATERAL VIEW explode() 无需依赖 HiveContext .
注意,在代码片段中,我使用的是nse enabled via SparkRext . 我认为同等的直率- SparkR 大概是 ... %>% mutate(a=explode(nested_spark_df$metadata)) ... 或者类似的东西。

编辑

我试过用 LATERAL VIEW explode(...)SparkR::sql 功能。它似乎与Parquet和兽人的数据伟大的工作。但是,在使用嵌套的avro数据时,我尝试了:

dat <- collect(sql(HiveContext,
                   paste0("SELECT a.id, ax.arrival_airport, x.arrival_runway ",
                          "FROM avrodb.flight a ",  
                             "LATERAL VIEW explode(a.metadata) a AS ax ",
                          "WHERE ax.arrival_airport='ATL'")))

但是当一个交换掉的时候 avrodbparquetdb 包含等价的数据,它实现了我所期望的。

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava

尽管我在启动spark时包含了databricks avro包,但这一点仍然存在。用spark读取相同的数据 SQLContext (而不是 HiveContext )效果很好,只是我还没有弄清楚如何有效地使用 explode() 功能。我还确认了这不是数据本身的问题,通过hive使用我尝试运行的相同hql语句成功地查询了相同的文件 SparkR::sql(HiveContext, hql)

holgip5t

holgip5t1#

此时,在dplyr中处理数组列是很棘手的,例如,请参阅本期。可能是最好的选择 explode() 通过Spark。还要注意,使用dsl版本的 explode (请参阅此答案)因此您可能希望通过 sql() .

6qftjkof

6qftjkof2#

感谢@sim。不过,我终于想出了一个明智的办法。关键是 explode 当所有分解值仍嵌套在一层深时的操作 select 必须执行。例如:

dat <- nested_spark_df %>% 
 mutate(a=explode(nested_spark_df$metadata)) %>%
 select("id", "a.fld1", "a.fld2")

这将导致 SparkR DataFrame 具有3列的对象: id , fld1 ,和 fld2 (否) a. 预先准备)。
我的心理障碍是我试图让爆炸像Pig一样 flatten 它将在模式的顶层创建一组新的字段名。

相关问题