Configuring concurrency with DefaultJmsListenerContainerFactory

nnsrf1az  posted on 2021-07-06  in Java

I am configuring concurrency on my JMS listener container factory so that it can consume from my queues.
This is what I did:

@Bean(name = "myFactory")
public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
    CustomJmsListenerConfigurer configurer) {
  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  factory.setConcurrency("1-3");
  return factory;
}

I see that DefaultJmsListenerContainerFactory.setConcurrency delegates to DefaultMessageListenerContainer behind the scenes.
My application has 2 queues configured, and I am using Spring Boot:

@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")

While reading the Spring documentation I came across some methods, and now I have a few questions.
1 - What is the difference between:

setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)

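Even after reading the documentation, I do not understand the difference or how this configuration changes the application's behavior. I thought setConcurrency would be the number of threads each @JmsListener uses to fetch messages from the queue... Could you explain, with an example, how this configuration affects things if I have 100 messages queued (in each configured queue)?
2 - setMaxMessagesPerTask(int maxMessagesPerTask)
If I have 100 messages in the queue, concurrency = 3, and this number is 10 (the default), what is the behavior?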

pkmbmrz7


Read the Javadocs for both cases.

  1. setConcurrency(String concurrency) is just a convenience method:
/**
 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
 * <p>This listener container will always hold on to the minimum number of consumers
 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
 */
@Override
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        else {
            setConcurrentConsumers(1);
            setMaxConcurrentConsumers(Integer.parseInt(concurrency));
        }
    }
    catch (NumberFormatException ex) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
                "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}
`setConcurrency("1-3")` is the same as

setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
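
To make the equivalence concrete, here is a minimal standalone sketch that mirrors the parsing logic quoted above without depending on Spring. The class `ConcurrencyLimits` and its `parse` method are hypothetical names introduced only for illustration; they are not part of Spring's API. Note also that each `@JmsListener` gets its own listener container from the factory, so the limits apply per listener: with two listeners on "1-3", the application would run between 2 and 6 consumers in total.

```java
public class ConcurrencyLimits {

    // Minimum number of consumers the container always keeps.
    public final int min;
    // Maximum number of consumers the container may scale up to.
    public final int max;

    public ConcurrencyLimits(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // Hypothetical helper that follows the same rules as the quoted
    // setConcurrency(String): "lower-upper" or a single upper limit.
    public static ConcurrencyLimits parse(String concurrency) {
        try {
            int separatorIndex = concurrency.indexOf('-');
            if (separatorIndex != -1) {
                int lower = Integer.parseInt(concurrency.substring(0, separatorIndex));
                int upper = Integer.parseInt(concurrency.substring(separatorIndex + 1));
                return new ConcurrencyLimits(lower, upper);
            }
            // A single number is the upper limit; the lower limit is 1.
            return new ConcurrencyLimits(1, Integer.parseInt(concurrency));
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                    "Invalid concurrency value [" + concurrency + "]", ex);
        }
    }

    public static void main(String[] args) {
        ConcurrencyLimits range = parse("1-3");
        System.out.println(range.min + ".." + range.max); // prints 1..3

        ConcurrencyLimits upperOnly = parse("3");
        System.out.println(upperOnly.min + ".." + upperOnly.max); // prints 1..3

        // Two listeners sharing the factory each get their own container.
        int listeners = 2;
        System.out.println((listeners * range.min) + ".." + (listeners * range.max)); // prints 2..6
    }
}
```

So "1-3" and "3" end up with the same limits, because the lower bound defaults to 1 when only an upper limit is given.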

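  2. setMaxMessagesPerTask(int maxMessagesPerTask) has no effect on how messages are processed; it only means that the consumer task is recycled (stopped and restarted) every 10 messages.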

/**
 * Specify the maximum number of messages to process in one task.
 * More concretely, this limits the number of message reception attempts
 * per task, which includes receive iterations that did not actually
 * pick up a message until they hit their timeout (see the
 * {@link #setReceiveTimeout "receiveTimeout"} property).
 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
 * reusing the original invoker threads until shutdown (at the
 * expense of limited dynamic scheduling).
 * <p>In case of a SchedulingTaskExecutor indicating a preference for
 * short-lived tasks, the default is 10 instead. Specify a number
 * of 10 to 100 messages to balance between rather long-lived and
 * rather short-lived tasks here.
 * <p>Long-lived tasks avoid frequent thread context switches through
 * sticking with the same thread all the way through, while short-lived
 * tasks allow thread pools to control the scheduling. Hence, thread
 * pools will usually prefer short-lived tasks.
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
    synchronized (this.lifecycleMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
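
As a rough illustration of the 100-message scenario from the question, the sketch below models a single consumer whose task ends after a fixed number of receive attempts and is then rescheduled. This is a simplified model, not Spring's implementation: the class `TaskRecyclingSketch` and its `drain` method are hypothetical, and the model ignores receive timeouts, multiple consumers, and dynamic scaling.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class TaskRecyclingSketch {

    /** Outcome of draining the queue: messages handled and task runs used. */
    public static final class Result {
        public final int processed;
        public final int taskRuns;

        Result(int processed, int taskRuns) {
            this.processed = processed;
            this.taskRuns = taskRuns;
        }
    }

    /**
     * Drains messageCount messages with one consumer. Each task run handles
     * at most maxMessagesPerTask messages; a negative value means unlimited.
     */
    public static Result drain(int messageCount, int maxMessagesPerTask) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        int processed = 0;
        int taskRuns = 0;
        while (!queue.isEmpty()) {
            taskRuns++; // a new task run starts (the consumer task is recycled)
            int attempts = 0;
            while (!queue.isEmpty()
                    && (maxMessagesPerTask < 0 || attempts < maxMessagesPerTask)) {
                queue.poll(); // stands in for receiving and handling one message
                processed++;
                attempts++;
            }
        }
        return new Result(processed, taskRuns);
    }

    public static void main(String[] args) {
        Result limited = drain(100, 10);
        System.out.println(limited.processed + " messages in " + limited.taskRuns + " task runs");
        // prints: 100 messages in 10 task runs

        Result unlimited = drain(100, -1);
        System.out.println(unlimited.processed + " messages in " + unlimited.taskRuns + " task runs");
        // prints: 100 messages in 1 task runs
    }
}
```

In both cases all 100 messages are handled; the setting only changes how often the task is handed back to the executor, which is the point the answer makes.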
