本文整理了Java中org.apache.kafka.streams.KeyValue.pair()方法的一些代码示例,展示了KeyValue.pair()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KeyValue.pair()方法的具体详情如下:
包路径:org.apache.kafka.streams.KeyValue
类名称:KeyValue
方法名:pair
[英]Create a new key-value pair.
[中]创建一个新的键值对。
代码示例来源:origin: openzipkin/brave
/**
 * Returns the incoming key and value wrapped in a new key-value pair,
 * without modifying either of them.
 */
@Override
public KeyValue<K, V> transform(K key, V value) {
  return KeyValue.pair(key, value);
}
});
代码示例来源:origin: openzipkin/brave
/**
 * Applies {@code action} to the record's key and value, then returns the
 * same key and value wrapped in a new key-value pair.
 */
@Override
public KeyValue<K, V> transform(K key, V value) {
  action.apply(key, value);
  return KeyValue.pair(key, value);
}
});
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Anonymizes the record value, which holds an IP address in string form.
 *
 * @param recordKey   the record key; returned as-is, it plays no part in the anonymization
 * @param recordValue the IP address in string representation
 * @return a pair of the unchanged key and the anonymized IP address
 */
@Override
public KeyValue<byte[], String> transform(final byte[] recordKey, final String recordValue) {
  return KeyValue.pair(recordKey, anonymizeIpAddress(recordValue));
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Increments the running count kept in {@code stateStore} for the given value
 * and emits the value together with its new count.
 *
 * <p>Unlike a traditional word count, the value is treated as one single word;
 * it is not split on whitespace.
 *
 * @param key   the record key; not used by this method
 * @param value the word to count
 * @return a pair of the word and its incremented count
 */
@Override
public KeyValue<String, Long> transform(byte[] key, String value) {
  // A word that has never been seen has no entry in the store; start it at zero.
  final Long previousCount = stateStore.get(value);
  final Long updatedCount = (previousCount == null ? 0L : previousCount) + 1;
  stateStore.put(value, updatedCount);
  return KeyValue.pair(value, updatedCount);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Reads key-value pairs from a topic until exactly {@code numberToRead} records
 * have been collected, or the wait times out.
 *
 * <p>The consumer is opened in a try-with-resources block so it is closed even
 * when {@code TestUtils.waitForCondition} throws (for example on timeout or
 * interruption); previously the consumer leaked in that case.
 *
 * @param numberToRead     the number of records the collected list must reach
 * @param bootstrapServers the value for the consumer's bootstrap-servers setting
 * @param keyDes           deserializer for record keys
 * @param valDes           deserializer for record values
 * @param topicName        the topic to subscribe to
 * @return the records read, as key-value pairs in the order they were polled
 * @throws InterruptedException if the wait is interrupted
 */
private static <K, V> List<KeyValue<K, V>> readKeysAndValues(int numberToRead,
    String bootstrapServers, Deserializer<K> keyDes, Deserializer<V> valDes, String topicName)
    throws InterruptedException {
  Properties consumerConfig = new Properties();
  consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  // A fresh group id per call, combined with "earliest", makes every reader start from the beginning.
  consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-Reader-" + consumerCounter++);
  consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  List<KeyValue<K, V>> actualValues = new ArrayList<>();
  try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig, keyDes, valDes)) {
    consumer.subscribe(singletonList(topicName));
    TestUtils.waitForCondition(() -> {
      ConsumerRecords<K, V> records = consumer.poll(100);
      for (ConsumerRecord<K, V> record : records) {
        actualValues.add(KeyValue.pair(record.key(), record.value()));
      }
      return actualValues.size() == numberToRead;
    }, 20000, "Timed out reading orders.");
  }
  return actualValues;
}
代码示例来源:origin: confluentinc/kafka-streams-examples
// Re-keys every record by its original text and pairs it with the upper-cased text.
final KStream<String, String> originalAndUppercased =
    textLines.map((ignoredKey, line) -> KeyValue.pair(line, line.toUpperCase()));
