Usage of the org.apache.kafka.streams.kstream.Window.start() method, with code examples


This article collects some Java code examples of the org.apache.kafka.streams.kstream.Window.start() method and shows how Window.start() is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should be of some help as references. Details of Window.start() are as follows:

Package: org.apache.kafka.streams.kstream
Class: Window
Method: start

Window.start overview

Return the start timestamp of this window (in milliseconds since the Unix epoch).
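
Before the collected examples, here is a minimal sketch of where start() typically appears (the topic name and window size below are illustrative assumptions, not taken from the examples that follow): a windowed aggregation keys its output by Windowed<K>, and start() is read from that key's window.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Minimal sketch: count records per key in 1-minute tumbling windows,
// then read each window's start timestamp off the Windowed<String> key
// produced by the aggregation.
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count()
    .toStream()
    .foreach((windowedKey, count) ->
        // Window.start() returns the window's start in epoch milliseconds
        System.out.println(windowedKey.key() + "@" + windowedKey.window().start() + "=" + count));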

Code examples

Code example from: confluentinc/kafka-streams-examples

private static void consumeOutput(String bootstrapServers, String schemaRegistryUrl) {
 final Properties consumerProperties = new Properties();
 consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
 consumerProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
 consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
   "top-articles-lambda-example-consumer");
 consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 final Deserializer<Windowed<String>> windowedDeserializer = WindowedSerdes.timeWindowedSerdeFrom(String.class).deserializer();
 final KafkaConsumer<Windowed<String>, String> consumer = new KafkaConsumer<>(consumerProperties,
                                        windowedDeserializer,
                                        Serdes.String().deserializer());
 consumer.subscribe(Collections.singleton(TopArticlesLambdaExample.TOP_NEWS_PER_INDUSTRY_TOPIC));
 while (true) {
  // block until records arrive (legacy poll(long) overload)
  ConsumerRecords<Windowed<String>, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
  for (ConsumerRecord<Windowed<String>, String> consumerRecord : consumerRecords) {
   System.out.println(consumerRecord.key().key() + "@" + consumerRecord.key().window().start() +  "=" + consumerRecord.value());
  }
 }
}
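
In this example each record key is a Windowed<String>, so consumerRecord.key().key() yields the original record key while consumerRecord.key().window().start() yields the start timestamp (epoch milliseconds) of the window the value was aggregated in.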

Code example from: confluentinc/kafka-streams-examples

.map((key, value) -> new KeyValue<>(key.key() + "@" + key.window().start() + "->" + key.window().end(), value))

Code example from: org.apache.kafka/kafka-streams

@Override
public String toString() {
  return "[" + key + "@" + window.start() + "/" + window.end() + "]";
}

Code example from: org.apache.kafka/kafka-streams

public static <K> byte[] toBinary(final Windowed<K> timeKey,
                 final Serializer<K> serializer,
                 final String topic) {
  final byte[] bytes = serializer.serialize(topic, timeKey.key());
  final ByteBuffer buf = ByteBuffer.allocate(bytes.length + TIMESTAMP_SIZE);
  buf.put(bytes);
  buf.putLong(timeKey.window().start());
  return buf.array();
}

Code example from: org.apache.kafka/kafka-streams

public static byte[] toBinary(final Windowed<Bytes> sessionKey) {
    final byte[] bytes = sessionKey.key().get();
    final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
    buf.put(bytes);
    buf.putLong(sessionKey.window().end());
    buf.putLong(sessionKey.window().start());
    return buf.array();
  }

Code example from: org.apache.kafka/kafka-streams

public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                 final Serializer<K> serializer,
                 final String topic) {
  final byte[] bytes = serializer.serialize(topic, sessionKey.key());
  final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
  buf.put(bytes);
  buf.putLong(sessionKey.window().end());
  buf.putLong(sessionKey.window().start());
  return buf.array();
}
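
The two toBinary() variants above make the session-key layout explicit: the serialized key bytes are followed by the window end and then the window start, each written as an 8-byte long (TIMESTAMP_SIZE is 8). A hypothetical inverse, reading the start timestamp back out (extractStartTimestamp is not part of the Kafka Streams API, just an illustration of the layout):

import java.nio.ByteBuffer;

// Hypothetical helper illustrating the layout written by toBinary() above:
// [key bytes][end timestamp: 8 bytes][start timestamp: 8 bytes]
public static long extractStartTimestamp(final byte[] binarySessionKey) {
    // the start timestamp occupies the final 8 bytes of the array
    return ByteBuffer.wrap(binarySessionKey).getLong(binarySessionKey.length - Long.BYTES);
}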

Code example from: org.apache.kafka/kafka-streams

public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
                     final int seqnum,
                     final StateSerdes<K, ?> serdes) {
  final byte[] serializedKey = serdes.rawKey(timeKey.key());
  return toStoreKeyBinary(serializedKey, timeKey.window().start(), seqnum);
}
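
The extra seqnum written after the window start lets a window store keep duplicate entries for the same key and window when duplicate retention is enabled; for ordinary lookups it is simply 0, as in the compare() example further below.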

Code example from: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
  @SendTo({"output1","output2","output3"})
  @SuppressWarnings("unchecked")
  public KStream<?, WordCount>[] process(KStream<Object, String> input) {
    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
    return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value)
        .windowedBy(timeWindows)
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
        .branch(isEnglish, isFrench, isSpanish);
  }

Code example from: org.apache.kafka/kafka-streams

@Override
  int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
    final Bytes storeKeyBytes = WindowKeySchema.toStoreKeyBinary(storeKey.key(), storeKey.window().start(), 0);
    return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
  }

Code example from: org.apache.kafka/kafka-streams

public static Bytes toStoreKeyBinary(final Windowed<Bytes> timeKey,
                   final int seqnum) {
  final byte[] bytes = timeKey.key().get();
  return toStoreKeyBinary(bytes, timeKey.window().start(), seqnum);
}

Code example from: spring-cloud/spring-cloud-stream-samples

@StreamListener("binding2")
@SendTo("singleOutput")
public KStream<?, WordCount> process(KStream<Object, String> input) {
  return input
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, value) -> new KeyValue<>(value, value))
      .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
      .windowedBy(timeWindows)
      .count(Materialized.as("WordCounts-1"))
      .toStream()
      .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}

Code example from: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {
    return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(5000))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
  }

Code example from: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {
    return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(30000))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
  }

Code example from: org.apache.kafka/kafka-streams

private AGG fetchPrevious(final Bytes rawKey, final Window window) {
  try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) {
    if (!iterator.hasNext()) {
      return null;
    }
    return serdes.valueFrom(iterator.next().value);
  }
}
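
Here window.start() and window.end() bound a range scan: findSessions() returns any stored sessions for rawKey that overlap the given window, i.e. sessions ending no earlier than window.start() and starting no later than window.end().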

Code example from: ebi-wp/kafka-streams-api-websockets

Serdes.serdeFrom(new MySerde(), new MySerde()),
  "data-store")
.toStream((key, value) -> key.key().toString() + " " + key.window().start())
.mapValues((job) -> job.computeAvgTime().toString());

Code example from: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
@SendTo("output")
public KStream<Integer, ProductStatus> process(KStream<Object, Product> input) {
  return input
      .filter((key, product) -> productIds().contains(product.getId()))
      .map((key, value) -> new KeyValue<>(value, value))
      .groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
      .windowedBy(timeWindows)
      .count(Materialized.as("product-counts"))
      .toStream()
      .map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
          value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
          Instant.ofEpochMilli(key.window().end()).atZone(ZoneId.systemDefault()).toLocalTime())));
}

Code example from: habren/KafkaExample

.toStream()
.map((Windowed<String> window, Long value) -> {
  return new KeyValue<String, String>(window.key(), String.format("key=%s, value=%s, start=%d, end=%d\n",window.key(), value, window.window().start(), window.window().end()));
  });
kStream.to(Serdes.String(), Serdes.String(), "count");

Code example from: org.apache.kafka/kafka-streams

private void maybeForward(final ThreadCache.DirtyEntry entry,
             final Bytes key,
             final Windowed<K> windowedKey,
             final InternalProcessorContext context) {
  if (flushListener != null) {
    final ProcessorRecordContext current = context.recordContext();
    context.setRecordContext(entry.entry().context());
    try {
      final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
      flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
    } finally {
      context.setRecordContext(current);
    }
  }
}
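
The cache flush path only needs windowedKey.window().start() to look up the previous value, since a time-windowed store identifies an entry by its key and window start (the window size is fixed).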

Code example from: org.apache.kafka/kafka-streams

@Override
  public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
    while (iterator.hasNext()) {
      final Bytes bytes = iterator.peekNextKey();
      final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
      if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
        && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
        && windowedKey.window().end() >= from
        && windowedKey.window().start() <= to) {
        return true;
      }
      iterator.next();
    }
    return false;
  }
};
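
The last two conditions implement interval overlap: a session window overlaps the query range [from, to] exactly when its end is at least from and its start is at most to.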

Code example from: jresoort/kafkastreams-workshop

.mapValues(SumCount::average, Materialized.with(new WindowedSerde<>(Serdes.String()), Serdes.Double()))
.toStream()
.map(((key, average) -> new KeyValue<>(key.key(), new Average(average, key.window().start(), key.window().start() + 300000))))
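
Note that this last example hard-codes the window size: the end of the reported range is computed as key.window().start() + 300000 (five minutes) rather than read from key.window().end().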
