read csv在apache flink中有超过22个列

cfh9epnr  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(243)

我一直在做的事情如下:

val data = env.readCsvFile[ElecNormNew](getClass.getResource("/elecNormNew.arff").getPath)

val dataSet = data map { tuple =>
      val list = tuple.productIterator.toList
      val numList = list map (_.asInstanceOf[Double])
      LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
    }

哪里 ElecNorNew 是一个 case class :

case class ElecNormNew(
  var date: Double,
  var day: Double,
  var period: Double,
  var nswprice: Double,
  var nswdemand: Double,
  var vicprice: Double,
  var vicdemand: Double,
  var transfer: Double,
  var label: Double) extends Serializable {
}

如Flink的文件所述。但是现在我正在尝试读取一个有53列的csv。有没有办法使这个过程自动化?我需要创建一个包含53个字段的pojo吗?

更新

在费边的回答之后,我在尝试:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
  val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
  val csvData: DataSet[Row] = env.createInput[Row](rowIF)
  val dataSet2 = csvData.map { tuple =>
      ???
  }

但不知道该怎么继续,我该怎么用 RowTypeInfo ?

hfsqlsce

hfsqlsce1#

你可以使用 RowCsvInputFormat 具体如下:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)

val rowIF = new RowCsvInputFormat(new Path("file:///myCsv"), fieldTypes)
val csvData: DataSet[Row] = env.createInput[Row](rowIF)
``` `Row` 将数据存储在 `Array[Any]` . 因此,flink不能自动推断 `Row` . 这使得它比类型化元组或case类更难使用。您需要明确地提供 `RowTypeInfo` 使用正确的类型。这可以作为隐式值或通过扩展 `ResultTypeQueryable` 接口。

相关问题