kafka流媒体重置问题

qaxu7uf2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(293)

我一直在尝试建立一个Kafka流应用程序与Spark使用。我有一个用于测试的静态数据集。在运行一次代码之后,kafka设置当前偏移量,这样我就不能在第二次运行时重新处理数据。跑步 kafka-streams-application-reset 应该重置偏移量。但是,重新运行代码会导致 GlobalKTable . 我能够重新分析数据的唯一方法是更改Kafka连接中的id。这就是我要做的。
在kafka中设置示例数据:

kafka-console-producer --broker-list localhost:9092 \
    --topic testTopic \
    --property "parse.key=true" \
    --property "key.separator=:"

1:abcd
2:bcde
3:cdef
4:defg
5:efgh
6:fghi
7:ghij
8:hijk
9:ijkl
10:jklm

scala代码:

//Streams imports - need to update Kafka
import org.apache.kafka.common.serialization.Serdes
//import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KTable, Materialized, Produced, KStreamBuilder}
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.{KeyValueIterator, QueryableStoreTypes, ReadOnlyKeyValueStore, KeyValueStore}
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import java.util.{Properties}

val kafkaServer = "127.0.0.1:9092"
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStream")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(StreamsConfig.CLIENT_ID_CONFIG, "test-consumer-stream")
val config = new StreamsConfig(p)

val builder: StreamsBuilder = new StreamsBuilder()
val imkvs = Stores.inMemoryKeyValueStore("testLookup-stream")
val sBuilder = Stores.keyValueStoreBuilder(imkvs, Serdes.String, Serdes.String).withLoggingDisabled().withCachingEnabled()

val gTable: GlobalKTable[String, String] = builder.globalTable("testTopic", Materialized.as(imkvs).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withCachingDisabled())
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()

val read: ReadOnlyKeyValueStore[String, String] = streams.store(gTable.queryableStoreName(), QueryableStoreTypes.keyValueStore[String, String]())
val hexLookup = "2"
println(read.get(hexLookup))

val iter: KeyValueIterator[String, String] = read.all()
while(iter.hasNext) {
  val next = iter.next()
  println(next.key + ": " + next.value)
}

流复位命令:

kafka-streams-application-reset --application-id testStream \
    --bootstrap-servers localhost:9092 \
    --to-earliest

1) 我是不是编错代码了,还是 kafka-streams-application-reset 功能不正常?2) 我希望用 inMemoryKeyValueStore 会导致Kafka无法跟踪当前偏移量;有没有办法强迫 GlobalKTable 不保留当前偏移?我想一直搜索整个数据集。
软件版本:
Kafka1.1.1-1
汇合4.1.1-1
spark scala 2.3.1版
Kafka客户端1.1.0
Kafka流1.1.0

n3schb8v

n3schb8v1#

如果要从空的内部状态重新启动应用程序并重新处理偏移量0中的数据,则必须提供“--input topics”参数和逗号分隔的主题列表。

bin/kafka-streams-application-reset.sh --application-id testApplication1  --input-topics demoTopic1

您可以在此处找到更多详细信息:https://kafka.apache.org/10/documentation/streams/developer-guide/app-reset-tool
关于globalktable,理想情况下,它是流/主题顶部的物化视图,就像任何其他可查询存储一样。
另外,globalktable始终应用“auto.offset.reset”策略“earliest”,而不考虑streamsconfig中指定的值。
因此,它应该允许您随时查询整个表。

相关问题