代码示例来源:origin: confluentinc/kafka-streams-examples
.groupBy((userId, region) -> KeyValue.pair(region, region))
.count()
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Drops records whose event id has already been seen.
 *
 * <p>Records for which {@code idExtractor} yields no id are passed through as-is.
 * A duplicate refreshes the timestamp of the existing event and yields {@code null};
 * a new event is remembered and passed through.
 *
 * @param key   the record key
 * @param value the record value
 * @return the record as a key-value pair, or {@code null} when it is a duplicate
 */
public KeyValue<K, V> transform(final K key, final V value) {
  final E eventId = idExtractor.apply(key, value);

  // Without an id there is nothing to deduplicate on.
  if (eventId == null) {
    return KeyValue.pair(key, value);
  }

  if (isDuplicate(eventId)) {
    updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
    return null;
  }

  final KeyValue<K, V> output = KeyValue.pair(key, value);
  rememberNewEvent(eventId, context.timestamp());
  return output;
}
代码示例来源:origin: confluentinc/kafka-streams-examples
assertThat(firstSession.get(0), equalTo(KeyValue.pair(userId + "@" +start+"->"+start, 1L)));
1);
assertThat(secondSession.get(0), equalTo(KeyValue.pair(userId + "@" + secondSessionStart + "->" + secondSessionStart,
1L)));
assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(userId, new SessionWindow(start, start)),1L),
KeyValue.pair(new Windowed<>(userId, new SessionWindow(secondSessionStart, secondSessionStart)),1L))));
assertThat(merged, equalTo(Arrays.asList(KeyValue.pair(userId + "@" +start+"->"+start, null),
KeyValue.pair(userId + "@" +secondSessionStart
+"->"+secondSessionStart, null),
KeyValue.pair(userId + "@"
+start+"->"+secondSessionStart,
3L))));
assertThat(mergedResults, equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>(userId, new SessionWindow(start, secondSessionStart)), 3L))));
代码示例来源:origin: confluentinc/kafka-streams-examples
return Collections.singletonList(KeyValue.pair(key, 2 * value));
} catch (SerializationException e) {
代码示例来源:origin: confluentinc/kafka-streams-examples
// Input text lines used by the example.
final List<String> inputValues = Arrays.asList("Hello World", "Hello Kafka Streams", "All streams lead to Kafka");
// Expected (word, count) pairs.
// NOTE(review): only three pairs are visible here; the list looks abridged by the
// excerpt — confirm against the original project before relying on it.
final List<KeyValue<String, Long>> expectedResult = Arrays.asList(
KeyValue.pair("Hello", 1L),
KeyValue.pair("Hello", 2L),
KeyValue.pair("All", 1L)
);
代码示例来源:origin: confluentinc/kafka-streams-examples
.groupBy((userId, region) -> KeyValue.pair(region, region))
.count();
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Validates a single order against the warehouse stock and the locally reserved stock.
 *
 * <p>When the warehouse count minus the already-reserved amount covers the order's
 * quantity, the quantity is added to the reserved amount in {@code reservedStocksStore}
 * and the order passes the inventory check; otherwise the order fails it.
 *
 * @param productId     the record key; not used by this method
 * @param orderAndStock the order paired with the warehouse stock count
 * @return the validation result keyed by the validation's order id
 */
@Override
public KeyValue<String, OrderValidation> transform(final Product productId,
    final KeyValue<Order, Integer> orderAndStock) {
  final Order order = orderAndStock.key;
  final Integer warehouseStockCount = orderAndStock.value;

  // Stock reserved so far for this product; no entry in the store means nothing is reserved.
  final Long storedReservation = reservedStocksStore.get(order.getProduct());
  final long reserved = storedReservation == null ? 0L : storedReservation;

  final boolean enoughStock = warehouseStockCount - reserved - order.getQuantity() >= 0;
  if (!enoughStock) {
    final OrderValidation failed = new OrderValidation(order.getId(), INVENTORY_CHECK, FAIL);
    return KeyValue.pair(failed.getOrderId(), failed);
  }

  // Reserve the stock before reporting the order as valid.
  reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());
  final OrderValidation passed = new OrderValidation(order.getId(), INVENTORY_CHECK, PASS);
  return KeyValue.pair(passed.getOrderId(), passed);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
playEvents.filter((region, event) -> event.getDuration() >= MIN_CHARTABLE_DURATION)
.map((key, value) -> KeyValue.pair(value.getSongId(), value));
KeyValue.pair(song.getGenre().toLowerCase(),
new SongPlayCount(song.getId(), plays)),
Serialized.with(Serdes.String(), songPlayCountSerde))
KeyValue.pair(TOP_FIVE_KEY,
new SongPlayCount(song.getId(), plays)),
Serialized.with(Serdes.String(), songPlayCountSerde))
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
 * Takes the next raw entry from {@code bytesIterator} and converts its key
 * with {@code SessionKeySchema.from}; the value bytes are returned unchanged.
 */
