org.apache.kafka.streams.kstream.KStreamBuilder.stream()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(14.6k)|赞(0)|评价(0)|浏览(102)

本文整理了Java中org.apache.kafka.streams.kstream.KStreamBuilder.stream()方法的一些代码示例,展示了KStreamBuilder.stream()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KStreamBuilder.stream()方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.KStreamBuilder
类名称:KStreamBuilder
方法名:stream

KStreamBuilder.stream介绍

暂无

代码示例

代码示例来源:origin: SeldonIO/seldon-server

System.out.println("topic:"+topic);
final String parseDateMethod = ns.getString("parse_date_method");
KStream<byte[], JsonNode> source = builder.stream(Serdes.ByteArray(),jsonSerde,topic);

代码示例来源:origin: SeldonIO/seldon-server

KStream<String, JsonNode> source = builder.stream(stringSerde,jsonSerde,ns.getString("topic"));

代码示例来源:origin: SeldonIO/seldon-server

KStream<String, JsonNode> source = builder.stream(stringSerde,jsonSerde,ns.getString("topic"));

代码示例来源:origin: simplesteph/kafka-streams-course

public static void main(String[] args) {
  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> kStream = builder.stream("input-topic-name");
  // do stuff
  kStream.to("word-count-output");
  KafkaStreams streams = new KafkaStreams(builder, config);
  streams.cleanUp(); // only do this in dev - not in prod
  streams.start();
  // print the topology
  System.out.println(streams.toString());
  // shutdown hook to correctly close the streams application
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

代码示例来源:origin: jetoile/hadoop-unit

public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream(this.inputTopic).to(this.outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the stream application would be running forever,
    // in this example we just let it run for some time and stop since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
  }
}

代码示例来源:origin: JohnReedLOL/kafka-streams

builder.stream("secure-input").to("secure-output");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

代码示例来源:origin: jetoile/hadoop-unit

public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> stream = builder.stream(this.topic);

    stream.foreach((key, value) -> {
      System.out.println("================");
      System.out.println(value);
    });

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the stream application would be running forever,
    // in this example we just let it run for some time and stop since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
  }
}

代码示例来源:origin: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamingConfig = new StreamsConfig(getProperties());
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String, String> patternStreamI = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-\\d"));
  KStream<String, String> namedTopicKStream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "topic-Z");
  KStream<String, String> patternStreamII = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-[A-Y]+"));
  patternStreamI.print("pattern-\\d");
  namedTopicKStream.print("topic-Z");
  patternStreamII.print("topic-[A-Y]+");
  System.out.println("Starting stream regex consumer Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
  kafkaStreams.start();
}

代码示例来源:origin: simplesteph/kafka-streams-course

builder.stream(Serdes.String(), jsonSerde, "bank-transactions");

代码示例来源:origin: simplesteph/kafka-streams-course

KStream<String, String> textLines = builder.stream("favourite-colour-input");

代码示例来源:origin: timothyrenner/kafka-streams-ex

KStream<byte[], String> text = builder.stream("console");

代码示例来源:origin: outbrain/Aletheia

public <TDomainClass> KStream<String, TDomainClass> stream(final AletheiaConfig config,
                              final Class<TDomainClass> domainClass,
                              final String consumeFromEndPointId,
                              final String datumSerDeId,
                              final SerDeListener<TDomainClass> serDeListener) {
 final KafkaTopicConsumptionEndPoint consumptionEndPoint = getKafkaTopicConsumptionEndPoint(config, consumeFromEndPointId);
 final Serde<TDomainClass> valueSerDe = AletheiaSerdes.serdeFrom(domainClass, datumSerDeId, config, serDeListener);
 return kstreamBuilder.stream(Serdes.String(), valueSerDe, consumptionEndPoint.getTopicName());
}

代码示例来源:origin: vgoldin/cqrs-eventsourcing-kafka

