org.springframework.data.redis.listener.RedisMessageListenerContainer类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(1190)

本文整理了Java中org.springframework.data.redis.listener.RedisMessageListenerContainer类的一些代码示例,展示了RedisMessageListenerContainer类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。RedisMessageListenerContainer类的具体详情如下:
包路径:org.springframework.data.redis.listener.RedisMessageListenerContainer
类名称:RedisMessageListenerContainer

RedisMessageListenerContainer介绍

[英]Container providing asynchronous behaviour for Redis message listeners. Handles the low level details of listening, converting and message dispatching.

As oppose to the low level Redis (one connection per subscription), the container uses only one connection that is 'multiplexed' for all registered listeners, the message dispatch being done through the task executor.

Note the container uses the connection in a lazy fashion (the connection is used only if at least one listener is configured).

Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order these methods accordingly.
[中]为Redis消息侦听器提供异步行为的容器。处理侦听、转换和消息分发的低级细节。
与低级别的Redis(每个订阅一个连接)相反,容器只使用一个对所有注册侦听器“多路复用”的连接,消息调度通过任务执行器完成。
注意:容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接)。
同时添加和删除侦听器会产生未定义的结果。强烈建议对这些方法进行相应的同步/排序。

代码示例

Official Spring framework guide

代码示例来源:origin: spring-guides/gs-messaging-redis

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    MessageListenerAdapter listenerAdapter) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
  return container;
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Register instance within the container.
 *
 * @param container never {@literal null}.
 */
protected void doRegister(RedisMessageListenerContainer container) {
  listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}

代码示例来源:origin: spring-projects/spring-session

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(this.redisConnectionFactory);
  if (this.redisTaskExecutor != null) {
    container.setTaskExecutor(this.redisTaskExecutor);
  }
  if (this.redisSubscriptionExecutor != null) {
    container.setSubscriptionExecutor(this.redisSubscriptionExecutor);
  }
  container.addMessageListener(sessionRepository(), Arrays.asList(
      new ChannelTopic(sessionRepository().getSessionDeletedChannel()),
      new ChannelTopic(sessionRepository().getSessionExpiredChannel())));
  container.addMessageListener(sessionRepository(),
      Collections.singletonList(new PatternTopic(
          sessionRepository().getSessionCreatedChannelPrefix() + "*")));
  return container;
}

代码示例来源:origin: spring-projects/spring-data-redis

public void stop(Runnable callback) {
  stop();
  callback.run();
}

代码示例来源:origin: spring-projects/spring-data-redis

private void initMessageListenerContainer() {
  this.messageListenerContainer = new RedisMessageListenerContainer();
  this.messageListenerContainer.setConnectionFactory(((RedisTemplate<?, ?>) redisOps).getConnectionFactory());
  this.messageListenerContainer.afterPropertiesSet();
  this.messageListenerContainer.start();
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
 * failure (for example, Redis was restarted).
 *
 * @param ex Throwable exception
 */
protected void handleSubscriptionException(Throwable ex) {
  listening = false;
  subscriptionTask.closeConnection();
  if (ex instanceof RedisConnectionFailureException) {
    if (isRunning()) {
      logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
      sleepBeforeRecoveryAttempt();
      lazyListen();
    }
  } else {
    logger.error("SubscriptionTask aborted with exception:", ex);
  }
}

代码示例来源:origin: spring-projects/spring-data-redis

private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
  // stop the listener if currently running
  if (isRunning()) {
    subscriptionTask.cancel();
  }
  patternMapping.clear();
  channelMapping.clear();
  listenerTopics.clear();
  if (!CollectionUtils.isEmpty(listeners)) {
    for (Map.Entry<? extends MessageListener, Collection<? extends Topic>> entry : listeners.entrySet()) {
      addListener(entry.getKey(), entry.getValue());
    }
  }
  // resume activity
  if (initialized) {
    start();
  }
}

代码示例来源:origin: mrdear/JavaWEB

/**
 * 注入消息容器
 * @param jedisConnectionFactory jedis连接池
 * @param listenerAdapter 监听适配器
 * @return bean
 */
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory jedisConnectionFactory,
  MessageListenerAdapter listenerAdapter){
 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
 container.setConnectionFactory(jedisConnectionFactory);
 //绑定监听者与信道的管理
 container.addMessageListener(listenerAdapter,new PatternTopic("java"));
 return container;
}

代码示例来源:origin: spring-projects/spring-integration

