本文整理了 Java 中 com.hazelcast.jet.pipeline.Sinks.logger() 方法的一些代码示例,展示了 Sinks.logger() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Sinks.logger() 方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:logger
[英]Convenience for #logger(DistributedFunction) with Object.toString() as the toStringFn.
[中]#logger(DistributedFunction) 的便捷写法,使用 Object.toString() 作为 toStringFn。
代码示例来源:origin: hazelcast/hazelcast-jet
/**
 * Shorthand for {@link #logger(DistributedFunction)} that uses each item's
 * {@code toString()} as the {@code toStringFn}.
 *
 * @param <T> type of the items the returned sink accepts
 * @return the sink created by {@link #logger(DistributedFunction)}
 */
@Nonnull
public static <T> Sink<T> logger() {
    return logger(item -> item.toString());
}
代码示例来源:origin: hazelcast/hazelcast-jet-demos
/**
 * Builds a pipeline that reads the map journal named by
 * {@code Constants.IMAP_NAME_PRECIOUS} starting from the oldest event,
 * renders each event as {@code key==value}, keeps only the lines that start
 * with "p" after lower-casing, and drains them to the logger sink.
 *
 * @return the assembled pipeline
 */
public static Pipeline build() {
    Pipeline pipeline = Pipeline.create();

    pipeline.drawFrom(Sources.<String, Object>mapJournal(
                    Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST))
            .map(entry -> entry.getKey() + "==" + entry.getValue())
            // Palladium and Platinum only
            .filter(line -> line.toLowerCase().startsWith("p"))
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Creates a pipeline that draws from the Kafka source configured with
 * {@code brokerProperties()} and {@code TOPIC} and drains every item to the
 * logger sink.
 *
 * @return the assembled pipeline
 */
private Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
            .drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Obtains the {@link JetInstance} bean from an annotation-configured Spring
 * context, runs a job that drains the custom source to the logger sink and
 * waits for it to complete.
 *
 * <p>The instance is shut down in a {@code finally} block so that a failure
 * while building or running the job does not leave it running.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    JetInstance jet = context.getBean(JetInstance.class);
    try {
        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(CustomSourceP.customSource())
                .drainTo(Sinks.logger());

        // The job needs these classes on its classpath, so they are attached to the config.
        JobConfig jobConfig = new JobConfig()
                .addClass(AnnotationBasedConfigurationSample.class)
                .addClass(CustomSourceP.class);

        jet.newJob(pipeline, jobConfig).join();
    } finally {
        // Always release the instance, even when join() propagates a job failure.
        jet.shutdown();
    }
}
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Handler mapped to {@code /submitJob}: submits a job that drains the custom
 * source to the logger sink and waits for it through {@code join()}.
 */
