我一直在尝试建立一个Kafka流应用程序与Spark使用。我有一个用于测试的静态数据集。在运行一次代码之后,kafka设置当前偏移量,这样我就不能在第二次运行时重新处理数据。跑步 kafka-streams-application-reset
应该重置偏移量。但是,重新运行代码会导致 GlobalKTable
. 我能够重新分析数据的唯一方法是更改Kafka连接中的id。这就是我要做的。
在kafka中设置示例数据:
kafka-console-producer --broker-list localhost:9092 \
--topic testTopic \
--property "parse.key=true" \
--property "key.separator=:"
1:abcd
2:bcde
3:cdef
4:defg
5:efgh
6:fghi
7:ghij
8:hijk
9:ijkl
10:jklm
scala代码:
//Streams imports - need to update Kafka
import org.apache.kafka.common.serialization.Serdes
//import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KTable, Materialized, Produced, KStreamBuilder}
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.{KeyValueIterator, QueryableStoreTypes, ReadOnlyKeyValueStore, KeyValueStore}
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import java.util.{Properties}
val kafkaServer = "127.0.0.1:9092"
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStream")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(StreamsConfig.CLIENT_ID_CONFIG, "test-consumer-stream")
val config = new StreamsConfig(p)
val builder: StreamsBuilder = new StreamsBuilder()
val imkvs = Stores.inMemoryKeyValueStore("testLookup-stream")
val sBuilder = Stores.keyValueStoreBuilder(imkvs, Serdes.String, Serdes.String).withLoggingDisabled().withCachingEnabled()
val gTable: GlobalKTable[String, String] = builder.globalTable("testTopic", Materialized.as(imkvs).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withCachingDisabled())
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
val read: ReadOnlyKeyValueStore[String, String] = streams.store(gTable.queryableStoreName(), QueryableStoreTypes.keyValueStore[String, String]())
val hexLookup = "2"
println(read.get(hexLookup))
val iter: KeyValueIterator[String, String] = read.all()
while(iter.hasNext) {
val next = iter.next()
println(next.key + ": " + next.value)
}
流复位命令:
kafka-streams-application-reset --application-id testStream \
--bootstrap-servers localhost:9092 \
--to-earliest
1) 我是不是编错代码了,还是 kafka-streams-application-reset
功能不正常?2) 我希望用 inMemoryKeyValueStore
会导致Kafka无法跟踪当前偏移量;有没有办法强迫 GlobalKTable
不保留当前偏移?我想一直搜索整个数据集。
软件版本:
Kafka1.1.1-1
汇合4.1.1-1
spark scala 2.3.1版
Kafka客户端1.1.0
Kafka流1.1.0
1条答案
按热度按时间n3schb8v1#
如果要从空的内部状态重新启动应用程序并重新处理偏移量0中的数据,则必须提供“--input topics”参数和逗号分隔的主题列表。
您可以在此处找到更多详细信息:https://kafka.apache.org/10/documentation/streams/developer-guide/app-reset-tool
关于globalktable,理想情况下,它是流/主题顶部的物化视图,就像任何其他可查询存储一样。
另外,globalktable始终应用“auto.offset.reset”策略“earliest”,而不考虑streamsconfig中指定的值。
因此,它应该允许您随时查询整个表。