@Test
@RedisAvailable
public void testRedisPublishingMessageHandler() throws Exception {
  int numToTest = 10;
  String topic = "si.test.channel";
  final CountDownLatch latch = new CountDownLatch(numToTest * 2);
  RedisConnectionFactory connectionFactory = this.getConnectionFactoryForTest();
  MessageListenerAdapter listener = new MessageListenerAdapter();
  listener.setDelegate(new Listener(latch));
  listener.setSerializer(new StringRedisSerializer());
  listener.afterPropertiesSet();
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.afterPropertiesSet();
  container.addMessageListener(listener, Collections.<Topic>singletonList(new ChannelTopic(topic)));
  container.start();
  this.awaitContainerSubscribed(container);
  final RedisPublishingMessageHandler handler = new RedisPublishingMessageHandler(connectionFactory);
  handler.setTopicExpression(new LiteralExpression(topic));
  for (int i = 0; i < numToTest; i++) {
    handler.handleMessage(MessageBuilder.withPayload("test-" + i).build());
  }
  for (int i = 0; i < numToTest; i++) {
    handler.handleMessage(MessageBuilder.withPayload(("test-" + i).getBytes()).build());
  }
  assertTrue(latch.await(10, TimeUnit.SECONDS));
  container.stop();
}

代码示例来源:origin: spring-projects/spring-integration

((BeanFactoryAware) this.messageConverter).setBeanFactory(this.getBeanFactory());
this.container.setConnectionFactory(this.connectionFactory);
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
  ErrorHandler errorHandler = new MessagePublishingErrorHandler(
  this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
this.container.setTaskExecutor(this.taskExecutor);
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageListenerDelegate());
adapter.setSerializer(this.serializer);
adapter.afterPropertiesSet();
this.container.addMessageListener(adapter, new ChannelTopic(this.topicName));
this.container.afterPropertiesSet();
this.dispatcher.setBeanFactory(this.getBeanFactory());
this.initialized = true;

代码示例来源:origin: com.lodsve/lodsve-redis

public RedisTimerMessageListenerContainer(LodsveRedisConnectionFactory connectionFactory, RedisTimerListener listener) {
    super.setConnectionFactory(connectionFactory);
    super.addMessageListener(listener, new PatternTopic("__keyevent@*:expired"));
  }
}

代码示例来源:origin: com.alibaba/tac-engine

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(JedisConnectionFactory jedisConnectionFactory) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(jedisConnectionFactory);
  // set thread pool
  container.setTaskExecutor(ThreadPoolUtils.createThreadPool(10, "tac-redis-subscribe-pool"));
  return container;
}

代码示例来源:origin: spring-projects/spring-integration

this.container.addMessageListener(adapter, topicList);
this.container.afterPropertiesSet();

代码示例来源:origin: com.github.xiaolyuh/layering-cache-core

@Override
public void afterPropertiesSet() throws Exception {
  messageListener.setCacheManager(this);
  container.setConnectionFactory(getRedisTemplate().getConnectionFactory());
  container.afterPropertiesSet();
  messageListener.afterPropertiesSet();
  BeanFactory.getBean(StatsService.class).setCacheManager(this);
  if (getStats()) {
    // 采集缓存命中率数据
    BeanFactory.getBean(StatsService.class).syncCacheStats();
  }
}

代码示例来源:origin: yuboon/Aooms

@Bean("j2CacheRedisMessageListenerContainer")
@ConditionalOnBean(RedisConnectionFactory.class)
RedisMessageListenerContainer container(
    @Qualifier("j2CahceRedisConnectionFactory") RedisConnectionFactory j2CahceRedisConnectionFactory) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(j2CahceRedisConnectionFactory);
  return container;
}

代码示例来源:origin: spring-projects/spring-integration

public RedisInboundChannelAdapter(RedisConnectionFactory connectionFactory) {
  Assert.notNull(connectionFactory, "connectionFactory must not be null");
  this.container.setConnectionFactory(connectionFactory);
}

代码示例来源:origin: spring-projects/spring-data-redis

public void stop() {
  if (isRunning()) {
    running = false;
    subscriptionTask.cancel();
  }
  if (logger.isDebugEnabled()) {
    logger.debug("Stopped RedisMessageListenerContainer");
  }
}

代码示例来源:origin: sentilo/sentilo

private void restartListenerContainer() {
 LOGGER.info("Stopping listener container for process {}", processName);
 try {
  listenerContainer.stop();
 } catch (final Exception re) {
  LOGGER.warn("An error occurred while stopping listener container. If it is not running, we proceed to start it", re.getMessage());
 }
 // Clear count pending events value
 countPendingEvents = 0;
 LOGGER.info("Starting listener container");
 if (!listenerContainer.isRunning()) {
  listenerContainer.start();
 }
}

代码示例来源:origin: spring-projects/spring-integration

@Override
public void start() {
  this.container.start();
}

代码示例来源:origin: sentilo/sentilo

@Override
public void start() {
 if (listenerContainer.isActive() && listenerContainer.isRunning()) {
  processName = getProcessName();
  monitorListener = this;
  monitorTopic = new ChannelTopic(buildMonitorChannelName(processName));
  if (subscribeMonitor()) {
   running = true;
   LOGGER.info("RedisSubscriptionMonitor started for process {}", processName);
  }
 }
}

相关文章

微信公众号

最新文章

更多