ApacheFlink:由于类型擦除,无法自动确定函数的返回类型

nbysray5  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(465)

我在java中使用flink编写了一个简单的程序,它将一个文件或文本作为输入,然后使用flatmap函数打印所有单词。
这是我的代码:

final ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);
        // show user defined parameters in the apache flink dashboard

        DataStream<String> dataStream;

        if(params.has("input")) 
        {
            System.out.println("Executing Words example with file input");
            dataStream = env.readTextFile(params.get("input"));
        }else if (params.has("host") && params.has("port")) 
        {
            System.out.println("Executing Words example with socket stream");
            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
        }
        else {
            System.exit(1);
            return;
        }

        DataStream<String> wordDataStream = dataStream.flatMap(
                (String sentence, Collector<String> out) -> {
                    for(String word: sentence.split(" "))
                        out.collect(word);
        });

        wordDataStream.print();

        env.execute("Word Split");

但是当我用这个命令运行它时:

bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999

我得到以下错误:
程序失败,出现以下异常:
函数main(words)的返回类型。java:32)'由于类型擦除,无法自动确定。通过对转换调用的结果使用returns(…)方法,或者让函数实现“resulttypequeryable”接口,可以给出类型信息提示。
(第32行引用第二个数据流的声明)

hjzp0vay

hjzp0vay1#

我认为对错误消息的简短描述是非常好的,但是让我扩展一下。
为了执行一个程序,flink需要知道被处理的值的类型,因为它需要序列化和反序列化它们。Flink的类型系统是基于 TypeInformation 它描述了一种数据类型。指定函数时,flink会尝试推断该函数的返回类型。对于示例中的flatmapfunction,传递给 Collector .
不幸的是,一些lambda函数由于类型擦除而丢失了这些信息,因此flink无法自动推断类型。因此,必须显式声明返回类型。
您可以提供以下类型信息:

DataStream<String> wordDataStream = dataStream.flatMap(
    (String sentence, Collector<String> out) -> {
        for(String word: sentence.split(" "))
        out.collect(word); // collect objects of type String
    }
).returns(Types.STRING); // declare return type of flatmap lambda function as String
xxls0lw8

xxls0lw82#

我也面临同样的问题。我尝试下面的链接,它为我工作。
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html#compiler-局限性
虽然它是旧版本,但它为我工作。我不能执行maven compile install,但我可以运行java主类。如果执行maven编译安装很重要,那么您应该在尝试之前考虑一下。

相关问题