How can I catch the exception thrown by Flink's readFile(path)?

m4pnthwp asked on 2021-07-15 in Flink

I am using Flink to monitor HDFS for new files (the files are in gzip format) and process them:

env.readFile(filePath)
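
The job looks roughly like the sketch below; the TextInputFormat, the PROCESS_CONTINUOUSLY watch mode, and the 60-second scan interval are illustrative, not the exact values I use.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadGzipJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String filePath = "hdfs://mdw:8020/user/data/";
        TextInputFormat format = new TextInputFormat(new Path(filePath));

        // .gz files are decompressed transparently by FileInputFormat;
        // a corrupt file makes open() throw, which fails the task.
        DataStream<String> lines = env.readFile(
                format,
                filePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                60_000L); // directory scan interval in ms

        lines.print();
        env.execute("read gzip files from HDFS");
    }
}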

It works when the files are valid, but if a gzip file is corrupt the whole Flink job is killed. The exception log is:

java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:472) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:49) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:381) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:88) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:112) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:322) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[k.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.util.zip.ZipException: Not in GZIP format
    at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) ~[?:1.8.0_181]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:43) ~[k.jar:?]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:32) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:848) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:820) ~[k.jar:?]
    ... 16 more

I want to skip invalid files instead of killing the Flink job, but I don't know how to catch the exception, because it is thrown from Flink's internal code. What can I do?

xyhw6mcr 1#

We had a more specific requirement, so we wrote a custom FlatMapFunction that is constructed with a list of directories to check, and that receives a regular "tickler" event from a custom source. When it gets this event (in its flatMap() method), it checks whether there are any new files (matching certain criteria); if so, it opens each file, reads the entries, and emits them via the Collector. In this setup we have full control over error handling.
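
Roughly, that pattern looks like the sketch below. The Long tick type, the .gz filter, and the in-memory "already processed" set are illustrative assumptions, and this version keeps no state across restarts, so it is not fault tolerant:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPInputStream;

// Emits one String per line read from new .gz files found in the configured
// directories. The Long input is the periodic "tickler" event from a custom source.
public class GzipDirectoryReader extends RichFlatMapFunction<Long, String> {

    private final List<String> directories;        // directories to scan on every tick
    private transient Set<String> processedFiles;  // simple in-memory de-duplication

    public GzipDirectoryReader(List<String> directories) {
        this.directories = directories;
    }

    @Override
    public void open(Configuration parameters) {
        processedFiles = new HashSet<>();
    }

    @Override
    public void flatMap(Long tick, Collector<String> out) throws Exception {
        for (String dir : directories) {
            Path dirPath = new Path(dir);
            FileSystem fs = dirPath.getFileSystem();
            for (FileStatus status : fs.listStatus(dirPath)) {
                String name = status.getPath().toString();
                if (status.isDir() || !name.endsWith(".gz") || processedFiles.contains(name)) {
                    continue;
                }
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(
                                new GZIPInputStream(fs.open(status.getPath())),
                                StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.collect(line);
                    }
                } catch (IOException e) {
                    // Corrupt or unreadable gzip file: log and skip it
                    // instead of letting the exception fail the whole job.
                }
                processedFiles.add(name);
            }
        }
    }
}

It would be wired into the pipeline as something like tickSource.flatMap(new GzipDirectoryReader(dirs)), where tickSource is the custom source that periodically emits a tick. Because the files are opened inside flatMap(), a ZipException can be caught there rather than inside Flink's ContinuousFileReaderOperator.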