@RequestMapping("/submitJob")
public void submitJob() {
    JobConfig config = new JobConfig()
            .addClass(SpringBootSample.class)
            .addClass(CustomSourceP.class);

    Pipeline p = Pipeline.create();
    p.drawFrom(CustomSourceP.customSource())
            .drainTo(Sinks.logger());

    instance.newJob(p, config).join();
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Loads the {@code "instance"} and {@code "client"} {@link JetInstance} beans
 * from {@code application-context.xml}, submits a job through the client that
 * drains the custom source to the logger sink, and waits for it to complete.
 *
 * <p>Both instances are shut down in {@code finally} blocks (client first,
 * then the member) so that a failure at any step does not leave either of
 * them running.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    ApplicationContext context = new GenericXmlApplicationContext("application-context.xml");
    JetInstance jetInstance = (JetInstance) context.getBean("instance");
    try {
        JetInstance jetClient = (JetInstance) context.getBean("client");
        try {
            Pipeline pipeline = Pipeline.create();
            pipeline.drawFrom(CustomSourceP.customSource())
                    .drainTo(Sinks.logger());

            JobConfig jobConfig = new JobConfig()
                    .addClass(XmlConfigurationSample.class)
                    .addClass(CustomSourceP.class);

            jetClient.newJob(pipeline, jobConfig).join();
        } finally {
            // Same order as on the success path: the client goes first.
            jetClient.shutdown();
        }
    } finally {
        // Runs even if the client lookup, the job or the client shutdown failed.
        jetInstance.shutdown();
    }
}
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Loads the {@code "instance"} and {@code "client"} {@link JetInstance} beans
 * from {@code application-context-with-schema.xml}, submits a job through the
 * client that drains the custom source to the logger sink, and waits for it
 * to complete.
 *
 * <p>Both instances are shut down in {@code finally} blocks (client first,
 * then the member) so that a failure at any step does not leave either of
 * them running.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    ApplicationContext context = new GenericXmlApplicationContext("application-context-with-schema.xml");
    JetInstance jetInstance = (JetInstance) context.getBean("instance");
    try {
        JetInstance jetClient = (JetInstance) context.getBean("client");
        try {
            Pipeline pipeline = Pipeline.create();
            pipeline.drawFrom(CustomSourceP.customSource())
                    .drainTo(Sinks.logger());

            JobConfig jobConfig = new JobConfig()
                    .addClass(XmlConfigurationWithSchemaSample.class)
                    .addClass(CustomSourceP.class);

            jetClient.newJob(pipeline, jobConfig).join();
        } finally {
            // Same order as on the success path: the client goes first.
            jetClient.shutdown();
        }
    } finally {
        // Runs even if the client lookup, the job or the client shutdown failed.
        jetInstance.shutdown();
    }
}
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline over the journal of {@code TRADES_MAP_NAME}: every event's
 * new value is taken as a {@code Trade}, timestamped with
 * {@code Trade::getTime}, keyed by ticker, counted per sliding window and
 * formatted into a line that is drained to the logger sink.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(
                    TRADES_MAP_NAME,
                    DistributedPredicate.alwaysTrue(),
                    EventJournalMapEvent::getNewValue,
                    START_FROM_CURRENT))
            // NOTE(review): 3000 is presumably the allowed timestamp lag in ms - confirm.
            .addTimestamps(Trade::getTime, 3000)
            .groupingKey(Trade::getTicker)
            .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
            .aggregate(counting(),
                    (start, end, ticker, count) ->
                            String.format("%s %5s %4d", toLocalTime(end), ticker, count))
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
})
.drainTo(Sinks.logger());
return p;
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
})
.drainTo(Sinks.logger());
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
brokEntries, joinMapEntries(Trade::brokerId),
Tuple3::tuple3
).drainTo(Sinks.logger());
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads {@code PageVisit} values from the
 * {@code PAGE_VISIT} map journal (oldest first), timestamps them, applies a
 * {@code sliding(10, 1)} window and counts the items of each window, draining
 * the results to the logger sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
    Pipeline pipeline = Pipeline.create();

    pipeline.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            // Deliberately a lambda rather than a method reference (see the JDK bug above).
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .aggregate(counting())
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads {@code PageVisit} values from the
 * {@code PAGE_VISIT} map journal (oldest first), timestamps them, applies a
 * {@code sliding(10, 1)} window, groups by user id and collects each group's
 * visits into a list that is drained to the logger sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
    Pipeline pipeline = Pipeline.create();

    pipeline.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            // Lambdas are kept on purpose instead of method references (see the JDK bug above).
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .groupingKey(visit -> visit.userId())
            .aggregate(toList())
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline over the journal of the {@code "prices"} map: each put
 * event is converted to a {@code PriceUpdateEvent}, timestamped, keyed by
 * ticker and counted per sliding window; the counts are drained to the logger
 * sink.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
                    "prices",
                    mapPutEvents(),
                    // The entry key plus the two tuple fields of the new value form the event.
                    event -> new PriceUpdateEvent(
                            event.getKey(), event.getNewValue().f0(), event.getNewValue().f1()),
                    START_FROM_CURRENT))
            .addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
            .setLocalParallelism(1)
            .groupingKey(PriceUpdateEvent::ticker)
            .window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
            .aggregate(AggregateOperations.counting())
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds the session-window pipeline: events from the {@code "generator"}
 * source are timestamped, keyed by user id, grouped into session windows and
 * aggregated with a composite operation; each result is rendered by
 * {@code SessionWindow::sessionToString} and drained to the logger sink.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    // Two aggregations are calculated over the same input data:
    //   1. the number of viewed product listings
    //   2. the set of purchased product IDs
    // Their results are combined into a Tuple2<Long, Set<String>>.
    // NOTE(review): non-purchase events are mapped to null - presumably mapping()
    // keeps null results out of the set; confirm for the Jet version in use.
    AggregateOperation1<ProductEvent, ?, Tuple2<Long, Set<String>>> viewsAndPurchases = allOf(
            summingLong(event -> event.getProductEventType() == VIEW_LISTING ? 1 : 0),
            mapping(event -> event.getProductEventType() == PURCHASE ? event.getProductId() : null,
                    toSet())
    );

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<ProductEvent>streamFromProcessor("generator",
                    ProcessorMetaSupplier.of(GenerateEventsP::new, 1)))
            .addTimestamps(ProductEvent::getTimestamp, 0)
            .groupingKey(ProductEvent::getUserId)
            .window(WindowDefinition.session(SESSION_TIMEOUT))
            .aggregate(viewsAndPurchases, SessionWindow::sessionToString)
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that co-groups three keyed streams - page visits,
 * add-to-cart events and payments, each read from its own map journal and
 * keyed by user id - inside a {@code sliding(10, 1)} window, collecting each
 * stream's items into a list. The combined result is drained to the logger
 * sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroup() {
    Pipeline pipeline = Pipeline.create();

    // Lambdas are kept on purpose instead of method references (see the JDK bug above).
    StreamStageWithKey<PageVisit, Integer> visitsByUser = pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(visit -> visit.timestamp(), 100)
            .groupingKey(visit -> visit.userId());

    StreamStageWithKey<Payment, Integer> paymentsByUser = pipeline
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(
                    PAYMENT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(payment -> payment.timestamp(), 100)
            .groupingKey(payment -> payment.userId());

    StreamStageWithKey<AddToCart, Integer> cartAddsByUser = pipeline
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(
                    ADD_TO_CART, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(cartAdd -> cartAdd.timestamp(), 100)
            .groupingKey(cartAdd -> cartAdd.userId());

    // The page-visit stream carries the window; the other two streams are joined into it.
    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> joined =
            visitsByUser.window(sliding(10, 1))
                    .aggregate3(toList(), cartAddsByUser, toList(), paymentsByUser, toList());

    joined.drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builder-based variant of the three-way co-group: page visits, add-to-cart
 * events and payments are each read from their map journal, keyed by user id
 * and registered with a window-group-aggregate builder over a
 * {@code sliding(10, 1)} window. The per-stream lists are packed into a
 * {@code Tuple3} and drained to the logger sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroupWithBuilder() {
    Pipeline pipeline = Pipeline.create();

    // Lambdas are kept on purpose instead of method references (see the JDK bug above).
    StreamStageWithKey<PageVisit, Integer> visitsByUser = pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(visit -> visit.timestamp(), 100)
            .groupingKey(visit -> visit.userId());

    StreamStageWithKey<AddToCart, Integer> cartAddsByUser = pipeline
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(
                    ADD_TO_CART, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(cartAdd -> cartAdd.timestamp(), 100)
            .groupingKey(cartAdd -> cartAdd.userId());

    StreamStageWithKey<Payment, Integer> paymentsByUser = pipeline
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(
                    PAYMENT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(payment -> payment.timestamp(), 100)
            .groupingKey(payment -> payment.userId());

    StageWithKeyAndWindow<PageVisit, Integer> windowedVisits = visitsByUser.window(sliding(10, 1));
    WindowGroupAggregateBuilder<Integer, List<PageVisit>> aggBuilder =
            windowedVisits.aggregateBuilder(toList());

    // Each tag is the handle used below to pull that stream's list out of the result.
    Tag<List<PageVisit>> visitsTag = aggBuilder.tag0();
    Tag<List<AddToCart>> cartAddsTag = aggBuilder.add(cartAddsByUser, toList());
    Tag<List<Payment>> paymentsTag = aggBuilder.add(paymentsByUser, toList());

    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> joined =
            aggBuilder.build((start, end, userId, results) -> new TimestampedEntry<>(
                    end, userId,
                    tuple3(results.get(visitsTag), results.get(cartAddsTag), results.get(paymentsTag))));

    joined.drainTo(Sinks.logger());
    return pipeline;
}
内容来源于网络,如有侵权,请联系作者删除!