com.hazelcast.jet.pipeline.Sinks.jmsQueueBuilder()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(2.6k)|赞(0)|评价(0)|浏览(99)

本文整理了Java中com.hazelcast.jet.pipeline.Sinks.jmsQueueBuilder()方法的一些代码示例,展示了Sinks.jmsQueueBuilder()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Sinks.jmsQueueBuilder()方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:jmsQueueBuilder

Sinks.jmsQueueBuilder介绍

[英]Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See javadoc on JmsSinkBuilder methods for more details.

Behavior on job restart: the processor is stateless. If the job is restarted, duplicate events can occur. If you need exactly-once behavior, you must ensure idempotence on the application level.

IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most of the providers offer a configuration parameter to enable auto-reconnection, refer to provider documentation for details.

Default local parallelism for this processor is 4 (or less if less CPUs are available).
[中]返回一个生成器对象,该对象提供一个循序渐进的fluent API,用于为管道API构建自定义JMS队列接收器。有关更多详细信息,请参阅JmsSinkBuilder方法上的javadoc。
作业重新启动时的行为:处理器是无状态的。如果重新启动作业,可能会发生重复事件。如果只需要一次行为,则必须确保应用程序级别的幂等性。
IO故障应由JMS提供程序处理。如果任何JMS操作引发异常,作业将失败。大多数提供程序都提供了一个配置参数来启用自动重新连接,有关详细信息,请参阅提供程序文档。
此处理器的默认本地并行度为4(如果可用CPU较少,则小于4)。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Convenience for {@link #jmsQueueBuilder(DistributedSupplier)}. Creates a
 * connection without any authentication parameters and uses non-transacted
 * sessions with {@code Session.AUTO_ACKNOWLEDGE} mode. If a received item
 * is not an instance of {@code javax.jms.Message}, the sink wraps {@code
 * item.toString()} into a {@link javax.jms.TextMessage}.
 *
 * @param factorySupplier supplier to obtain JMS connection factory
 * @param name            the name of the queue
 */
@Nonnull
public static <T> Sink<T> jmsQueue(
    @Nonnull DistributedSupplier<ConnectionFactory> factorySupplier,
    @Nonnull String name
) {
  return Sinks.<T>jmsQueueBuilder(factorySupplier)
      .destinationName(name)
      .build();
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.jmsQueue(() -> new ActiveMQConnectionFactory(ActiveMQBroker.BROKER_URL), INPUT_QUEUE))
   .filter(message -> message.getJMSPriority() > 3)
   .map(message -> (TextMessage) message)
   // print the message text to the log
   .peek(TextMessage::getText)
   .drainTo(Sinks.<TextMessage>jmsQueueBuilder(() -> new ActiveMQConnectionFactory(ActiveMQBroker.BROKER_URL))
       .destinationName(OUTPUT_QUEUE)
       .messageFn((session, message) -> {
           // create new text message with the same text and few additional properties
           TextMessage textMessage = session.createTextMessage(message.getText());
           textMessage.setBooleanProperty("isActive", true);
           textMessage.setJMSPriority(8);
           return textMessage;
         }
       )
       .build());
  return p;
}

相关文章