Trying to read a file from S3 with Flink in an IDE: class org.apache.hadoop.fs.s3a.S3AFileSystem not found

4ktjp1zp asked on 2021-07-12 in Spark

I am trying to read a file from S3 using Flink in IntelliJ and get the following exception:

Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

My code looks like this:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://path-to-bucket/"
    val outputPath = "s3a://path-to-output-bucket/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val readFooter = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(inputPath), conf))
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    //    val parquetFileReader2 = new ParquetFileReader(new Path(inputPath), ParquetReadOptions)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    } finally {
      parquetFileReader.close()
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment

    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}

sbt dependencies:

val flinkVersion = "1.12.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"
  )

but5z9lq 1#

S3 is only supported by adding the respective flink-s3-fs-hadoop jar to your plugins folder, as described in the plugins documentation. For a local IDE setup, the root directory that is expected to contain the plugins directory is, by default, the working directory; you can override it with the environment variable FLINK_PLUGINS_DIR.
To get flink-s3-fs-hadoop into the plugins folder, I guess some sbt glue is necessary (or you copy the jar there manually once). In Gradle, I would define a plugin scope and copy the jars into the plugin dir in a custom task; a sketch of the sbt equivalent is shown below.
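A minimal sketch of such sbt glue, assuming Flink 1.12.1 and the default plugins location under the project working directory. The configuration name s3plugin, the task name copyS3Plugin, and the plugins/s3-fs-hadoop subdirectory are illustrative choices, not a prescribed layout:

// build.sbt (sketch): keep flink-s3-fs-hadoop off the application classpath
// by resolving it in its own hidden configuration, then copy it into the
// plugins dir that a local IDE run scans (overridable via FLINK_PLUGINS_DIR).
lazy val S3Plugin = config("s3plugin").hide
ivyConfigurations += S3Plugin

libraryDependencies += "org.apache.flink" % "flink-s3-fs-hadoop" % "1.12.1" % S3Plugin

lazy val copyS3Plugin = taskKey[Unit]("Copy flink-s3-fs-hadoop into plugins/s3-fs-hadoop")

copyS3Plugin := {
  val pluginDir = baseDirectory.value / "plugins" / "s3-fs-hadoop"
  IO.createDirectory(pluginDir)
  update.value
    .select(configurationFilter(S3Plugin.name)) // jars resolved in the s3plugin config
    .foreach(jar => IO.copyFile(jar, pluginDir / jar.getName))
}

Running sbt copyS3Plugin once before starting the job from IntelliJ should then place the jar where Flink's plugin loader looks for it; the application code can keep using the s3a:// paths unchanged.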
