Usage of the org.apache.kafka.streams.kstream.KStreamBuilder.<init>() method, with code examples

This article collects Java code examples for the org.apache.kafka.streams.kstream.KStreamBuilder.<init>() method, showing how KStreamBuilder.<init>() is used in practice. The examples are drawn from selected projects on GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the KStreamBuilder.<init>() method:
Package: org.apache.kafka.streams.kstream.KStreamBuilder
Class: KStreamBuilder
Method: <init>

About KStreamBuilder.<init>

KStreamBuilder is the topology builder of the original (pre-1.0) Kafka Streams DSL. Its no-argument constructor <init>() creates an empty builder; source streams, tables, and processing steps are then added before the builder is handed to KafkaStreams. The class was deprecated in Kafka 1.0 in favor of org.apache.kafka.streams.StreamsBuilder.
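
A minimal, self-contained sketch of the typical construction pattern (the broker address and topic names are placeholder assumptions, not taken from any of the projects below):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class KStreamBuilderDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreambuilder-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The no-argument constructor creates an empty topology builder;
        // sources, transformations, and sinks are added afterwards.
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        // In this pre-1.0 API the builder itself is passed to KafkaStreams.
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}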

Code examples

Code example source: SeldonIO/seldon-server

KStreamBuilder builder = new KStreamBuilder();

Code example source: SeldonIO/seldon-server

influxDB.enableBatch(50, 5, TimeUnit.SECONDS);
KStreamBuilder builder = new KStreamBuilder();

Code example source: SeldonIO/seldon-server

influxDB.enableBatch(50, 5, TimeUnit.SECONDS);
KStreamBuilder builder = new KStreamBuilder();

Code example source: outbrain/Aletheia

public AletheiaStreams(final Map<String, Object> streamsAppConfig) {
 AletheiaConfig.registerEndPointTemplate(KafkaTopicEndPointTemplate.TYPE, KafkaTopicEndPointTemplate.class);
 this.kstreamBuilder = new KStreamBuilder();
 this.originalConfig = streamsAppConfig;
}

Code example source: com.github.piotr-kalanski/kafka-streams-app-common

private KafkaStreams createTopology(Properties config) {
  KStreamBuilder builder = new KStreamBuilder();
  buildTopology(builder);
  return new KafkaStreams(builder, config);
}

Code example source: com.github.piotr-kalanski.kafka/kafka-streams-app-common

private KafkaStreams createTopology(Properties config) {
  KStreamBuilder builder = new KStreamBuilder();
  buildTopology(builder);
  return new KafkaStreams(builder, config);
}

Code example source: homeaway/stream-registry

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, Object> configs) {
  // Get the infra manager topic name
  Validate.validState(configs.containsKey(INFRAMANAGER_TOPIC), "Infra Manager Topic name is not provided.");
  String infraManagerTopic = configs.get(INFRAMANAGER_TOPIC).toString();
  log.info("Infra Manager Topic Name Read: {}", infraManagerTopic);
  // Get the infra state store name
  Validate.validState(configs.containsKey(INFRAMANAGER_STATE_STORE), "Infra Manager State Store name is not provided.");
  infraStateStoreName = configs.get(INFRAMANAGER_STATE_STORE).toString();
  log.info("Infra Manager State Store Name Read: {}", infraStateStoreName);
  // Populate our kstreams properties map
  Properties infraKStreamsProperties = new Properties();
  Validate.validState(configs.containsKey(INFRA_KSTREAM_PROPS), "InfraKStreams properties is not provided.");
  Map<String, Object> infraKStreamsPropertiesMap = (Map<String, Object>) configs.get(INFRA_KSTREAM_PROPS);
  infraKStreamsPropertiesMap.forEach(infraKStreamsProperties::put);
  log.info("Infra KStreams Properties: {}", infraKStreamsProperties);
  // initialize the kstreams processor
  KStreamBuilder infraKStreamBuilder = new KStreamBuilder();
  kTable = infraKStreamBuilder.globalTable(infraManagerTopic, infraStateStoreName);
  infraKStreams = new KafkaStreams(infraKStreamBuilder, infraKStreamsProperties);
}
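
Once such an application reaches the RUNNING state, the state store that globalTable(...) creates can be read back through the interactive-queries API. A minimal sketch of that follow-up step, assuming String keys and values (the actual types depend on the configured default serdes) and a hypothetical lookup key:

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// After infraKStreams.start(), once the instance is RUNNING
// (store() throws InvalidStateStoreException if queried too early):
ReadOnlyKeyValueStore<String, String> store =
    infraKStreams.store(infraStateStoreName, QueryableStoreTypes.<String, String>keyValueStore());
String value = store.get("some-stream-name"); // hypothetical key, for illustration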

Code example source: simplesteph/kafka-streams-course

public static void main(String[] args) {
  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> kStream = builder.stream("input-topic-name");
  // do stuff
  kStream.to("word-count-output");
  KafkaStreams streams = new KafkaStreams(builder, config);
  streams.cleanUp(); // only do this in dev - not in prod
  streams.start();
  // print the topology
  System.out.println(streams.toString());
  // shutdown hook to correctly close the streams application
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

Code example source: jetoile/hadoop-unit

public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream(this.inputTopic).to(this.outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // Usually a streams application would run forever; in this example we let it
    // run for a short time and then stop, since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
  }

Code example source: JohnReedLOL/kafka-streams

streamsConfiguration.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
KStreamBuilder builder = new KStreamBuilder();

Code example source: jetoile/hadoop-unit

public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> stream = builder.stream(this.topic);

    stream.foreach((key, value) -> {
      System.out.println("================");
      System.out.println(value);
    });

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // Usually a streams application would run forever; in this example we let it
    // run for a short time and then stop, since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
  }

Code example source: vgoldin/cqrs-eventsourcing-kafka

@Override
public void start() throws Exception {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  Serde<EventEnvelope> envelopeSerde = initializeEnvelopeSerde();
  Predicate<String, EventEnvelope> inventoryItemCreated = (k, v) -> k.equals(InventoryItemCreated.class.getSimpleName());
  Predicate<String, EventEnvelope> inventoryItemRenamed =  (k, v) -> k.equals(InventoryItemRenamed.class.getSimpleName());
  Predicate<String, EventEnvelope> inventoryItemDeactivated = (k, v) -> k.equals(InventoryItemDeactivated.class.getSimpleName());
  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, EventEnvelope>[] filteredStreams = builder
      .stream(Serdes.String(), envelopeSerde, INVENTORY_ITEM_TOPIC)
      .selectKey((k, v) -> v.eventType)
      .branch(inventoryItemCreated, inventoryItemRenamed, inventoryItemDeactivated);
  filteredStreams[0].process(InventoryItemCreatedHandler::new);
  filteredStreams[1].process(InventoryItemRenamedHandler::new);
  filteredStreams[2].process(InventoryItemDeactivatedHandler::new);
  kafkaStreams = new KafkaStreams(builder, streamsConfig);
  kafkaStreams.cleanUp(); // -- only because we are using in-memory
  kafkaStreams.start();
}

Code example source: JohnReedLOL/kafka-streams

final Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();

Code example source: ebi-wp/kafka-streams-api-websockets

KStreamBuilder builder = new KStreamBuilder();

Code example source: homeaway/stream-registry

public ManagedKStreams(Properties streamProperties, TopicsConfig topicsConfig, KStreamsProcessorListener testListener) {
  this.streamProperties = streamProperties;
  this.topicsConfig = topicsConfig;
  stateStoreName = topicsConfig.getStateStoreName();
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  kStreamBuilder.globalTable(topicsConfig.getProducerTopic(), stateStoreName);
  streams = new KafkaStreams(kStreamBuilder, streamProperties);
  // [ #132 ] - Improve build times by notifying test listener that we are running
  streams.setStateListener((newState, oldState) -> {
    if (!isRunning && newState == KafkaStreams.State.RUNNING) {
      isRunning = true;
      if (testListener != null) {
        testListener.stateStoreInitialized();
      }
    }
  });
  streams.setUncaughtExceptionHandler((t, e) -> log.error("KafkaStreams job failed", e));
}

Code example source: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamingConfig = new StreamsConfig(getProperties());
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String, String> patternStreamI = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-\\d"));
  KStream<String, String> namedTopicKStream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "topic-Z");
  KStream<String, String> patternStreamII = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-[A-Y]+"));
  patternStreamI.print("pattern-\\d");
  namedTopicKStream.print("topic-Z");
  patternStreamII.print("topic-[A-Y]+");
  System.out.println("Starting stream regex consumer Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
  kafkaStreams.start();
}

Code example source: bbejeck/kafka-streams

public void run()  {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  JsonSerializer<Tweet> tweetJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<Tweet> tweetJsonDeserializer = new JsonDeserializer<>(Tweet.class);
  Serde<Tweet> tweetSerde = Serdes.serdeFrom(tweetJsonSerializer, tweetJsonDeserializer);
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  Classifier classifier = new Classifier();
  classifier.train(new File("src/main/resources/kafkaStreamsTwitterTrainingData_clean.csv"));
  KeyValueMapper<String, Tweet, String> languageToKey = (k, v) ->
    StringUtils.isNotBlank(v.getText()) ? classifier.classify(v.getText()) : "unknown";
  Predicate<String, Tweet> isEnglish = (k, v) -> k.equals("english");
  Predicate<String, Tweet> isFrench =  (k, v) -> k.equals("french");
  Predicate<String, Tweet> isSpanish = (k, v) -> k.equals("spanish");
  KStream<String, Tweet> tweetKStream = kStreamBuilder.stream(Serdes.String(), tweetSerde, "twitterData");
  KStream<String, Tweet>[] filteredStreams = tweetKStream.selectKey(languageToKey).branch(isEnglish, isFrench, isSpanish);
  filteredStreams[0].to(Serdes.String(), tweetSerde, "english");
  filteredStreams[1].to(Serdes.String(), tweetSerde, "french");
  filteredStreams[2].to(Serdes.String(), tweetSerde, "spanish");
  kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
  System.out.println("Starting twitter analysis streams");
  kafkaStreams.start();
  System.out.println("Started");
}

Code example source: amient/hello-kafka-streams

private static KafkaStreams createWikipediaStreamsInstance(String bootstrapServers) {
  final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
  final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
  final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
  KStreamBuilder builder = new KStreamBuilder();
  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wikipedia-streams");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  KStream<JsonNode, JsonNode> wikipediaRaw = builder.stream(jsonSerde, jsonSerde, "wikipedia-raw");
  KStream<String, WikipediaMessage> wikipediaParsed =
      wikipediaRaw.map(WikipediaMessage::parceIRC)
          .filter(WikipediaMessage::filterNonNull)
          .through(Serdes.String(), new JsonPOJOSerde<>(WikipediaMessage.class), "wikipedia-parsed");
  KTable<String, Long> totalEditsByUser = wikipediaParsed
      .filter((key, value) -> value.type == WikipediaMessage.Type.EDIT)
      .countByKey(Serdes.String(), "wikipedia-edits-by-user");
  // print each count update
  totalEditsByUser.toStream().process(() -> new AbstractProcessor<String, Long>() {
    @Override
    public void process(String user, Long numEdits) {
      System.out.println("USER: " + user + " num.edits: " + numEdits);
    }
  });
  return new KafkaStreams(builder, props);
}

Code example source: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamingConfig = new StreamsConfig(getProperties());
  JsonSerializer<StockTransactionCollector> stockTransactionsSerializer = new JsonSerializer<>();
  JsonDeserializer<StockTransactionCollector> stockTransactionsDeserializer = new JsonDeserializer<>(StockTransactionCollector.class);
  JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
  JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
  Serde<StockTransaction> transactionSerde = Serdes.serdeFrom(stockTxnJsonSerializer,stockTxnDeserializer);
  StringSerializer stringSerializer = new StringSerializer();
  StringDeserializer stringDeserializer = new StringDeserializer();
  Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);
  Serde<StockTransactionCollector> collectorSerde = Serdes.serdeFrom(stockTransactionsSerializer,stockTransactionsDeserializer);
  WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
  WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
  Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String,StockTransaction> transactionKStream =  kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");
  transactionKStream.map((k,v)-> new KeyValue<>(v.getSymbol(),v))
           .through(stringSerde, transactionSerde,"stocks-out")
           .groupBy((k,v) -> k, stringSerde, transactionSerde)
           .aggregate(StockTransactionCollector::new,
              (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
              TimeWindows.of(10000),
              collectorSerde, "stock-summaries")
      .to(windowedSerde,collectorSerde,"transaction-summary");
  System.out.println("Starting StockStreams Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamingConfig);
  kafkaStreams.start();
  System.out.println("Now started StockStreams Example");
}

Code example source: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
  JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
  JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<RewardAccumulator> rewardAccumulatorJsonDeserializer = new JsonDeserializer<>(RewardAccumulator.class);
  Serde<RewardAccumulator> rewardAccumulatorSerde = Serdes.serdeFrom(rewardAccumulatorJsonSerializer,rewardAccumulatorJsonDeserializer);
  JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<PurchasePattern> purchasePatternJsonDeserializer = new JsonDeserializer<>(PurchasePattern.class);
  Serde<PurchasePattern> purchasePatternSerde = Serdes.serdeFrom(purchasePatternJsonSerializer,purchasePatternJsonDeserializer);
  Serde<Purchase> purchaseSerde = Serdes.serdeFrom(purchaseJsonSerializer,purchaseJsonDeserializer);
  Serde<String> stringSerde = Serdes.String();
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String,Purchase> purchaseKStream = kStreamBuilder.stream(stringSerde,purchaseSerde,"src-topic")
      .mapValues(p -> Purchase.builder(p).maskCreditCard().build());
  purchaseKStream.mapValues(purchase -> PurchasePattern.builder(purchase).build()).to(stringSerde,purchasePatternSerde,"patterns");
  purchaseKStream.mapValues(purchase -> RewardAccumulator.builder(purchase).build()).to(stringSerde,rewardAccumulatorSerde,"rewards");
  purchaseKStream.to(stringSerde,purchaseSerde,"purchases");
  System.out.println("Starting PurchaseStreams Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamsConfig);
  kafkaStreams.start();
  System.out.println("Now started PurchaseStreams Example");
}
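
All of the examples above use the original DSL entry point. From Kafka 1.0 on, KStreamBuilder is deprecated and replaced by org.apache.kafka.streams.StreamsBuilder, which produces an explicit Topology object. For comparison, a minimal pipe-through topology in the replacement API (broker address and topic names are again placeholders):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class StreamsBuilderDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamsbuilder-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic").to("output-topic");

        // Unlike KStreamBuilder, the builder is not passed directly:
        // build() produces a Topology, which KafkaStreams consumes.
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}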
