本文整理了 Java 中 com.rabbitmq.client.Channel.basicAck() 方法的一些代码示例,展示了 Channel.basicAck() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Channel.basicAck() 方法的具体详情如下:
包路径:com.rabbitmq.client.Channel
类名称:Channel
方法名:basicAck
方法说明:暂无
代码示例来源:origin: apache/flink
/**
 * Acknowledges each entry of {@code sessionIds} on the channel, one by one,
 * and then commits the channel transaction.
 *
 * @param sessionIds the ids passed to {@code basicAck} as delivery tags
 * @throws RuntimeException wrapping the {@link IOException} raised by an ack or by the commit
 */
@Override
protected void acknowledgeSessionIDs(List<Long> sessionIds) {
    try {
        for (long deliveryTag : sessionIds) {
            // multiple=false: acknowledge exactly this delivery tag.
            channel.basicAck(deliveryTag, false);
        }
        // The commit happens once, after every id has been acknowledged.
        channel.txCommit();
    } catch (IOException cause) {
        throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", cause);
    }
}
代码示例来源:origin: qiurunze123/miaosha
/**
 * Consumes a message from the {@code MQConfig.MIAOSHATEST} queue: decodes the
 * body as UTF-8, converts it to a {@code MiaoShaMessageVo}, stores it through
 * {@code messageService} and finally acknowledges the delivery.
 *
 * <p>The acknowledgement is sent only after {@code insertMs} has returned, so a
 * failure while decoding or storing leaves the message unacknowledged instead
 * of silently dropping it.
 *
 * @param message the received AMQP message
 * @param channel the channel the message was delivered on
 * @throws IOException if the body cannot be decoded or the ack cannot be sent
 */
@RabbitListener(queues=MQConfig.MIAOSHATEST)
public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
    log.info("接受到的消息为:{}",message);
    String messRegister = new String(message.getBody(), "UTF-8");
    MiaoShaMessageVo msm = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
    messageService.insertMs(msm);
    // Ack last, and with multiple=false, so that only this successfully
    // processed delivery is confirmed to the broker.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
代码示例来源:origin: apache/nifi
/**
 * Acknowledges the given response on the channel unless auto-acknowledge is enabled.
 *
 * @param response the response whose envelope supplies the delivery tag
 * @throws IOException if the ack cannot be sent
 */
public void acknowledge(final GetResponse response) throws IOException {
    if (!autoAcknowledge) {
        // multiple=true: this delivery tag and all earlier unacknowledged ones are confirmed.
        getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
    }
}
代码示例来源:origin: Graylog2/graylog2-server
/**
 * Handles one delivery: updates the byte counters, waits while the transport
 * is throttled, hands the payload to the input and acknowledges it. Any
 * exception on that path is logged and, if the channel is still open, the
 * delivery is negatively acknowledged using the {@code requeueInvalid} flag.
 *
 * @throws IOException if the negative acknowledgement itself fails
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    final long deliveryTag = envelope.getDeliveryTag();
    try {
        totalBytesRead.addAndGet(body.length);
        lastSecBytesReadTmp.addAndGet(body.length);
        final RawMessage rawMessage = new RawMessage(body);
        // TODO figure out if we want to unsubscribe after a certain time, or if simply blocking is enough here
        if (amqpTransport.isThrottled()) {
            amqpTransport.blockUntilUnthrottled();
        }
        sourceInput.processRawMessage(rawMessage);
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        LOG.error("Error while trying to process AMQP message", e);
        if (!channel.isOpen()) {
            // Nothing can be sent on a closed channel.
            return;
        }
        channel.basicNack(deliveryTag, false, requeueInvalid);
        if (LOG.isDebugEnabled()) {
            final String template = requeueInvalid
                    ? "Re-queue message with delivery tag {}"
                    : "Message with delivery tag {} not re-queued";
            LOG.debug(template, deliveryTag);
        }
    }
}
});
代码示例来源:origin: yu199195/myth
/**
 * Creates the listener container for {@code queue()} with manual
 * acknowledgement, one initial consumer and at most two concurrent consumers.
 *
 * <p>The registered listener passes the message body to
 * {@code mythMqReceiveService.processMessage} and acknowledges the delivery
 * only when that call reports success; otherwise no ack is sent.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    final SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    listenerContainer.setQueues(queue());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMaxConcurrentConsumers(2);
    listenerContainer.setConcurrentConsumers(1);
    // Use manual acknowledgement mode.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    final ChannelAwareMessageListener listener = (msg, ch) -> {
        final byte[] payload = msg.getBody();
        LOGGER.debug("motan 框架接收到的消息");
        final Boolean success = mythMqReceiveService.processMessage(payload);
        if (!success) {
            return;
        }
        // Confirm that the message was consumed successfully.
        ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    };
    listenerContainer.setMessageListener(listener);
    return listenerContainer;
}
代码示例来源:origin: yu199195/myth
/**
 * Creates the listener container for {@code queue()} with manual
 * acknowledgement and exactly one consumer (initial and maximum both 1).
 *
 * <p>The registered listener passes the message body to
 * {@code mythMqReceiveService.processMessage} and acknowledges the delivery
 * only when that call reports success; otherwise no ack is sent.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    final SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    listenerContainer.setQueues(queue());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMaxConcurrentConsumers(1);
    listenerContainer.setConcurrentConsumers(1);
    // Use manual acknowledgement mode.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    final ChannelAwareMessageListener listener = (msg, ch) -> {
        final byte[] payload = msg.getBody();
        final Boolean success = mythMqReceiveService.processMessage(payload);
        if (!success) {
            return;
        }
        // Confirm that the message was consumed successfully.
        ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    };
    listenerContainer.setMessageListener(listener);
    return listenerContainer;
}
代码示例来源:origin: yu199195/myth
/**
 * Creates the listener container for {@code queue()} with manual
 * acknowledgement, one initial consumer and at most three concurrent consumers.
 *
 * <p>The registered listener passes the message body to
 * {@code mythMqReceiveService.processMessage} and acknowledges the delivery
 * only when that call reports success; otherwise no ack is sent.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    final SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    listenerContainer.setQueues(queue());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMaxConcurrentConsumers(3);
    listenerContainer.setConcurrentConsumers(1);
    // Use manual acknowledgement mode.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    final ChannelAwareMessageListener listener = (msg, ch) -> {
        final byte[] payload = msg.getBody();
        LogUtil.debug(LOGGER,()->"springcloud account服务 amqp接收消息");
        final Boolean success = mythMqReceiveService.processMessage(payload);
        if (!success) {
            return;
        }
        // Confirm that the message was consumed successfully.
        ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    };
    listenerContainer.setMessageListener(listener);
    return listenerContainer;
}
代码示例来源:origin: icclab/cyclops
/**
 * Acknowledges a single delivery on a best-effort basis.
 *
 * @param deliveryTag the delivery tag to acknowledge
 */
