This article collects Java code examples of the org.apache.kafka.streams.kstream.KStreamBuilder.stream() method and shows how KStreamBuilder.stream() is used in practice. The examples are taken from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they have solid reference value and should be helpful. Details of the KStreamBuilder.stream() method:
Package path: org.apache.kafka.streams.kstream.KStreamBuilder
Class name: KStreamBuilder
Method name: stream
Method description: none provided
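Before the individual examples, here is a minimal, self-contained sketch combining the three overloads of stream() exercised below: subscribing to a topic by name using the configured default serdes, subscribing with explicit key/value serdes, and subscribing to every topic that matches a regular-expression Pattern. The topic names, application id, and broker address are placeholders chosen for illustration, and KStreamBuilder belongs to the pre-1.0 Kafka Streams API (it was later deprecated in favor of StreamsBuilder).

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class KStreamBuilderStreamExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreambuilder-stream-example"); // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");             // placeholder broker
        // default serdes used by the overload of stream() that takes no serdes
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();

        // 1) subscribe by topic name, relying on the default serdes configured above
        KStream<String, String> byName = builder.stream("input-topic");

        // 2) subscribe with explicit key and value serdes, overriding the defaults
        KStream<String, String> withSerdes = builder.stream(Serdes.String(), Serdes.String(), "other-topic");

        // 3) subscribe to all topics whose names match a regular expression
        KStream<String, String> byPattern = builder.stream(Serdes.String(), Serdes.String(), Pattern.compile("logs-.*"));

        // forward each stream, unchanged, to an output topic
        byName.to("input-topic-copy");
        withSerdes.to("other-topic-copy");
        byPattern.to("logs-merged");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}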
Code example source: SeldonIO/seldon-server
System.out.println("topic:" + topic);
final String parseDateMethod = ns.getString("parse_date_method");
KStream<byte[], JsonNode> source = builder.stream(Serdes.ByteArray(), jsonSerde, topic);
Code example source: SeldonIO/seldon-server
KStream<String, JsonNode> source = builder.stream(stringSerde, jsonSerde, ns.getString("topic"));
Code example source: simplesteph/kafka-streams-course
public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> kStream = builder.stream("input-topic-name");
    // do stuff
    kStream.to("word-count-output");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.cleanUp(); // only do this in dev - not in prod
    streams.start();

    // print the topology
    System.out.println(streams.toString());

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
Code example source: jetoile/hadoop-unit
public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(this.inputTopic).to(this.outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the stream application would be running forever,
    // in this example we just let it run for some time and stop since the input data is finite.
    Thread.sleep(5000L);
    streams.close();
}
}
Code example source: JohnReedLOL/kafka-streams
builder.stream("secure-input").to("secure-output");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
Code example source: jetoile/hadoop-unit
public void run() throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> stream = builder.stream(this.topic);
    stream.foreach((key, value) -> {
        System.out.println("================");
        System.out.println(value);
    });

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the stream application would be running forever,
    // in this example we just let it run for some time and stop since the input data is finite.
    Thread.sleep(5000L);
    streams.close();
}
}
Code example source: bbejeck/kafka-streams
public static void main(String[] args) {
    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    // subscribe by regular expression: every topic named "topic-" followed by a digit
    KStream<String, String> patternStreamI = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-\\d"));
    // subscribe to a single, explicitly named topic
    KStream<String, String> namedTopicKStream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "topic-Z");
    // subscribe by regular expression: topics "topic-A" through "topic-Y"
    KStream<String, String> patternStreamII = kStreamBuilder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-[A-Y]+"));

    patternStreamI.print("pattern-\\d");
    namedTopicKStream.print("topic-Z");
    patternStreamII.print("topic-[A-Y]+");

    System.out.println("Starting stream regex consumer Example");
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
}
Code example source: simplesteph/kafka-streams-course
builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
Code example source: simplesteph/kafka-streams-course
KStream<String, String> textLines = builder.stream("favourite-colour-input");
Code example source: timothyrenner/kafka-streams-ex
KStream<byte[], String> text = builder.stream("console");
Code example source: outbrain/Aletheia
public <TDomainClass> KStream<String, TDomainClass> stream(final AletheiaConfig config,
                                                            final Class<TDomainClass> domainClass,
                                                            final String consumeFromEndPointId,
                                                            final String datumSerDeId,
                                                            final SerDeListener<TDomainClass> serDeListener) {
    final KafkaTopicConsumptionEndPoint consumptionEndPoint = getKafkaTopicConsumptionEndPoint(config, consumeFromEndPointId);
    final Serde<TDomainClass> valueSerDe = AletheiaSerdes.serdeFrom(domainClass, datumSerDeId, config, serDeListener);
    return kstreamBuilder.stream(Serdes.String(), valueSerDe, consumptionEndPoint.getTopicName());
}
Code example source: vgoldin/cqrs-eventsourcing-kafka
@Override
public void start() throws Exception {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<EventEnvelope> envelopeSerde = initializeEnvelopeSerde();

    // predicates that route each envelope by its event type
    Predicate<String, EventEnvelope> inventoryItemCreated = (k, v) -> k.equals(InventoryItemCreated.class.getSimpleName());
    Predicate<String, EventEnvelope> inventoryItemRenamed = (k, v) -> k.equals(InventoryItemRenamed.class.getSimpleName());
    Predicate<String, EventEnvelope> inventoryItemDeactivated = (k, v) -> k.equals(InventoryItemDeactivated.class.getSimpleName());

    KStreamBuilder builder = new KStreamBuilder();
    // re-key by event type, then split the stream into one branch per predicate
    KStream<String, EventEnvelope>[] filteredStreams = builder
            .stream(Serdes.String(), envelopeSerde, INVENTORY_ITEM_TOPIC)
            .selectKey((k, v) -> v.eventType)
            .branch(inventoryItemCreated, inventoryItemRenamed, inventoryItemDeactivated);

    filteredStreams[0].process(InventoryItemCreatedHandler::new);
    filteredStreams[1].process(InventoryItemRenamedHandler::new);
    filteredStreams[2].process(InventoryItemDeactivatedHandler::new);

    kafkaStreams = new KafkaStreams(builder, streamsConfig);
    kafkaStreams.cleanUp(); // -- only because we are using in-memory
    kafkaStreams.start();
}
Code example source: JohnReedLOL/kafka-streams
KStream<String, WikiFeed> feeds = builder.stream("WikipediaFeed");
Code example source: ebi-wp/kafka-streams-api-websockets
KStream<String, String> source = builder.stream("data-in");
Code example source: outbrain/Aletheia
public KStream<String, DatumEnvelope> envelopeStream(final AletheiaConfig config,
                                                     final String consumeFromEndPointId) {
    final KafkaTopicConsumptionEndPoint endPoint = getKafkaTopicConsumptionEndPoint(config, consumeFromEndPointId);
    final Serde<DatumEnvelope> envelopeSerde = serdeFrom(new AletheiaKafkaEnvelopeSerializer(), new AletheiaKafkaEnvelopeDeserializer());
    return kstreamBuilder.stream(Serdes.String(), envelopeSerde, endPoint.getTopicName());
}
Code example source: bbejeck/kafka-streams
public void run() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());

    JsonSerializer<Tweet> tweetJsonSerializer = new JsonSerializer<>();
    JsonDeserializer<Tweet> tweetJsonDeserializer = new JsonDeserializer<>(Tweet.class);
    Serde<Tweet> tweetSerde = Serdes.serdeFrom(tweetJsonSerializer, tweetJsonDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    Classifier classifier = new Classifier();
    classifier.train(new File("src/main/resources/kafkaStreamsTwitterTrainingData_clean.csv"));

    // key each tweet by the language detected by the classifier
    KeyValueMapper<String, Tweet, String> languageToKey = (k, v) ->
            StringUtils.isNotBlank(v.getText()) ? classifier.classify(v.getText()) : "unknown";

    Predicate<String, Tweet> isEnglish = (k, v) -> k.equals("english");
    Predicate<String, Tweet> isFrench = (k, v) -> k.equals("french");
    Predicate<String, Tweet> isSpanish = (k, v) -> k.equals("spanish");

    KStream<String, Tweet> tweetKStream = kStreamBuilder.stream(Serdes.String(), tweetSerde, "twitterData");
    // split the keyed stream into one branch per language
    KStream<String, Tweet>[] filteredStreams = tweetKStream.selectKey(languageToKey).branch(isEnglish, isFrench, isSpanish);

    filteredStreams[0].to(Serdes.String(), tweetSerde, "english");
    filteredStreams[1].to(Serdes.String(), tweetSerde, "french");
    filteredStreams[2].to(Serdes.String(), tweetSerde, "spanish");

    kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    System.out.println("Starting twitter analysis streams");
    kafkaStreams.start();
    System.out.println("Started");
}
Code example source: amient/hello-kafka-streams
private static KafkaStreams createWikipediaStreamsInstance(String bootstrapServers) {
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStreamBuilder builder = new KStreamBuilder();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wikipedia-streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    KStream<JsonNode, JsonNode> wikipediaRaw = builder.stream(jsonSerde, jsonSerde, "wikipedia-raw");

    KStream<String, WikipediaMessage> wikipediaParsed =
            wikipediaRaw.map(WikipediaMessage::parceIRC)
                    .filter(WikipediaMessage::filterNonNull)
                    .through(Serdes.String(), new JsonPOJOSerde<>(WikipediaMessage.class), "wikipedia-parsed");

    KTable<String, Long> totalEditsByUser = wikipediaParsed
            .filter((key, value) -> value.type == WikipediaMessage.Type.EDIT)
            .countByKey(Serdes.String(), "wikipedia-edits-by-user");

    // some print
    totalEditsByUser.toStream().process(() -> new AbstractProcessor<String, Long>() {
        @Override
        public void process(String user, Long numEdits) {
            System.out.println("USER: " + user + " num.edits: " + numEdits);
        }
    });

    return new KafkaStreams(builder, props);
}
Code example source: bbejeck/kafka-streams
public static void main(String[] args) {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());

    JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
    JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
    JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
    JsonDeserializer<RewardAccumulator> rewardAccumulatorJsonDeserializer = new JsonDeserializer<>(RewardAccumulator.class);
    Serde<RewardAccumulator> rewardAccumulatorSerde = Serdes.serdeFrom(rewardAccumulatorJsonSerializer, rewardAccumulatorJsonDeserializer);
    JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();
    JsonDeserializer<PurchasePattern> purchasePatternJsonDeserializer = new JsonDeserializer<>(PurchasePattern.class);
    Serde<PurchasePattern> purchasePatternSerde = Serdes.serdeFrom(purchasePatternJsonSerializer, purchasePatternJsonDeserializer);
    Serde<Purchase> purchaseSerde = Serdes.serdeFrom(purchaseJsonSerializer, purchaseJsonDeserializer);
    Serde<String> stringSerde = Serdes.String();

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    KStream<String, Purchase> purchaseKStream = kStreamBuilder.stream(stringSerde, purchaseSerde, "src-topic")
            .mapValues(p -> Purchase.builder(p).maskCreditCard().build());

    purchaseKStream.mapValues(purchase -> PurchasePattern.builder(purchase).build()).to(stringSerde, purchasePatternSerde, "patterns");
    purchaseKStream.mapValues(purchase -> RewardAccumulator.builder(purchase).build()).to(stringSerde, rewardAccumulatorSerde, "rewards");
    purchaseKStream.to(stringSerde, purchaseSerde, "purchases");

    System.out.println("Starting PurchaseStreams Example");
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
    System.out.println("Now started PurchaseStreams Example");
}
Code example source: bbejeck/kafka-streams
public static void main(String[] args) {
    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    JsonSerializer<StockTransactionCollector> stockTransactionsSerializer = new JsonSerializer<>();
    JsonDeserializer<StockTransactionCollector> stockTransactionsDeserializer = new JsonDeserializer<>(StockTransactionCollector.class);
    JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
    JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
    Serde<StockTransaction> transactionSerde = Serdes.serdeFrom(stockTxnJsonSerializer, stockTxnDeserializer);
    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer, stringDeserializer);
    Serde<StockTransactionCollector> collectorSerde = Serdes.serdeFrom(stockTransactionsSerializer, stockTransactionsDeserializer);
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    KStream<String, StockTransaction> transactionKStream = kStreamBuilder.stream(stringSerde, transactionSerde, "stocks");

    transactionKStream.map((k, v) -> new KeyValue<>(v.getSymbol(), v))
            .through(stringSerde, transactionSerde, "stocks-out")
            .groupBy((k, v) -> k, stringSerde, transactionSerde)
            .aggregate(StockTransactionCollector::new,
                    (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
                    TimeWindows.of(10000),
                    collectorSerde, "stock-summaries")
            .to(windowedSerde, collectorSerde, "transaction-summary");

    System.out.println("Starting StockStreams Example");
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
    System.out.println("Now started StockStreams Example");
}