@Override
public void start() throws Exception {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  Serde<EventEnvelope> envelopeSerde = initializeEnvelopeSerde();
  Predicate<String, EventEnvelope> inventoryItemCreated = (k, v) -> k.equals(InventoryItemCreated.class.getSimpleName());
  Predicate<String, EventEnvelope> inventoryItemRenamed =  (k, v) -> k.equals(InventoryItemRenamed.class.getSimpleName());
  Predicate<String, EventEnvelope> inventoryItemDeactivated = (k, v) -> k.equals(InventoryItemDeactivated.class.getSimpleName());
  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, EventEnvelope>[] filteredStreams = builder
      .stream(Serdes.String(), envelopeSerde, INVENTORY_ITEM_TOPIC)
      .selectKey((k, v) -> v.eventType)
      .branch(inventoryItemCreated, inventoryItemRenamed, inventoryItemDeactivated);
  filteredStreams[0].process(InventoryItemCreatedHandler::new);
  filteredStreams[1].process(InventoryItemRenamedHandler::new);
  filteredStreams[2].process(InventoryItemDeactivatedHandler::new);
  kafkaStreams = new KafkaStreams(builder, streamsConfig);
  kafkaStreams.cleanUp(); // -- only because we are using in-memory
  kafkaStreams.start();
}

代码示例来源:origin: JohnReedLOL/kafka-streams

KStream<String, WikiFeed> feeds = builder.stream("WikipediaFeed");

代码示例来源:origin: ebi-wp/kafka-streams-api-websockets

KStream<String, String> source = builder.stream("data-in");

代码示例来源:origin: outbrain/Aletheia

public KStream<String, DatumEnvelope> envelopeStream(final AletheiaConfig config,
                           final String consumeFromEndPointId) {
 final KafkaTopicConsumptionEndPoint endPoint = getKafkaTopicConsumptionEndPoint(config, consumeFromEndPointId);
 final Serde<DatumEnvelope> envelopeSerde = serdeFrom(new AletheiaKafkaEnvelopeSerializer(), new AletheiaKafkaEnvelopeDeserializer());
 return kstreamBuilder.stream(Serdes.String(), envelopeSerde, endPoint.getTopicName());
}

代码示例来源:origin: bbejeck/kafka-streams

public void run()  {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  JsonSerializer<Tweet> tweetJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<Tweet> tweetJsonDeserializer = new JsonDeserializer<>(Tweet.class);
  Serde<Tweet> tweetSerde = Serdes.serdeFrom(tweetJsonSerializer, tweetJsonDeserializer);
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  Classifier classifier = new Classifier();
  classifier.train(new File("src/main/resources/kafkaStreamsTwitterTrainingData_clean.csv"));
  KeyValueMapper<String, Tweet, String> languageToKey = (k, v) ->
    StringUtils.isNotBlank(v.getText()) ? classifier.classify(v.getText()):"unknown";
  Predicate<String, Tweet> isEnglish = (k, v) -> k.equals("english");
  Predicate<String, Tweet> isFrench =  (k, v) -> k.equals("french");
  Predicate<String, Tweet> isSpanish = (k, v) -> k.equals("spanish");
  KStream<String, Tweet> tweetKStream = kStreamBuilder.stream(Serdes.String(), tweetSerde, "twitterData");
  KStream<String, Tweet>[] filteredStreams = tweetKStream.selectKey(languageToKey).branch(isEnglish, isFrench, isSpanish);
  filteredStreams[0].to(Serdes.String(), tweetSerde, "english");
  filteredStreams[1].to(Serdes.String(), tweetSerde, "french");
  filteredStreams[2].to(Serdes.String(), tweetSerde, "spanish");
  kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
  System.out.println("Starting twitter analysis streams");
  kafkaStreams.start();
  System.out.println("Started");
}

代码示例来源:origin: amient/hello-kafka-streams

