kafka流中的自定义时间戳提取器

cs7cruho  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(323)

我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自kafka 0.11.0编写的kafka主题—没有嵌入时间戳的内容。在网上读了一些书之后,我明白了我可以通过扩展来解决这个问题 TimestampExtractor 类并将其传递给 StreamsConfig .
我就是这样做的--

class MyEventTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long) = {
    record.value() match {
        case w: String => 1000L
        case _ => throw new RuntimeException(s"Called for $record")
    }
  }
}

我在github上基于这个代码
但是,当我做一个 sbt run ```
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:34: class MyEventTimestampExtractor needs to be abstract, since method extract in trait TimestampExtractor of type (x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object], x$2: Long)Long is not defined
[error] (Note that Long does not match Long)
[error] class MyEventTimestampExtractor extends TimestampExtractor {
[error] ^
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:35: method extract overrides nothing.
[error] Note: the super classes of class MyEventTimestampExtractor contain the following, non final members named extract:
[error] def extract(x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object],x$2: Long): Long
[error] override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long): Long = {
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed

我的build.sbt文件如下--

name := "kafka streams experiment"
version := "1.0"
scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams" % "1.0.0"
)

我不太明白这个错误。特别是周围的部分 `Note that Long does not match Long` . 我会做错什么?谢谢!
xqnpmsa8

xqnpmsa81#

试着用(看一下 prev 函数参数:

override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: java.lang.Long) = {
x3naxklr

x3naxklr2#

您需要java.long,因为这个api是用java定义的,而您使用的是scala long

相关问题