编写单元测试以Assertflink函数示例是可序列化的

lf3rwulv  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(416)

我在运行时遇到了以下异常:

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.

虽然我知道发生了什么,知道如何修复它,但我想确保它不会再次发生。当有人向这个richflatmapfunction类添加不可序列化的字段时,我希望单元测试失败,而不是在运行时失败。
有没有一种方法可以编写一个单元测试,使用与flink相同的函数序列化代码来Assert函数是可序列化的?

falq053o

falq053o1#

对于此场景,请改用集成测试。
在下面的代码中,行 env.execute(); 将运行管道并序列化运算符 MultiplyByTwo 以及 CollectSink .
你可以用同样的方法来测试 RichFlatMapFunction 是可序列化的。

public class ExampleIntegrationTest extends AbstractTestBase {

    @Test
    public void testMultiply() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}

参考文献:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-测试

相关问题