本文整理了Java中org.apache.kafka.streams.kstream.KStreamBuilder.globalTable()
方法的一些代码示例,展示了KStreamBuilder.globalTable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KStreamBuilder.globalTable()
方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.KStreamBuilder
类名称:KStreamBuilder
方法名:globalTable
暂无
代码示例来源:origin: homeaway/stream-registry
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, Object> configs) {
// Get the infra manager topic name
Validate.validState(configs.containsKey(INFRAMANAGER_TOPIC), "Infra Manager Topic name is not provided.");
String infraManagerTopic = configs.get(INFRAMANAGER_TOPIC).toString();
log.info("Infra Manager Topic Name Read: {}", infraManagerTopic);
// Get the infra state store name
Validate.validState(configs.containsKey(INFRAMANAGER_STATE_STORE), "Infra Manager State Store name is not provided.");
infraStateStoreName = configs.get(INFRAMANAGER_STATE_STORE).toString();
log.info("Infra Manager State Store Name Read: {}", infraStateStoreName);
// Populate our kstreams properties map
Properties infraKStreamsProperties = new Properties();
Validate.validState(configs.containsKey(INFRA_KSTREAM_PROPS), "InfraKStreams properties is not provided.");
Map<String, Object> infraKStreamsPropertiesMap = (Map<String, Object>) configs.get(INFRA_KSTREAM_PROPS);
infraKStreamsPropertiesMap.forEach(infraKStreamsProperties::put);
log.info("Infra KStreams Properties: {}", infraKStreamsProperties);
// initialize the kstreams processor
KStreamBuilder infraKStreamBuilder = new KStreamBuilder();
kTable = infraKStreamBuilder.globalTable(infraManagerTopic, infraStateStoreName);
infraKStreams = new KafkaStreams(infraKStreamBuilder, infraKStreamsProperties);
}
代码示例来源:origin: simplesteph/kafka-streams-course
GlobalKTable<String, String> usersGlobalTable = builder.globalTable("user-table");
代码示例来源:origin: homeaway/stream-registry
public ManagedKStreams(Properties streamProperties, TopicsConfig topicsConfig, KStreamsProcessorListener testListener) {
this.streamProperties = streamProperties;
this.topicsConfig = topicsConfig;
stateStoreName = topicsConfig.getStateStoreName();
KStreamBuilder kStreamBuilder= new KStreamBuilder();
kStreamBuilder.globalTable(topicsConfig.getProducerTopic(), stateStoreName);
streams = new KafkaStreams(kStreamBuilder, streamProperties);
// [ #132 ] - Improve build times by notifying test listener that we are running
streams.setStateListener((newState, oldState) -> {
if (!isRunning && newState == KafkaStreams.State.RUNNING) {
isRunning = true;
if( testListener != null) {
testListener.stateStoreInitialized();
}
}
});
streams.setUncaughtExceptionHandler((t, e) -> log.error("KafkaStreams job failed", e));
}
内容来源于网络,如有侵权,请联系作者删除!