private static KafkaStreams createWikipediaStreamsInstance(String bootstrapServers) {
  final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
  final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
  final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
  KStreamBuilder builder = new KStreamBuilder();
  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wikipedia-streams");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  KStream<JsonNode, JsonNode> wikipediaRaw = builder.stream(jsonSerde, jsonSerde, "wikipedia-raw");
  KStream<String, WikipediaMessage> wikipediaParsed =
      wikipediaRaw.map(WikipediaMessage::parceIRC)
          .filter(WikipediaMessage::filterNonNull)
          .through(Serdes.String(), new JsonPOJOSerde<>(WikipediaMessage.class), "wikipedia-parsed");
  KTable<String, Long> totalEditsByUser = wikipediaParsed
      .filter((key, value) -> value.type == WikipediaMessage.Type.EDIT)
      .countByKey(Serdes.String(), "wikipedia-edits-by-user");
  //some print
  totalEditsByUser.toStream().process(() -> new AbstractProcessor<String, Long>() {
    @Override
    public void process(String user, Long numEdits) {
      System.out.println("USER: " + user + " num.edits: " + numEdits);
    }
  });
  return new KafkaStreams(builder, props);
}

代码示例来源:origin: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamsConfig = new StreamsConfig(getProperties());
  JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
  JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
  JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<RewardAccumulator> rewardAccumulatorJsonDeserializer = new JsonDeserializer<>(RewardAccumulator.class);
  Serde<RewardAccumulator> rewardAccumulatorSerde = Serdes.serdeFrom(rewardAccumulatorJsonSerializer,rewardAccumulatorJsonDeserializer);
  JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();
  JsonDeserializer<PurchasePattern> purchasePatternJsonDeserializer = new JsonDeserializer<>(PurchasePattern.class);
  Serde<PurchasePattern> purchasePatternSerde = Serdes.serdeFrom(purchasePatternJsonSerializer,purchasePatternJsonDeserializer);
  Serde<Purchase> purchaseSerde = Serdes.serdeFrom(purchaseJsonSerializer,purchaseJsonDeserializer);
  Serde<String> stringSerde = Serdes.String();
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String,Purchase> purchaseKStream = kStreamBuilder.stream(stringSerde,purchaseSerde,"src-topic")
      .mapValues(p -> Purchase.builder(p).maskCreditCard().build());
  purchaseKStream.mapValues(purchase -> PurchasePattern.builder(purchase).build()).to(stringSerde,purchasePatternSerde,"patterns");
  purchaseKStream.mapValues(purchase -> RewardAccumulator.builder(purchase).build()).to(stringSerde,rewardAccumulatorSerde,"rewards");
  purchaseKStream.to(stringSerde,purchaseSerde,"purchases");
  System.out.println("Starting PurchaseStreams Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamsConfig);
  kafkaStreams.start();
  System.out.println("Now started PurchaseStreams Example");
}

代码示例来源:origin: bbejeck/kafka-streams

public static void main(String[] args) {
  StreamsConfig streamingConfig = new StreamsConfig(getProperties());
  JsonSerializer<StockTransactionCollector> stockTransactionsSerializer = new JsonSerializer<>();
  JsonDeserializer<StockTransactionCollector> stockTransactionsDeserializer = new JsonDeserializer<>(StockTransactionCollector.class);
  JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
  JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
  Serde<StockTransaction> transactionSerde = Serdes.serdeFrom(stockTxnJsonSerializer,stockTxnDeserializer);
  StringSerializer stringSerializer = new StringSerializer();
  StringDeserializer stringDeserializer = new StringDeserializer();
  Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);
  Serde<StockTransactionCollector> collectorSerde = Serdes.serdeFrom(stockTransactionsSerializer,stockTransactionsDeserializer);
  WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
  WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
  Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);
  KStreamBuilder kStreamBuilder = new KStreamBuilder();
  KStream<String,StockTransaction> transactionKStream =  kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");
  transactionKStream.map((k,v)-> new KeyValue<>(v.getSymbol(),v))
           .through(stringSerde, transactionSerde,"stocks-out")
           .groupBy((k,v) -> k, stringSerde, transactionSerde)
           .aggregate(StockTransactionCollector::new,
              (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
              TimeWindows.of(10000),
              collectorSerde, "stock-summaries")
      .to(windowedSerde,collectorSerde,"transaction-summary");
  System.out.println("Starting StockStreams Example");
  KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamingConfig);
  kafkaStreams.start();
  System.out.println("Now started StockStreams Example");
}

相关文章

微信公众号

最新文章

更多