public void ackMessage(long deliveryTag) {
    try {
        // multiple=false: acknowledge exactly this delivery tag.
        channel.basicAck(deliveryTag, false);
    } catch (Exception ignored) {
        // Deliberately swallowed: any failure while acking is dropped and never reaches the caller.
    }
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Counts invocations; the first two deliveries are negatively acknowledged
 * with requeue, every later one is acknowledged.
 *
 * @param payload the message payload (not used by the body)
 * @param channel the AMQP channel taken from the message headers
 * @param deliveryTag the delivery tag taken from the message headers
 * @return the invocation count after this call
 * @throws Exception if the ack or nack cannot be sent
 */
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Integer handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
    called++;
    if (called <= 2) {
        // requeue=true: the broker is asked to redeliver the message.
        channel.basicNack(deliveryTag, false, true);
    }
    else {
        channel.basicAck(deliveryTag, false);
    }
    return called;
}
代码示例来源:origin: spring-projects/spring-integration
// Fragment of a larger method: dispatches on the requested acknowledgment status.
switch (status) {
case ACCEPT:
// ACCEPT: acknowledge only this delivery tag (multiple=false) on the channel held by ackInfo.
this.ackInfo.getChannel().basicAck(deliveryTag, false);
break;
case REJECT:
代码示例来源:origin: spring-projects/spring-integration
// Fragment of a test: accept the received message through its acknowledgment callback.
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
// Expect the accept to reach the channel as basicAck for tag 123 with multiple=false.
verify(channel).basicAck(123L, false);
Channel cached = conn.createChannel(false); // should have been "closed"
// Expect createChannel() to have been called twice on the underlying connection.
verify(connection, times(2)).createChannel();
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Sends one cumulative ack for the latest deferred delivery tag and resets the
 * deferred-ack bookkeeping.
 *
 * @param now the timestamp recorded as the time of this ack
 * @throws IOException if the ack cannot be sent; the bookkeeping is then left unchanged
 */
protected synchronized void sendAck(long now) throws IOException {
    // multiple=true: confirms every delivery up to and including the latest deferred tag.
    getChannel().basicAck(this.latestDeferredDeliveryTag, true);
    // Reached only when the ack call returned normally.
    this.pendingAcks = 0;
    this.lastAck = now;
}
代码示例来源:origin: gmr/rabbitmq-flume-plugin
/**
 * Acknowledges a single delivery and updates the counters.
 *
 * <p>{@code COUNTER_ACK} is incremented only when the ack was actually sent;
 * on failure the error is logged and {@code COUNTER_EXCEPTION} is incremented
 * instead, so a failed ack is no longer counted as a successful one.
 *
 * @param deliveryTag the delivery tag to acknowledge
 */
private void ackMessage(long deliveryTag) {
    try {
        // multiple=false: acknowledge exactly this delivery tag.
        channel.basicAck(deliveryTag, false);
        counterGroup.incrementAndGet(COUNTER_ACK);
    } catch (IOException ex) {
        logger.error("Error acknowledging message from {}: {}", this, ex);
        counterGroup.incrementAndGet(COUNTER_EXCEPTION);
    }
}
代码示例来源:origin: battcn/spring-boot2-learning
/**
 * Listens on {@code RabbitConfig.MANUAL_BOOK_QUEUE}, logs the received book
 * and manually acknowledges the delivery.
 *
 * <p>A failed ack is logged with its cause instead of being silently discarded.
 *
 * @param book the converted message payload
 * @param message the raw AMQP message, used for its delivery tag
 * @param channel the channel the message was delivered on
 */
@RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
public void listenerManualAck(Book book, Message message, Channel channel) {
    log.info("[listenerManualAck 监听的消息] - [{}]", book.toString());
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // Tell the broker the message was consumed successfully (multiple=false).
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        // TODO: add fault handling here, e.g. route the message to another queue.
        log.error("[listenerManualAck] failed to ack delivery tag {}", deliveryTag, e);
    }
}
}
代码示例来源:origin: battcn/spring-boot2-learning
/**
 * Listens on {@code RabbitConfig.REGISTER_QUEUE_NAME}, logs the consumption
 * time together with the received book and manually acknowledges the delivery.
 *
 * <p>A failed ack is logged with its cause instead of being silently discarded.
 *
 * @param book the converted message payload
 * @param message the raw AMQP message, used for its delivery tag
 * @param channel the channel the message was delivered on
 */
