spark数据集:数据集的铸造列

b4lqfgs4  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(309)

这是我的数据集:

Dataset<Row> myResult = pot.select(col("number")
                    , col("document")
                    , explode(col("mask")).as("mask"));

我现在需要从现有的myresult创建一个新的数据集。如下所示:

Dataset<Row> myResultNew = myResult.select(col("number")
                , col("name")
                , col("age")
                , col("class")
                , col("mask");

名称、年龄和类别是从数据集myresult的列文档创建的。我想我可以调用列文档上的函数,然后对它执行任何操作。

myResult.select(extract(col("document")));

 private String extract(final Column document) {
        //TODO ADD A NEW COLUMN nam, age, class TO THE NEW DATASET.
        // PARSE DOCUMENT AND GET THEM.

     XMLParser doc= (XMLParser) document // this doesnt work???????

}

我的问题是:文档的类型是column,我需要将其转换为不同的对象类型,并对其进行解析以提取名称、年龄和类。我怎么能那样做。文档是一个xml,我需要进行解析以获取其他3列,因此无法避免将其转换为xml。

n53p2ov0

n53p2ov01#

转换 extract 方法将是一个尽可能接近您所要求的解决方案。自定义项可以获取一个或多个列的值,并使用此输入执行任何逻辑。

import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

[...]

UserDefinedFunction extract = udf(
        (String document) -> {
            List<String> result = new ArrayList<>();
            XMLParser doc = XMLParser.parse(document);
            String name = ... //read name from xml document
            String age = ... //read age from xml document
            String clazz = ... //read class from xml document
            result.add(name);
            result.add(age);
            result.add(clazz);
            return result;
         }, DataTypes.createArrayType(DataTypes.StringType)
);

自定义项的一个限制是它们只能返回一列。因此,函数返回一个字符串数组,该数组必须在以后解包。

Dataset<Row> myResultNew = myResult
    .withColumn("extract", extract.apply(col("document"))) //1
    .withColumn("name", col("extract").getItem(0))         //2
    .withColumn("age", col("extract").getItem(1))          //2
    .withColumn("class", col("extract").getItem(2))        //2
    .drop("document", "extract");                          //3

调用udf并使用包含xml文档的列作为 apply 功能
从步骤1返回的数组中创建结果列
放下中间柱
注意:udf在数据集中每行执行一次。如果xml解析器的创建成本很高,这可能会减慢spark作业的执行,因为每行示例化一个解析器。由于spark的并行特性,不可能对下一行重用解析器。如果这是一个问题,另一个选择(至少在java世界中稍微复杂一点)是使用mappartitions。在这里,每个行不需要一个解析器,但每个数据集分区只需要一个解析器。
另一种完全不同的方法是使用sparkxml。

相关问题