Apache Flink: left join with a TableFunction does not return the expected result

i7uaboj4 posted on 2021-06-25 in Flink

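Flink version: 1.3.1
I created two tables, one from in-memory data and one from a UDTF (user-defined table function). When I test JOIN and LEFT JOIN against the UDTF, both return the same result. I expected the LEFT JOIN to return more rows than the JOIN, because it should keep input rows for which the function emits nothing.
My test code is: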

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class ExerciseUDF {
    public static void main(String[] args) throws Exception {
        test_3();
    }

    public static void test_3() throws Exception {
        // 1. set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        // 2. register the DataSet as table "WordCount" and the table function
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        tEnv.registerFunction("myTableUpperFunc", new MyTableFunc_2());

        Table table;
        DataSet<WCUpper> resultUpper;

        // 3. table left join user defined table
        System.out.println("table left join user defined table");
        table = tEnv.sql(
                "SELECT S.word as word, S.frequency as frequency, T.myupper as myupper "
                + "FROM WordCount as S "
                + "left join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) "
                + "on S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        // actual output: "WCUpper Ciao 1 CIAO" only; the rows for Hello are missing
        resultUpper.print();

        // 4. table join user defined table
        System.out.println("table join user defined table");
        table = tEnv.sql(
                "SELECT S.word as word, S.frequency as frequency, T.myupper as myupper "
                + "FROM WordCount as S "
                + "join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) "
                + "on S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print();
    }

    public static class WC {
        public String word;
        public long frequency;

        // public no-argument constructor to make it a Flink POJO
        public WC() {
        }

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // NOTE: WCUpper was not included in the original post. This is an assumed
    // POJO whose fields match the columns selected by the two queries.
    public static class WCUpper {
        public String word;
        public long frequency;
        public String myupper;

        public WCUpper() {
        }

        @Override
        public String toString() {
            return "WCUpper " + word + " " + frequency + " " + myupper;
        }
    }

    // user defined table function: emits (str, STR), but nothing for "Hello"
    public static class MyTableFunc_2 extends TableFunction<Tuple2<String, String>> {
        public void eval(String str) { // Ciao --> (Ciao, CIAO)
            System.out.println("upper func executed for " + str);
            if (str.equals("Hello")) {
                return; // emit no row, so only an outer join can keep this input row
            }
            collect(new Tuple2<String, String>(str, str.toUpperCase()));
        }
    }
}

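The LEFT JOIN and JOIN queries produce the same output. In both cases only one row is returned:
WCUpper Ciao 1 CIAO
However, I think the LEFT JOIN query should also preserve the "Hello" rows.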
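To make the expectation concrete, here is a minimal plain-Java sketch of standard SQL join semantics for this example. It does not use Flink at all; the class `LateralJoinSketch` and its methods are hypothetical names introduced only for illustration, and the frequency column is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LateralJoinSketch {

    // Stand-in for the question's MyTableFunc_2 (assumption: same behavior):
    // emits no row for "Hello", otherwise one (word, upper-cased word) row.
    static List<String[]> upperFunc(String word) {
        if (word.equals("Hello")) {
            return Collections.emptyList();
        }
        return Collections.singletonList(new String[] {word, word.toUpperCase()});
    }

    // Joins every input word with the rows the function emits for it, keeping
    // only function rows that satisfy the ON predicate S.word = T.word.
    // With keepUnmatched = true (LEFT JOIN), a word without any matching
    // function row is still emitted once, padded with null.
    static List<String> join(List<String> words, boolean keepUnmatched) {
        List<String> out = new ArrayList<>();
        for (String word : words) {
            boolean matched = false;
            for (String[] t : upperFunc(word)) {
                if (word.equals(t[0])) {
                    out.add(word + " " + t[1]);
                    matched = true;
                }
            }
            if (!matched && keepUnmatched) {
                out.add(word + " null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Ciao", "Hello");
        System.out.println("join:      " + join(words, false));
        System.out.println("left join: " + join(words, true));
    }
}
```

Under these semantics the inner join yields one row (for "Ciao"), while the left join yields three rows: the "Ciao" row plus two null-padded "Hello" rows. That three-row result is what the question expects from the LEFT JOIN query.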

soat7uwm #1

Yes, you are right.
This is a bug in the translation of TableFunction outer joins with predicates, and it needs to be fixed.
Thanks, Fabian
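As an illustration only, and not a description of what Flink's planner actually does: one general way an outer join with a predicate can collapse into an inner join is when the predicate is evaluated as a filter after the null padding instead of as part of the join condition. The plain-Java sketch below shows that effect on the question's data; `PredicatePlacementSketch` and its methods are hypothetical names, and no Flink API is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PredicatePlacementSketch {

    // Stand-in table function (assumption: mirrors MyTableFunc_2):
    // no row for "Hello", one (word, upper-cased word) row otherwise.
    static List<String[]> upperFunc(String word) {
        List<String[]> rows = new ArrayList<>();
        if (!word.equals("Hello")) {
            rows.add(new String[] {word, word.toUpperCase()});
        }
        return rows;
    }

    // Step 1: left outer join with no predicate (ON TRUE). Words for which
    // the function emits nothing are kept and padded with nulls.
    static List<String[]> leftJoinOnTrue(List<String> words) {
        List<String[]> out = new ArrayList<>();
        for (String word : words) {
            List<String[]> rows = upperFunc(word);
            if (rows.isEmpty()) {
                out.add(new String[] {word, null, null});
            }
            for (String[] t : rows) {
                out.add(new String[] {word, t[0], t[1]});
            }
        }
        return out;
    }

    // Step 2: the predicate S.word = T.word applied as a filter AFTER the join.
    // A comparison with NULL is never true in SQL, so every null-padded row
    // is dropped and the result equals that of an inner join.
    static List<String> filterAfterJoin(List<String[]> joined) {
        List<String> out = new ArrayList<>();
        for (String[] row : joined) {
            if (row[1] != null && row[0].equals(row[1])) {
                out.add(row[0] + " " + row[2]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> joined = leftJoinOnTrue(Arrays.asList("Hello", "Ciao", "Hello"));
        System.out.println("rows after left join: " + joined.size());
        System.out.println("rows after filter:    " + filterAfterJoin(joined));
    }
}
```

The join step keeps all three input rows, but the later filter leaves only the "Ciao" row, which matches the single row reported in the question.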
