Configuring Spring @JmsListener to read messages in parallel (asynchronously)

w80xi6nr  posted on 2021-06-27 in Java

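I am using Spring's @JmsListener to read and process messages from an Azure topic. The topic may hold 1000+ messages, and I need to read them in parallel rather than one at a time. If any exception occurs while processing a message, I need to push that message to the dead-letter queue.
Below is the sample code I tried, based on the Spring documentation: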

@Component
public class Receiver {

  @JmsListener(destination = "mailbox", containerFactory = "myFactory")
  public void receiveMessage(Email email) {
    System.out.println("Received <" + email + ">");
  }

}

@SpringBootApplication
@EnableJms
public class Application {

  @Bean
  public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                          DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
  }

  @Bean // Serialize message content to json using TextMessage
  public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
  }

  public static void main(String[] args) {
    // Launch the application
    ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);

    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

    // Send a message with a POJO - the template reuses the message converter
    System.out.println("Sending an email message.");
    jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
  }
}

I also tried setting the concurrency attribute, but it did not work.

@JmsListener(destination = "mailbox", containerFactory = "myFactory", concurrency = "100")
public void receiveMessage(Email email) {
  System.out.println("Received <" + email + ">");
}

Can anyone advise on this?
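For reference, the Javadoc for the `concurrency` attribute of `@JmsListener` describes the value as either a simple upper limit (for example "10", in which case the lower limit is 1) or a "lower-upper" range (for example "5-10"). The sketch below is a plain-Java illustration of that reading, not Spring's own code; `ConcurrencyRange` and `parse` are hypothetical names introduced here.

```java
public class ConcurrencyRange {
    public final int min; // consumers started initially
    public final int max; // upper bound the container may scale to

    public ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // Hypothetical helper: interprets "upper" or "lower-upper" as documented for @JmsListener.
    public static ConcurrencyRange parse(String concurrency) {
        int dash = concurrency.indexOf('-');
        if (dash != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, dash));
            int upper = Integer.parseInt(concurrency.substring(dash + 1));
            return new ConcurrencyRange(lower, upper);
        }
        // A single number is only an upper limit; the lower limit defaults to 1.
        return new ConcurrencyRange(1, Integer.parseInt(concurrency));
    }

    public static void main(String[] args) {
        ConcurrencyRange single = parse("100");
        ConcurrencyRange range = parse("10-100");
        System.out.println("\"100\"    -> " + single.min + " to " + single.max);
        System.out.println("\"10-100\" -> " + range.min + " to " + range.max);
    }
}
```

Under this reading, `concurrency = "100"` starts from a single consumer and only allows scaling up to 100, so a range such as "10-100" may be worth trying if a higher starting count is wanted. The Spring Javadoc for `DefaultMessageListenerContainer` also advises against raising the number of concurrent consumers on a topic unless the provider's setup clearly allows for it, which may be relevant for an Azure topic.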

fcg9iug3 (answer #1)

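You can set a prefetch policy for batch receiving, as shown below. You can also read this blog post about Spring Boot and Azure Service Bus.
(The snippet below is written in Kotlin.)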

@Bean
@Primary
fun myFactory(connectionFactory: ConnectionFactory): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    ((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
        .prefetchPolicy = prefetchPolicy()
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    listenerContainerFactory.setSubscriptionDurable(true)
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    return listenerContainerFactory
}

private fun prefetchPolicy(): JmsPrefetchPolicy {
    val prefetchPolicy = JmsDefaultPrefetchPolicy()
    prefetchPolicy.setAll(100) // example 100
    return prefetchPolicy
}

8yparm6h (answer #2)

To debug this, as a first step I suggest enabling debug logging for org.springframework.jms, observing the behavior, and posting the logs.
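One way to switch that logging on in a Spring Boot application is sketched below, assuming logging is left to Boot's default configuration. Boot reads `logging.level.*` settings from its environment, which includes JVM system properties, so the property can be set before the application starts. `JmsDebugLogging` is a hypothetical helper name; the more usual route is the single line `logging.level.org.springframework.jms=DEBUG` in `application.properties`.

```java
public class JmsDebugLogging {
    // Property key that Spring Boot maps to the log level of the org.springframework.jms package.
    public static final String KEY = "logging.level.org.springframework.jms";

    // Sets the level as a JVM system property; call this before SpringApplication.run(...).
    public static void enable() {
        System.setProperty(KEY, "DEBUG");
    }

    public static void main(String[] args) {
        enable();
        // In the real application, SpringApplication.run(Application.class, args) would follow here.
        System.out.println(KEY + "=" + System.getProperty(KEY));
    }
}
```

Passing `-Dlogging.level.org.springframework.jms=DEBUG` on the command line should have the same effect without any code change.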