@Override
public KeyValue<Windowed<Bytes>, byte[]> next() {
  final KeyValue<Bytes, byte[]> rawEntry = bytesIterator.next();
  final Windowed<Bytes> sessionKey = SessionKeySchema.from(rawEntry.key);
  return KeyValue.pair(sessionKey, rawEntry.value);
}
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
 * Converts typed entries into their raw form using {@code serdes}.
 *
 * @param from the typed key-value entries
 * @return a new list holding, for each entry in order, the wrapped raw key and the raw value
 */
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
  final List<KeyValue<Bytes, byte[]>> serialized = new ArrayList<>();
  for (final KeyValue<K, V> pair : from) {
    final Bytes rawKey = Bytes.wrap(serdes.rawKey(pair.key));
    final byte[] rawValue = serdes.rawValue(pair.value);
    serialized.add(KeyValue.pair(rawKey, rawValue));
  }
  return serialized;
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
 * Advances {@code cacheIterator} and returns the deserialized entry.
 *
 * @param nextCacheKey the key the caller previously peeked and expects to come next
 * @return the deserialized key and value of the next cache entry
 * @throws IllegalStateException if the entry's key differs from {@code nextCacheKey}
 */
private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
  final KeyValue<Bytes, LRUCacheEntry> cached = cacheIterator.next();
  if (cached.key.equals(nextCacheKey)) {
    return KeyValue.pair(deserializeCacheKey(cached.key), deserializeCacheValue(cached.value));
  }
  throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
 * Takes the next raw entry from {@code bytesIterator} and returns the store
 * timestamp extracted from its key together with the deserialized value.
 */
@Override
public KeyValue<Long, V> next() {
  final KeyValue<Bytes, byte[]> rawEntry = bytesIterator.next();
  return KeyValue.pair(
      WindowKeySchema.extractStoreTimestamp(rawEntry.key.get()),
      serdes.valueFrom(rawEntry.value));
}
代码示例来源:origin: simplesteph/medium-blog-kafka-udemy
/**
 * Passes a review through only if it is not already in {@code reviewStore}
 * and is not expired; such a review is stored under its id first.
 *
 * @param courseId the record key
 * @param review   the review to check
 * @return the course id paired with the review, or {@code null} when the review
 *         is already stored or is expired
 */
@Override
public KeyValue<String, Review> transform(String courseId, Review review) {
  final Long reviewId = review.getId();
  final Long now = System.currentTimeMillis();

  // Already stored, or expired: emit nothing.
  if (reviewStore.get(reviewId) != null || isReviewExpired(review, now, timeToKeepAReview)) {
    return null;
  }

  reviewStore.put(review.getId(), review);
  updateMinTimestamp(review);
  return KeyValue.pair(courseId, review);
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
 * Takes the next raw entry from {@code bytesIterator} and decodes it into a
 * windowed key and a typed value.
 *
 * <p>The timestamp and the key are both extracted from the raw key bytes; the
 * window is derived from that timestamp and {@code windowSize}.
 */
@Override
public KeyValue<Windowed<K>, V> next() {
  final KeyValue<Bytes, byte[]> rawEntry = bytesIterator.next();
  final byte[] storeKey = rawEntry.key.get();

  final long timestamp = WindowKeySchema.extractStoreTimestamp(storeKey);
  final K key = WindowKeySchema.extractStoreKey(storeKey, serdes);
  final V value = serdes.valueFrom(rawEntry.value);

  final Windowed<K> windowedKey =
      new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize));
  return KeyValue.pair(windowedKey, value);
}
内容来源于网络,如有侵权,请联系作者删除!