amazon emr在提交apache flink作业时遇到hadoop错误

v440hwme  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(312)
Added Depedency Pom Details :

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop</artifactId>
            <version>1.7.1</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.529</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-connectors</artifactId>
            <version>1.1.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>

java.lang.unsupportedoperationexception:只有在org.apache.flink.runtime.fs.hdfs.hadooprecoverablewriter(hadooprecoverablewriter)上的hdfs和HadoopVersion2.7或更新版本才支持hadoop上的可恢复写入程序。java:57)在org.apache.flink.runtime.fs.hdfs.hadoopfilesystem.createrecoverablewriter(hadoopfilesystem)上。java:202)在org.apache.flink.core.fs.safetynetwrapperfilesystem.createrecoverablewriter(safetynetwrapperfilesystem)。java:69)在org.apache.flink.streaming.api.functions.sink.filesystem.bucket。java:112)位于org.apache.flink.streaming.api.functions.sink.filesystem.streamingfilesink$rowformatbuilder.createbuckets(streamingfilesink)。java:242)在org.apache.flink.streaming.api.functions.sink.filesystem.streamingfilesink.initializestate(streamingfilesink)。java:327)在org.apache.flink.streaming.util.functions.streamingfunctionutils.tryrestorefunction(streamingfunctionutils。java:178)在org.apache.flink.streaming.util.functions.streamingfunctionutils.restorefunctionstate(streamingfunctionutils。java:160)位于org.apache.flink.streaming.api.operators.abstractudfstreamoperator.initializestate(abstractudfstreamoperator)。java:96)在org.apache.flink.streaming.api.operators.abstractstreamoperator.initializestate(abstractstreamoperator。java:278)在org.apache.flink.streaming.runtime.tasks.streamtask.initializestate(streamtask。java:738)在org.apache.flink.streaming.runtime.tasks.streamtask.invoke(streamtask。java:289)在org.apache.flink.runtime.taskmanager.task.run(任务。java:704)在java.lang.thread.run(线程。java:748)

zbsbpyhn

zbsbpyhn1#

flink使用一种叫做serviceloader的东西来加载与可插入文件系统接口所需的组件。如果你想看看flink在代码里是怎么做的,那就去 org.apache.flink.core.fs.FileSystem . 注意这个问题 initialize 函数,它利用 RAW_FACTORIES 变量。 RAW_FACTORIES 是由函数创建的 loadFileSystems ,您可以看到它利用了java的 ServiceLoader .
在flink上启动应用程序之前,需要先设置文件系统组件。这意味着您的flink应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。
emr没有提供flink需要的s3文件系统组件来将s3用作流文件接收器。抛出这个异常并不是因为版本不够高,而是因为flink加载hadoop文件系统时没有与该版本匹配的文件系统 s3 方案(见此处代码)。
您可以通过为my flink应用程序启用调试日志记录级别(emr允许您在配置中执行此操作)来查看文件系统是否正在加载:

{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }

相关的日志在yarn资源管理器中可用,查看单个节点的日志。搜索字符串 "Added file system" 应该可以帮助您找到所有成功加载的文件系统。
在这个研究中,还可以使用ssh连接到主节点并使用flink scala repl,在这里我可以看到给定文件uri时flink决定加载什么文件系统。
解决方案是将s3文件系统实现的jar放到 /usr/lib/flink/lib/ 在开始flink应用程序之前。这可以通过获取 flink-s3-fs-hadoop 或者 flink-s3-fs-presto (取决于您使用的实现)。我的引导动作脚本如下所示:

sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib

sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar
pzfprimi

pzfprimi2#

为了使用Flink的 StreamingFileSink 只有一次保证,您需要使用hadoop>= 2.7 . 以下版本 2.7 不支持。因此,请确保您在emr上运行的是最新的hadoop版本。

相关问题