@RabbitListener(queues = {RabbitConfig.REGISTER_QUEUE_NAME})
public void listenerDelayQueue(Book book, Message message, Channel channel) {
    log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), book.toString());
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // Tell the broker the message was consumed successfully (multiple=false).
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        // TODO: add fault handling here, e.g. route the message to another queue.
        log.error("[listenerDelayQueue] failed to ack delivery tag {}", deliveryTag, e);
    }
}
代码示例来源:origin: yidao620c/SpringBootBucket
/**
 * First listener on the FANOUT broadcast queue: acknowledges the delivery and
 * then logs the message body at debug level.
 *
 * @param message the message
 * @param channel the channel
 * @throws IOException if the ack cannot be sent; callers need to handle this
 */
@RabbitListener(queues = {"FANOUT_QUEUE_A"})
public void on(Message message, Channel channel) throws IOException {
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    // multiple=true: this delivery tag and all earlier unacknowledged ones are confirmed.
    channel.basicAck(deliveryTag, true);
    final String payload = new String(message.getBody());
    log.debug("FANOUT_QUEUE_A " + payload);
}
代码示例来源:origin: yidao620c/SpringBootBucket
/**
 * Second listener on the FANOUT broadcast queue: acknowledges the delivery and
 * then logs the message body at debug level.
 *
 * @param message the message
 * @param channel the channel
 * @throws IOException if the ack cannot be sent; callers need to handle this
 */
@RabbitListener(queues = {"FANOUT_QUEUE_B"})
public void t(Message message, Channel channel) throws IOException {
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    // multiple=true: this delivery tag and all earlier unacknowledged ones are confirmed.
    channel.basicAck(deliveryTag, true);
    final String payload = new String(message.getBody());
    log.debug("FANOUT_QUEUE_B " + payload);
}
代码示例来源:origin: heibaiying/spring-samples-for-all
/**
 * Prints the received body, prefixed with the name of {@code firstQueue()},
 * and then acknowledges the delivery.
 *
 * @param message the received message
 * @param channel the channel the message was delivered on
 * @throws Exception if the ack cannot be sent
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    // Additional message attributes are available from the properties.
    final MessageProperties properties = message.getMessageProperties();
    // The message body content.
    final byte[] body = message.getBody();
    final String line = firstQueue().getName() + "收到消息:" + new String(body);
    System.out.println(line);
    // The second argument says whether several messages are acknowledged at once.
    channel.basicAck(properties.getDeliveryTag(), false);
}
});
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Completes an asynchronous invocation: handles the result {@code r} using the
 * send-to and return type of {@code resultArg}, then acknowledges the request.
 *
 * <p>A failure to send the ack is logged and not propagated.
 *
 * @param resultArg the original invocation result supplying send-to and return type
 * @param request the request message being acknowledged
 * @param channel the channel the request was delivered on
 * @param source the source object passed through to result handling
 * @param r the asynchronously produced result value
 */
private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object r) {
    doHandleResult(new InvocationResult(r, resultArg.getSendTo(), resultArg.getReturnType()), request,
            channel, source);
    try {
        channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
    }
    catch (IOException e) {
        // The failed operation is an ack, so the message must say "ack", not "nack".
        this.logger.error("Failed to ack message", e);
    }
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Logs and acknowledges the received message, then counts down the latch
 * whether or not the ack succeeded.
 *
 * @param message the received message
 * @param channel the channel the message was delivered on
 * @throws Exception if the ack cannot be sent
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    final String payload = new String(message.getBody());
    try {
        logger.debug("Acking: " + payload);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
    finally {
        // Always release the waiter, even when logging or acking threw.
        latch.countDown();
    }
}
}
内容来源于网络,如有侵权,请联系作者删除!