本文整理了Java中org.springframework.data.redis.listener.RedisMessageListenerContainer
类的一些代码示例,展示了RedisMessageListenerContainer
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。RedisMessageListenerContainer
类的具体详情如下:
包路径:org.springframework.data.redis.listener.RedisMessageListenerContainer
类名称:RedisMessageListenerContainer
[英]Container providing asynchronous behaviour for Redis message listeners. Handles the low level details of listening, converting and message dispatching.
As oppose to the low level Redis (one connection per subscription), the container uses only one connection that is 'multiplexed' for all registered listeners, the message dispatch being done through the task executor.
Note the container uses the connection in a lazy fashion (the connection is used only if at least one listener is configured).
Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order these methods accordingly.
[中]为Redis消息侦听器提供异步行为的容器。处理侦听、转换和消息分发的低级细节。
与低级别的Redis(每个订阅一个连接)相反,容器只使用一个对所有注册侦听器“多路复用”的连接,消息调度通过任务执行器完成。
注意:容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接)。
同时添加和删除侦听器会产生未定义的结果。强烈建议对这些方法进行相应的同步/排序。
Official Spring framework guide
代码示例来源:origin: spring-guides/gs-messaging-redis
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
代码示例来源:origin: spring-projects/spring-data-redis
/**
* Register instance within the container.
*
* @param container never {@literal null}.
*/
protected void doRegister(RedisMessageListenerContainer container) {
listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}
代码示例来源:origin: spring-projects/spring-session
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(this.redisConnectionFactory);
if (this.redisTaskExecutor != null) {
container.setTaskExecutor(this.redisTaskExecutor);
}
if (this.redisSubscriptionExecutor != null) {
container.setSubscriptionExecutor(this.redisSubscriptionExecutor);
}
container.addMessageListener(sessionRepository(), Arrays.asList(
new ChannelTopic(sessionRepository().getSessionDeletedChannel()),
new ChannelTopic(sessionRepository().getSessionExpiredChannel())));
container.addMessageListener(sessionRepository(),
Collections.singletonList(new PatternTopic(
sessionRepository().getSessionCreatedChannelPrefix() + "*")));
return container;
}
代码示例来源:origin: spring-projects/spring-data-redis
public void stop(Runnable callback) {
stop();
callback.run();
}
代码示例来源:origin: spring-projects/spring-data-redis
private void initMessageListenerContainer() {
this.messageListenerContainer = new RedisMessageListenerContainer();
this.messageListenerContainer.setConnectionFactory(((RedisTemplate<?, ?>) redisOps).getConnectionFactory());
this.messageListenerContainer.afterPropertiesSet();
this.messageListenerContainer.start();
}
代码示例来源:origin: spring-projects/spring-data-redis
/**
* Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
* failure (for example, Redis was restarted).
*
* @param ex Throwable exception
*/
protected void handleSubscriptionException(Throwable ex) {
listening = false;
subscriptionTask.closeConnection();
if (ex instanceof RedisConnectionFailureException) {
if (isRunning()) {
logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
sleepBeforeRecoveryAttempt();
lazyListen();
}
} else {
logger.error("SubscriptionTask aborted with exception:", ex);
}
}
代码示例来源:origin: spring-projects/spring-data-redis
private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
// stop the listener if currently running
if (isRunning()) {
subscriptionTask.cancel();
}
patternMapping.clear();
channelMapping.clear();
listenerTopics.clear();
if (!CollectionUtils.isEmpty(listeners)) {
for (Map.Entry<? extends MessageListener, Collection<? extends Topic>> entry : listeners.entrySet()) {
addListener(entry.getKey(), entry.getValue());
}
}
// resume activity
if (initialized) {
start();
}
}
代码示例来源:origin: mrdear/JavaWEB
/**
* 注入消息容器
* @param jedisConnectionFactory jedis连接池
* @param listenerAdapter 监听适配器
* @return bean
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory jedisConnectionFactory,
MessageListenerAdapter listenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory);
//绑定监听者与信道的管理
container.addMessageListener(listenerAdapter,new PatternTopic("java"));
return container;
}
代码示例来源:origin: spring-projects/spring-integration
@Test
@RedisAvailable
public void testRedisPublishingMessageHandler() throws Exception {
int numToTest = 10;
String topic = "si.test.channel";
final CountDownLatch latch = new CountDownLatch(numToTest * 2);
RedisConnectionFactory connectionFactory = this.getConnectionFactoryForTest();
MessageListenerAdapter listener = new MessageListenerAdapter();
listener.setDelegate(new Listener(latch));
listener.setSerializer(new StringRedisSerializer());
listener.afterPropertiesSet();
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.afterPropertiesSet();
container.addMessageListener(listener, Collections.<Topic>singletonList(new ChannelTopic(topic)));
container.start();
this.awaitContainerSubscribed(container);
final RedisPublishingMessageHandler handler = new RedisPublishingMessageHandler(connectionFactory);
handler.setTopicExpression(new LiteralExpression(topic));
for (int i = 0; i < numToTest; i++) {
handler.handleMessage(MessageBuilder.withPayload("test-" + i).build());
}
for (int i = 0; i < numToTest; i++) {
handler.handleMessage(MessageBuilder.withPayload(("test-" + i).getBytes()).build());
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
container.stop();
}
代码示例来源:origin: spring-projects/spring-integration
((BeanFactoryAware) this.messageConverter).setBeanFactory(this.getBeanFactory());
this.container.setConnectionFactory(this.connectionFactory);
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
ErrorHandler errorHandler = new MessagePublishingErrorHandler(
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
this.container.setTaskExecutor(this.taskExecutor);
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageListenerDelegate());
adapter.setSerializer(this.serializer);
adapter.afterPropertiesSet();
this.container.addMessageListener(adapter, new ChannelTopic(this.topicName));
this.container.afterPropertiesSet();
this.dispatcher.setBeanFactory(this.getBeanFactory());
this.initialized = true;
代码示例来源:origin: com.lodsve/lodsve-redis
public RedisTimerMessageListenerContainer(LodsveRedisConnectionFactory connectionFactory, RedisTimerListener listener) {
super.setConnectionFactory(connectionFactory);
super.addMessageListener(listener, new PatternTopic("__keyevent@*:expired"));
}
}
代码示例来源:origin: com.alibaba/tac-engine
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(JedisConnectionFactory jedisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory);
// set thread pool
container.setTaskExecutor(ThreadPoolUtils.createThreadPool(10, "tac-redis-subscribe-pool"));
return container;
}
代码示例来源:origin: spring-projects/spring-integration
this.container.addMessageListener(adapter, topicList);
this.container.afterPropertiesSet();
代码示例来源:origin: com.github.xiaolyuh/layering-cache-core
@Override
public void afterPropertiesSet() throws Exception {
messageListener.setCacheManager(this);
container.setConnectionFactory(getRedisTemplate().getConnectionFactory());
container.afterPropertiesSet();
messageListener.afterPropertiesSet();
BeanFactory.getBean(StatsService.class).setCacheManager(this);
if (getStats()) {
// 采集缓存命中率数据
BeanFactory.getBean(StatsService.class).syncCacheStats();
}
}
代码示例来源:origin: yuboon/Aooms
@Bean("j2CacheRedisMessageListenerContainer")
@ConditionalOnBean(RedisConnectionFactory.class)
RedisMessageListenerContainer container(
@Qualifier("j2CahceRedisConnectionFactory") RedisConnectionFactory j2CahceRedisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(j2CahceRedisConnectionFactory);
return container;
}
代码示例来源:origin: spring-projects/spring-integration
public RedisInboundChannelAdapter(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "connectionFactory must not be null");
this.container.setConnectionFactory(connectionFactory);
}
代码示例来源:origin: spring-projects/spring-data-redis
public void stop() {
if (isRunning()) {
running = false;
subscriptionTask.cancel();
}
if (logger.isDebugEnabled()) {
logger.debug("Stopped RedisMessageListenerContainer");
}
}
代码示例来源:origin: sentilo/sentilo
private void restartListenerContainer() {
LOGGER.info("Stopping listener container for process {}", processName);
try {
listenerContainer.stop();
} catch (final Exception re) {
LOGGER.warn("An error occurred while stopping listener container. If it is not running, we proceed to start it", re.getMessage());
}
// Clear count pending events value
countPendingEvents = 0;
LOGGER.info("Starting listener container");
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
}
代码示例来源:origin: spring-projects/spring-integration
@Override
public void start() {
this.container.start();
}
代码示例来源:origin: sentilo/sentilo
@Override
public void start() {
if (listenerContainer.isActive() && listenerContainer.isRunning()) {
processName = getProcessName();
monitorListener = this;
monitorTopic = new ChannelTopic(buildMonitorChannelName(processName));
if (subscribeMonitor()) {
running = true;
LOGGER.info("RedisSubscriptionMonitor started for process {}", processName);
}
}
}
内容来源于网络,如有侵权,请联系作者删除!