Usage and code examples of the org.apache.kafka.streams.kstream.KStreamBuilder.globalTable() method

x33g5p2x, reposted on 2022-01-23 under Other

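This article collects code examples for the Java method org.apache.kafka.streams.kstream.KStreamBuilder.globalTable() and shows how KStreamBuilder.globalTable() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical references. Details of the KStreamBuilder.globalTable() method:
Package path: org.apache.kafka.streams.kstream.KStreamBuilder
Class name: KStreamBuilder
Method name: globalTable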

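About KStreamBuilder.globalTable

No description is available. Note that KStreamBuilder was deprecated in Kafka 1.0 in favor of StreamsBuilder, so the examples below apply to older Kafka Streams versions.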

Code examples

Code example source: homeaway/stream-registry

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, Object> configs) {
  // Get the infra manager topic name
  Validate.validState(configs.containsKey(INFRAMANAGER_TOPIC), "Infra Manager Topic name is not provided.");
  String infraManagerTopic = configs.get(INFRAMANAGER_TOPIC).toString();
  log.info("Infra Manager Topic Name Read: {}", infraManagerTopic);
  // Get the infra state store name
  Validate.validState(configs.containsKey(INFRAMANAGER_STATE_STORE), "Infra Manager State Store name is not provided.");
  infraStateStoreName = configs.get(INFRAMANAGER_STATE_STORE).toString();
  log.info("Infra Manager State Store Name Read: {}", infraStateStoreName);
  // Populate our kstreams properties map
  Properties infraKStreamsProperties = new Properties();
  Validate.validState(configs.containsKey(INFRA_KSTREAM_PROPS), "InfraKStreams properties is not provided.");
  Map<String, Object> infraKStreamsPropertiesMap = (Map<String, Object>) configs.get(INFRA_KSTREAM_PROPS);
  infraKStreamsPropertiesMap.forEach(infraKStreamsProperties::put);
  log.info("Infra KStreams Properties: {}", infraKStreamsProperties);
  // initialize the kstreams processor
  KStreamBuilder infraKStreamBuilder = new KStreamBuilder();
  kTable = infraKStreamBuilder.globalTable(infraManagerTopic, infraStateStoreName);
  infraKStreams = new KafkaStreams(infraKStreamBuilder, infraKStreamsProperties);
}
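
Most of the configure method above is the same step repeated: check that a required key is present, read it, and finally copy a nested map into a Properties object. The following is a minimal sketch of that pattern using only the JDK. The class name ConfigSketch, the helpers requireString and toProperties, and the key strings are hypothetical and not part of stream-registry; a plain IllegalStateException stands in for the Validate.validState call in the original.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConfigSketch {

    // Hypothetical helper: fail fast when a required key is missing,
    // then return its value as a String.
    static String requireString(Map<String, Object> configs, String key) {
        if (!configs.containsKey(key)) {
            throw new IllegalStateException(key + " is not provided.");
        }
        return configs.get(key).toString();
    }

    // Hypothetical helper: copy every entry of a map into a Properties object.
    static Properties toProperties(Map<String, Object> source) {
        Properties props = new Properties();
        source.forEach(props::put);
        return props;
    }

    public static void main(String[] args) {
        // Illustrative keys and values only (assumptions, not from the source project).
        Map<String, Object> streamProps = new HashMap<>();
        streamProps.put("application.id", "infra-manager");

        Map<String, Object> configs = new HashMap<>();
        configs.put("topic", "infra-topic");
        configs.put("stream.props", streamProps);

        String topic = requireString(configs, "topic");

        @SuppressWarnings("unchecked")
        Map<String, Object> nested = (Map<String, Object>) configs.get("stream.props");
        Properties props = toProperties(nested);

        System.out.println(topic + " " + props.getProperty("application.id"));
    }
}
```

Pulling the check into one helper keeps each required setting to a single line and makes the error message consistent across keys.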

Code example source: simplesteph/kafka-streams-course

GlobalKTable<String, String> usersGlobalTable = builder.globalTable("user-table");

Code example source: homeaway/stream-registry

public ManagedKStreams(Properties streamProperties, TopicsConfig topicsConfig, KStreamsProcessorListener testListener) {
  this.streamProperties = streamProperties;
  this.topicsConfig = topicsConfig;
  stateStoreName = topicsConfig.getStateStoreName();
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  kStreamBuilder.globalTable(topicsConfig.getProducerTopic(), stateStoreName);
  streams = new KafkaStreams(kStreamBuilder, streamProperties);
  // [ #132 ] - Improve build times by notifying test listener that we are running
  streams.setStateListener((newState, oldState) -> {
    if (!isRunning && newState == KafkaStreams.State.RUNNING) {
      isRunning = true;
      if (testListener != null) {
        testListener.stateStoreInitialized();
      }
    }
  });
  streams.setUncaughtExceptionHandler((t, e) -> log.error("KafkaStreams job failed", e));
}
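
The state listener in the constructor above notifies the test listener only the first time the streams instance reaches RUNNING. That one-shot behavior can be sketched without Kafka on the classpath. In the sketch below, RunningNotifier and its State enum are hypothetical stand-ins (the enum mimics only a few values of KafkaStreams.State), and an AtomicBoolean replaces the plain isRunning field of the original as a thread-safe alternative. It is not what the original project does.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class RunningNotifier {

    // Stand-in for KafkaStreams.State; only the values needed here (assumption).
    enum State { CREATED, REBALANCING, RUNNING }

    private final AtomicBoolean notified = new AtomicBoolean(false);
    private final Runnable onFirstRunning; // may be null, like testListener above

    RunningNotifier(Runnable onFirstRunning) {
        this.onFirstRunning = onFirstRunning;
    }

    // Same shape as the (newState, oldState) callback passed to setStateListener.
    void onChange(State newState, State oldState) {
        // compareAndSet flips the flag exactly once, so later RUNNING transitions are ignored.
        if (newState == State.RUNNING && notified.compareAndSet(false, true)) {
            if (onFirstRunning != null) {
                onFirstRunning.run();
            }
        }
    }

    boolean hasNotified() {
        return notified.get();
    }
}
```

With a real KafkaStreams instance, the method reference notifier::onChange would not type-check against the hypothetical enum; the sketch only illustrates the once-only guard.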
