scala—flink中处理的数据不保存在cassandra数据库中

xmd2e60i  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(218)

我正在实现通过apachekafka从外部环境获取数据的应用程序。这些数据首先Map到对象,然后传递到进程(时间窗口)(请看下面的代码)

val busDataStream = env.addSource(kafkaConsumer)
  .filter { _.nonEmpty}
  .flatMap(line => JsonMethods.parse(line).toOption)
  .map(_.extract[BusModel])

class CustomProcess() extends ProcessWindowFunction[BusModel, BusModel, String, TimeWindow] {
  lazy val busState: ValueState[BusModel] = getRuntimeContext.getState(
    new ValueStateDescriptor[BusModel]("BusModel state", classOf[BusModel])
  )

  override def process(key: String, context: Context, elements: Iterable[BusModel], out: Collector[BusModel]): Unit = {
    for (e <- elements) {
      if (busState.value() != null) {
        out.collect(busState.value())
        val result: Double = calculateSomething(e, busState.value())
      }
      busState.update(e)
      println(s"BusState: ${busState.value()}")
    }
  }
}

val dataStream: DataStream[BusModel] = busDataStream
  .keyBy(_.VehicleNumber)
  .timeWindow(Time.seconds(10))
  .process(new CustomCountProc)

在准备好新的信息后,我想把这些数据输入Cassandra数据库。我尝试使用连接器实现这个值,但不幸的是,新记录没有显示在数据库中。。。
我还添加了一个createtypeinformation方法,该方法应该将所选对象的数据Map到数据库中的列类型,但不幸的是,这没有帮助。

createTypeInformation[(String, Double, Double, Double)]
val sinkStream = dataStream
  .map(busRide => (
    java.util.UUID.randomUUID.toString,
    busRide.valueA,
    busRide.valueB,
    busRide.valueC,
  ))

CassandraSink.addSink(sinkStream)
  .setQuery("INSERT INTO transport.bus_flink_speed(" +
    "\"FirstColumn\", " +
    "\"SecondColumn " +
    "\"ThirdColumn\", " +
    "\"ForthColumn\")" +
    " values (?, ?, ?, ?);")
  .setHost("localhost")
  .build()

env.execute("Flink Kafka Example")

有人知道为什么这样不行吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题