How to access a broadcast variable from a UDF when the broadcast variable is defined in another class that calls the UDF

uurv41yg · posted 2021-05-27 · Spark

How can a UDF access a broadcast variable when that broadcast variable is defined in another class, the one that calls the UDF?

/* this UDF is in a different file */

    package com.abc;

    import static org.apache.spark.sql.functions.udf;

    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.types.DataTypes;

    public class JavaClass {
        public static UserDefinedFunction getvalue =
                udf((UDF1<String, String>) param -> "String value", DataTypes.StringType);
    }

/* the code below is in a different file */

    package com.xyz;

    import static org.apache.spark.sql.functions.lit;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    import com.abc.JavaClass;

    public class AnotherClassToCallUDF {
        public static void main(String[] args) {
            Dataset<Row> abc = .......;

            abc.withColumn("new-col", JavaClass.getvalue.apply(lit("passing some value")));
        }
    }

In the above code, how do I pass a broadcast variable when calling the UDF? A UDF only accepts col and lit arguments and nothing else, so how can a broadcast variable that is defined in the main class be accessed from the other class?

2eafrhcq #1

Try this -

1. Create the UDF

    import java.util.Map;

    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.api.java.UDF1;

    class MyUDF implements UDF1<Long, String> {
        private final Map<Long, String> broadCastMap;

        public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
            // read the broadcast value once; the map is then serialized along with the UDF
            this.broadCastMap = broadCastMap.value();
        }

        @Override
        public String call(Long id) {
            return id + " -> " + broadCastMap.getOrDefault(id, "No mapping");
        }
    }
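
As a side note, the constructor above copies the broadcast's value into the UDF instance, so the map itself travels with the serialized UDF. A minimal variant sketch (the class name MyBroadcastUDF is illustrative) keeps only the Broadcast handle and resolves it inside call(), letting executors fetch the map through Spark's broadcast cache:

    import java.util.Map;

    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.api.java.UDF1;

    class MyBroadcastUDF implements UDF1<Long, String> {
        // keep the lightweight, serializable broadcast handle instead of the map itself
        private final Broadcast<Map<Long, String>> broadCastMap;

        public MyBroadcastUDF(Broadcast<Map<Long, String>> broadCastMap) {
            this.broadCastMap = broadCastMap;
        }

        @Override
        public String call(Long id) {
            // value() is resolved on the executor from the broadcast cache
            return id + " -> " + broadCastMap.value().getOrDefault(id, "No mapping");
        }
    }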

2. Pass the broadcast into the UDF and use it

    // Assumes an existing SparkSession 'spark', static imports from org.apache.spark.sql.functions
    // (udf, lit, col, callUDF), and the usual java.util / Spark imports (JavaSparkContext, Broadcast,
    // UserDefinedFunction, DataTypes, JavaConverters).

    Dataset<Row> inputDf = spark.range(1, 5).withColumn("col1", lit("a"));
    inputDf.show(false);
    inputDf.printSchema();
    /**
     * +---+----+
     * |id |col1|
     * +---+----+
     * |1  |a   |
     * |2  |a   |
     * |3  |a   |
     * |4  |a   |
     * +---+----+
     *
     * root
     *  |-- id: long (nullable = false)
     *  |-- col1: string (nullable = false)
     */

    // Create the broadcast variable on the driver
    Map<Long, String> map = new HashMap<>();
    map.put(1L, "b");
    map.put(2L, "c");
    Broadcast<Map<Long, String>> broadCastMap =
            new JavaSparkContext(spark.sparkContext()).broadcast(map);

    // Wrap the UDF1 instance (which received the broadcast) into a UserDefinedFunction
    UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);

    // Register it so it can be called by name
    spark.sqlContext().udf().register("myUdf", myUdf);

    inputDf.withColumn("new_col", callUDF("myUdf",
            JavaConverters.asScalaBufferConverter(Collections.singletonList(col("id"))).asScala()))
            .show();
    /**
     * +---+----+---------------+
     * | id|col1|        new_col|
     * +---+----+---------------+
     * |  1|   a|         1 -> b|
     * |  2|   a|         2 -> c|
     * |  3|   a|3 -> No mapping|
     * |  4|   a|4 -> No mapping|
     * +---+----+---------------+
     */
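
If the Spark version in use exposes the Java-friendly varargs overload of callUDF (it is annotated for varargs in recent 2.x/3.x releases), the Scala collection conversion above is not needed; a sketch under that assumption:

    inputDf.withColumn("new_col", callUDF("myUdf", col("id"))).show();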

3. Without registering the UDF in the FunctionRegistry

    inputDf.withColumn("new_col", myUdf.apply(col("id")))
            .show();
    /**
     * +---+----+---------------+
     * | id|col1|        new_col|
     * +---+----+---------------+
     * |  1|   a|         1 -> b|
     * |  2|   a|         2 -> c|
     * |  3|   a|3 -> No mapping|
     * |  4|   a|4 -> No mapping|
     * +---+----+---------------+
     */
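
Mapped back onto the two-file layout from the question, the same pattern would mean that com.abc.JavaClass exposes a factory method taking the Broadcast created by the caller, instead of a pre-built static UserDefinedFunction. This is only a sketch under that assumption; the getvalue(Broadcast) signature and the column name keyCol are illustrative:

    package com.abc;

    import static org.apache.spark.sql.functions.udf;

    import java.util.Map;

    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.types.DataTypes;

    public class JavaClass {
        // build the UDF from whatever broadcast the calling class supplies
        public static UserDefinedFunction getvalue(Broadcast<Map<String, String>> broadcast) {
            return udf((UDF1<String, String>) param ->
                    broadcast.value().getOrDefault(param, "No mapping"), DataTypes.StringType);
        }
    }

The calling class then creates the broadcast where the SparkSession lives and hands it to the factory:

    package com.xyz;

    import static org.apache.spark.sql.functions.col;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import com.abc.JavaClass;

    public class AnotherClassToCallUDF {
        // 'abc' stands for the Dataset built in the question; 'keyCol' is an illustrative column name
        static Dataset<Row> addMappedColumn(SparkSession spark, Dataset<Row> abc) {
            Map<String, String> lookup = new HashMap<>();
            lookup.put("k1", "v1");
            Broadcast<Map<String, String>> broadcast =
                    new JavaSparkContext(spark.sparkContext()).broadcast(lookup);

            // the broadcast is created here, in the calling class, and handed to the UDF factory
            return abc.withColumn("new-col", JavaClass.getvalue(broadcast).apply(col("keyCol")));
        }
    }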
