Usage of the org.apache.kafka.streams.KeyValue.pair() Method, with Code Examples


This article collects a number of code examples for the Java method org.apache.kafka.streams.KeyValue.pair() and shows how it is used in practice. The examples were extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as solid references. The details of KeyValue.pair() are as follows:
Package: org.apache.kafka.streams
Class: KeyValue
Method: pair

About KeyValue.pair

Creates a new key-value pair. It is a static factory method with the signature public static <K, V> KeyValue<K, V> pair(final K key, final V value).
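
Before the project examples, here is a minimal, self-contained sketch of the method on its own (the class and variable names are illustrative, not taken from any of the projects below):

import org.apache.kafka.streams.KeyValue;

public class KeyValuePairSketch {
  public static void main(String[] args) {
    // The static factory infers the generic types from its arguments...
    KeyValue<String, Long> viaFactory = KeyValue.pair("hello", 1L);
    // ...and is equivalent to invoking the public constructor directly.
    KeyValue<String, Long> viaConstructor = new KeyValue<>("hello", 1L);

    // KeyValue exposes its components as public final fields rather than getters.
    System.out.println(viaFactory.key + " -> " + viaFactory.value); // hello -> 1
    System.out.println(viaFactory.equals(viaConstructor));          // true: value-based equality
  }
}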

Code Examples

Example source: openzipkin/brave

// Identity transform: forward the record unchanged as a new KeyValue pair.
@Override public KeyValue<K, V> transform(K key, V value) {
  return KeyValue.pair(key, value);
}

Example source: openzipkin/brave

// Apply a side-effecting action, then forward the record unchanged.
@Override public KeyValue<K, V> transform(K key, V value) {
  action.apply(key, value);
  return KeyValue.pair(key, value);
}
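
Both Brave snippets are identity-style transformers: the record passes through unchanged, and the interesting work (in Brave's case, presumably tracing each processed record) happens in the enclosing code that this excerpt omits.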

Example source: confluentinc/kafka-streams-examples

@Override
public KeyValue<byte[], String> transform(final byte[] recordKey, final String recordValue) {
 // The record value contains the IP address in string representation.
 // The original record key is ignored because we don't need it for this logic.
 String anonymizedIpAddress = anonymizeIpAddress(recordValue);
 return KeyValue.pair(recordKey, anonymizedIpAddress);
}

Example source: confluentinc/kafka-streams-examples

@Override
public KeyValue<String, Long> transform(byte[] key, String value) {
 // For simplification (and unlike the traditional wordcount) we assume that the value is
 // a single word, i.e. we don't split the value by whitespace into potentially one or more
 // words.
 Optional<Long> count = Optional.ofNullable(stateStore.get(value));
 Long incrementedCount = count.orElse(0L) + 1;
 stateStore.put(value, incrementedCount);
 return KeyValue.pair(value, incrementedCount);
}
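
A transformer like this only runs once its state store has been registered with the topology and named in the transform() call. The following wiring is a sketch rather than code from the original project; the store name "Counts", the topic names, and the WordCountTransformer supplier are assumptions:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final StreamsBuilder builder = new StreamsBuilder();

// Register the store; the transformer would fetch it in init() via
// context.getStateStore("Counts").
final StoreBuilder<KeyValueStore<String, Long>> countsStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"), Serdes.String(), Serdes.Long());
builder.addStateStore(countsStore);

builder.stream("words-input", Consumed.with(Serdes.ByteArray(), Serdes.String()))
    // The store names listed here are the ones the transformer may access.
    .transform(WordCountTransformer::new, "Counts")
    .to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));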

Example source: confluentinc/kafka-streams-examples

private static <K, V> List<KeyValue<K, V>> readKeysAndValues(int numberToRead,
  String bootstrapServers, Deserializer<K> keyDes, Deserializer<V> valDes, String topicName)
  throws InterruptedException {
 Properties consumerConfig = new Properties();
 consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-Reader-" + consumerCounter++);
 consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig, keyDes, valDes);
 consumer.subscribe(singletonList(topicName));
 List<KeyValue<K, V>> actualValues = new ArrayList<>();
 TestUtils.waitForCondition(() -> {
  ConsumerRecords<K, V> records = consumer.poll(100);
  for (ConsumerRecord<K, V> record : records) {
   actualValues.add(KeyValue.pair(record.key(), record.value()));
  }
  return actualValues.size() == numberToRead;
 }, 20000, "Timed out reading orders.");
 consumer.close();
 return actualValues;
}
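
A hypothetical invocation of this helper, assuming a topic of String keys and Long values (the topic name and deserializers are illustrative):

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

final List<KeyValue<String, Long>> observed = readKeysAndValues(
    3, "localhost:9092", new StringDeserializer(), new LongDeserializer(), "word-counts");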

Example source: confluentinc/kafka-streams-examples

final KStream<String, String> originalAndUppercased = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));
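
Note that map() changes the key here (the original value becomes the new key), so Kafka Streams flags the resulting stream for repartitioning before any downstream stateful operation such as a grouping or a join.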

Example source: confluentinc/kafka-streams-examples

.groupBy((userId, region) -> KeyValue.pair(region, region))
.count()

Example source: confluentinc/kafka-streams-examples

public KeyValue<K, V> transform(final K key, final V value) {
 E eventId = idExtractor.apply(key, value);
 if (eventId == null) {
  return KeyValue.pair(key, value);
 } else {
  KeyValue<K, V> output;
  if (isDuplicate(eventId)) {
   output = null;
   updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
  } else {
   output = KeyValue.pair(key, value);
   rememberNewEvent(eventId, context.timestamp());
  }
  return output;
 }
}
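
Returning null from transform() forwards nothing downstream; that is how this deduplicator silently drops a record whose event ID was already seen, while still refreshing the stored timestamp so the remembered ID does not expire prematurely.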

Example source: confluentinc/kafka-streams-examples

assertThat(firstSession.get(0), equalTo(KeyValue.pair(userId + "@" + start + "->" + start, 1L)));
assertThat(secondSession.get(0), equalTo(KeyValue.pair(userId + "@" + secondSessionStart + "->" + secondSessionStart, 1L)));
assertThat(results, equalTo(Arrays.asList(
    KeyValue.pair(new Windowed<>(userId, new SessionWindow(start, start)), 1L),
    KeyValue.pair(new Windowed<>(userId, new SessionWindow(secondSessionStart, secondSessionStart)), 1L))));
assertThat(merged, equalTo(Arrays.asList(
    KeyValue.pair(userId + "@" + start + "->" + start, null),
    KeyValue.pair(userId + "@" + secondSessionStart + "->" + secondSessionStart, null),
    KeyValue.pair(userId + "@" + start + "->" + secondSessionStart, 3L))));
assertThat(mergedResults, equalTo(Collections.singletonList(
    KeyValue.pair(new Windowed<>(userId, new SessionWindow(start, secondSessionStart)), 3L))));

Example source: confluentinc/kafka-streams-examples

try {
  return Collections.singletonList(KeyValue.pair(key, 2 * value));
} catch (final SerializationException e) {
  // The recovery branch was truncated in the original snippet; skipping the
  // corrupted record (returning no output) is one plausible completion.
  return Collections.emptyList();
}

Example source: confluentinc/kafka-streams-examples

final List<String> inputValues = Arrays.asList("Hello World", "Hello Kafka Streams", "All streams lead to Kafka");
final List<KeyValue<String, Long>> expectedResult = Arrays.asList(
 KeyValue.pair("Hello", 1L),
 KeyValue.pair("Hello", 2L),
 KeyValue.pair("All", 1L)
);
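
The expected output is a changelog, not a final snapshot: because the counts are maintained incrementally, "Hello" appears once with 1L and again with 2L as its count is updated. Only a subset of the updates is shown in this excerpt; the input lines would also produce pairs for "World", "Kafka", and the remaining words.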

Example source: confluentinc/kafka-streams-examples

.groupBy((userId, region) -> KeyValue.pair(region, region))
.count();

Example source: confluentinc/kafka-streams-examples

@Override
public KeyValue<String, OrderValidation> transform(final Product productId,
                          final KeyValue<Order, Integer> orderAndStock) {
 //Process each order/inventory pair one at a time
 OrderValidation validated;
 Order order = orderAndStock.key;
 Integer warehouseStockCount = orderAndStock.value;
 //Look up locally 'reserved' stock from our state store
 Long reserved = reservedStocksStore.get(order.getProduct());
 if (reserved == null) {
  reserved = 0L;
 }
 //If there is enough stock available (considering both warehouse inventory and reserved stock) validate the order
 if (warehouseStockCount - reserved - order.getQuantity() >= 0) {
  //reserve the stock by adding it to the 'reserved' store
  reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());
  //validate the order
  validated = new OrderValidation(order.getId(), INVENTORY_CHECK, PASS);
 } else {
  //fail the order
  validated = new OrderValidation(order.getId(), INVENTORY_CHECK, FAIL);
 }
 return KeyValue.pair(validated.getOrderId(), validated);
}
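
This pattern works because the stream is keyed by product: every order for a given product is processed by the same stream task, so the locally 'reserved' counts cannot race with reservations made on other partitions.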

Example source: confluentinc/kafka-streams-examples

// Fragment 1: keep only plays long enough to chart, then re-key each event by song id.
playEvents
    .filter((region, event) -> event.getDuration() >= MIN_CHARTABLE_DURATION)
    .map((key, value) -> KeyValue.pair(value.getSongId(), value));

// Fragments 2 and 3: later in the same example, the song play counts are re-grouped
// by genre and under a single global key. The enclosing calls and the (song, plays)
// lambda parameters were truncated in this excerpt and are reconstructed here.
.groupBy((song, plays) -> KeyValue.pair(song.getGenre().toLowerCase(),
        new SongPlayCount(song.getId(), plays)),
    Serialized.with(Serdes.String(), songPlayCountSerde))

.groupBy((song, plays) -> KeyValue.pair(TOP_FIVE_KEY,
        new SongPlayCount(song.getId(), plays)),
    Serialized.with(Serdes.String(), songPlayCountSerde))
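
Re-keying the same SongPlayCount values in two different ways is what lets the example maintain both a per-genre chart and a single global top-five chart (under the constant TOP_FIVE_KEY) from one stream of play counts.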

Example source: org.apache.kafka/kafka-streams

@Override
public KeyValue<Windowed<Bytes>, byte[]> next() {
  final KeyValue<Bytes, byte[]> next = bytesIterator.next();
  return KeyValue.pair(SessionKeySchema.from(next.key), next.value);
}

Example source: org.apache.kafka/kafka-streams

private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
  final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
  for (final KeyValue<K, V> entry : from) {
    byteEntries.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
  }
  return byteEntries;
}

Example source: org.apache.kafka/kafka-streams

private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
  final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
  if (!next.key.equals(nextCacheKey)) {
    throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
  }
  return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value));
}

Example source: org.apache.kafka/kafka-streams

@Override
public KeyValue<Long, V> next() {
  final KeyValue<Bytes, byte[]> next = bytesIterator.next();
  final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get());
  final V value = serdes.valueFrom(next.value);
  return KeyValue.pair(timestamp, value);
}

Example source: simplesteph/medium-blog-kafka-udemy

@Override
public KeyValue<String, Review> transform(String courseId, Review review) {
  Long reviewId = review.getId();
  Long now = System.currentTimeMillis();
  if (reviewStore.get(reviewId) == null && !isReviewExpired(review, now, timeToKeepAReview)) {
    reviewStore.put(review.getId(), review);
    updateMinTimestamp(review);
    return KeyValue.pair(courseId, review);
  } else {
    return null;
  }
}

Example source: org.apache.kafka/kafka-streams

@Override
public KeyValue<Windowed<K>, V> next() {
  final KeyValue<Bytes, byte[]> next = bytesIterator.next();
  final long timestamp = WindowKeySchema.extractStoreTimestamp(next.key.get());
  final K key = WindowKeySchema.extractStoreKey(next.key.get(), serdes);
  final V value = serdes.valueFrom(next.value);
  return KeyValue.pair(
    new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)),
    value
  );
}
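
In these store-iterator examples the raw store key is a composite: WindowKeySchema packs the original record key together with the window's start timestamp into one byte array, and next() unpacks both pieces to rebuild a typed Windowed key for the caller.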
