com.rabbitmq.client.Channel.basicAck()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(8.9k)|赞(0)|评价(0)|浏览(849)

本文整理了Java中com.rabbitmq.client.Channel.basicAck()方法的一些代码示例,展示了Channel.basicAck()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Channel.basicAck()方法的具体详情如下:
包路径:com.rabbitmq.client.Channel
类名称:Channel
方法名:basicAck

Channel.basicAck介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Acknowledges each given delivery tag individually on the channel and then
 * commits the channel transaction.
 *
 * @param sessionIds delivery tags to acknowledge
 * @throws RuntimeException wrapping the {@link IOException} if an ack or the commit fails
 */
@Override
protected void acknowledgeSessionIDs(List<Long> sessionIds) {
  try {
    // One non-multiple ack per tag; the commit happens only after all acks were issued.
    for (final Long sessionId : sessionIds) {
      channel.basicAck(sessionId, false);
    }
    channel.txCommit();
  } catch (IOException ackFailure) {
    final String reason = "Messages could not be acknowledged during checkpoint creation.";
    throw new RuntimeException(reason, ackFailure);
  }
}

代码示例来源:origin: qiurunze123/miaosha

/**
 * Consumes one message from the {@code MQConfig.MIAOSHATEST} queue, converts its
 * UTF-8 body into a {@code MiaoShaMessageVo} and stores it via {@code messageService}.
 *
 * <p>The delivery is acknowledged only after the message has been stored, and only
 * this single delivery is acknowledged ({@code multiple = false}). If parsing or
 * storing throws, no ack is sent from this method.
 *
 * @param message the received AMQP message
 * @param channel the channel the message was delivered on
 * @throws IOException if decoding the body or sending the ack fails
 */
@RabbitListener(queues=MQConfig.MIAOSHATEST)
  public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
    log.info("接受到的消息为:{}",message);
    String messRegister = new String(message.getBody(), "UTF-8");
    MiaoShaMessageVo msm  = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
    messageService.insertMs(msm);
    // Ack last, so a failure above does not acknowledge a message that was never stored.
    // multiple=false: do not implicitly ack other outstanding deliveries on this channel.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Acknowledges the given response on the channel unless auto-acknowledge is enabled.
 * The ack is sent with {@code multiple = true}.
 *
 * @param response the fetched message whose delivery tag is acknowledged
 * @throws IOException if the ack cannot be sent
 */
public void acknowledge(final GetResponse response) throws IOException {
  if (!autoAcknowledge) {
    getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
  }
}

代码示例来源:origin: Graylog2/graylog2-server

/**
 * Handles one delivered AMQP message: updates the byte counters, forwards the body
 * as a {@code RawMessage} to the source input and acks the delivery. Any exception
 * during that work is logged and, while the channel is still open, answered with a
 * nack whose requeue flag is {@code requeueInvalid}.
 *
 * @throws IOException if sending the nack fails
 */
@Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    final long tag = envelope.getDeliveryTag();
    try {
      final int size = body.length;
      totalBytesRead.addAndGet(size);
      lastSecBytesReadTmp.addAndGet(size);
      final RawMessage raw = new RawMessage(body);
      // TODO figure out if we want to unsubscribe after a certain time, or if simply blocking is enough here
      if (amqpTransport.isThrottled()) {
        amqpTransport.blockUntilUnthrottled();
      }
      sourceInput.processRawMessage(raw);
      channel.basicAck(tag, false);
    } catch (Exception e) {
      LOG.error("Error while trying to process AMQP message", e);
      // Nothing can be nacked on a closed channel.
      if (!channel.isOpen()) {
        return;
      }
      channel.basicNack(tag, false, requeueInvalid);
      if (LOG.isDebugEnabled()) {
        final String outcome = requeueInvalid
            ? "Re-queue message with delivery tag {}"
            : "Message with delivery tag {} not re-queued";
        LOG.debug(outcome, tag);
      }
    }
  }
});

代码示例来源:origin: yu199195/myth

/**
 * Creates the listener container for the configured queue, using manual
 * acknowledgement with 1 initial and at most 2 concurrent consumers.
 *
 * <p>The registered listener acks a delivery only when
 * {@code mythMqReceiveService.processMessage} reports success; otherwise the
 * listener sends neither an ack nor a nack.
 *
 * @return the simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  final SimpleMessageListenerContainer listenerContainer =
      new SimpleMessageListenerContainer(connectionFactory);
  listenerContainer.setQueues(queue());
  listenerContainer.setExposeListenerChannel(true);
  listenerContainer.setMaxConcurrentConsumers(2);
  listenerContainer.setConcurrentConsumers(1);
  // Acknowledgements are sent manually by the listener below.
  listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  final ChannelAwareMessageListener ackingListener = (message, channel) -> {
    final byte[] payload = message.getBody();
    LOGGER.debug("motan 框架接收到的消息");
    final Boolean processed = mythMqReceiveService.processMessage(payload);
    if (!processed) {
      return;
    }
    // Confirm successful consumption of this single delivery.
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
  };
  listenerContainer.setMessageListener(ackingListener);
  return listenerContainer;
}

代码示例来源:origin: yu199195/myth

/**
 * Creates the listener container for the configured queue, using manual
 * acknowledgement with exactly one consumer (initial 1, maximum 1).
 *
 * <p>The registered listener acks a delivery only when
 * {@code mythMqReceiveService.processMessage} reports success; otherwise the
 * listener sends neither an ack nor a nack.
 *
 * @return the simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  final SimpleMessageListenerContainer listenerContainer =
      new SimpleMessageListenerContainer(connectionFactory);
  listenerContainer.setQueues(queue());
  listenerContainer.setExposeListenerChannel(true);
  listenerContainer.setMaxConcurrentConsumers(1);
  listenerContainer.setConcurrentConsumers(1);
  // Acknowledgements are sent manually by the listener below.
  listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  final ChannelAwareMessageListener ackingListener = (message, channel) -> {
    final byte[] payload = message.getBody();
    final Boolean processed = mythMqReceiveService.processMessage(payload);
    if (!processed) {
      return;
    }
    // Confirm successful consumption of this single delivery.
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
  };
  listenerContainer.setMessageListener(ackingListener);
  return listenerContainer;
}

代码示例来源:origin: yu199195/myth

/**
 * Creates the listener container for the configured queue, using manual
 * acknowledgement with 1 initial and at most 3 concurrent consumers.
 *
 * <p>The registered listener acks a delivery only when
 * {@code mythMqReceiveService.processMessage} reports success; otherwise the
 * listener sends neither an ack nor a nack.
 *
 * @return the simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  final SimpleMessageListenerContainer listenerContainer =
      new SimpleMessageListenerContainer(connectionFactory);
  listenerContainer.setQueues(queue());
  listenerContainer.setExposeListenerChannel(true);
  listenerContainer.setMaxConcurrentConsumers(3);
  listenerContainer.setConcurrentConsumers(1);
  // Acknowledgements are sent manually by the listener below.
  listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  final ChannelAwareMessageListener ackingListener = (message, channel) -> {
    final byte[] payload = message.getBody();
    LogUtil.debug(LOGGER,()->"springcloud  account服务  amqp接收消息");
    final Boolean processed = mythMqReceiveService.processMessage(payload);
    if (!processed) {
      return;
    }
    // Confirm successful consumption of this single delivery.
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
  };
  listenerContainer.setMessageListener(ackingListener);
  return listenerContainer;
}

代码示例来源:origin: icclab/cyclops

/**
 * Acknowledges a single delivery on the channel on a best-effort basis.
 * Any exception raised while acking is discarded, so callers get no signal
 * when the ack did not go through.
 *
 * @param deliveryTag delivery tag of the message to acknowledge
 */
public void ackMessage(long deliveryTag) {
  try {
    // multiple=false: only this delivery tag is acknowledged.
    channel.basicAck(deliveryTag, false);
  } catch (Exception ignored) {
    // Intentionally swallowed; NOTE(review): no logging here, so ack failures are invisible.
  }
}

代码示例来源:origin: spring-projects/spring-integration

/**
 * Counts invocations and answers the delivery accordingly: the first two calls
 * nack the message with requeue, every later call acks it.
 *
 * @param payload the message payload (not used by the method body)
 * @param channel the channel taken from the {@code AmqpHeaders.CHANNEL} header
 * @param deliveryTag the delivery tag taken from the {@code AmqpHeaders.DELIVERY_TAG} header
 * @return the current value of the invocation counter
 * @throws Exception if the ack or nack fails
 */
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Integer handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
  final int invocation = ++called;
  if (invocation <= 2) {
    // Reject and requeue so the same message is delivered again.
    channel.basicNack(deliveryTag, false, true);
  } else {
    channel.basicAck(deliveryTag, false);
  }
  return called;
}

代码示例来源:origin: spring-projects/spring-integration

switch (status) {
case ACCEPT:
  this.ackInfo.getChannel().basicAck(deliveryTag, false);
  break;
case REJECT:

代码示例来源:origin: spring-projects/spring-integration

// Accept the received message through the acknowledgment callback carried in its headers.
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
    .acknowledge(Status.ACCEPT);
// Expect a non-multiple ack for delivery tag 123 on the channel (presumably a mock — confirm in the enclosing test).
verify(channel).basicAck(123L, false);
Channel cached = conn.createChannel(false); // should have been "closed"
// Expect createChannel() to have been invoked twice on the connection.
verify(connection, times(2)).createChannel();

代码示例来源:origin: spring-projects/spring-amqp

/**
 * Sends one ack with {@code multiple = true} for the latest deferred delivery tag,
 * then records {@code now} as the last ack time and resets the pending-ack counter.
 * The bookkeeping fields are updated only if the ack call returns without throwing.
 *
 * @param now value stored in {@code lastAck}; presumably a timestamp — confirm against callers
 * @throws IOException if the ack cannot be sent
 */
protected synchronized void sendAck(long now) throws IOException {
  // multiple=true: a single call covers the deferred tag and the deliveries before it.
  getChannel().basicAck(this.latestDeferredDeliveryTag, true);
  this.lastAck = now;
  this.pendingAcks = 0;
}

代码示例来源:origin: gmr/rabbitmq-flume-plugin

/**
 * Acknowledges a single delivery and updates the counters: {@code COUNTER_ACK} is
 * incremented only when the ack was sent, {@code COUNTER_EXCEPTION} only when it failed.
 *
 * @param deliveryTag delivery tag of the message to acknowledge
 */
private void ackMessage(long deliveryTag) {
  try {
    channel.basicAck(deliveryTag, false);
    // Count the ack inside the try so a failed ack is not reported as a success.
    counterGroup.incrementAndGet(COUNTER_ACK);
  } catch (IOException ex) {
    logger.error("Error acknowledging message from {}: {}", this, ex);
    counterGroup.incrementAndGet(COUNTER_EXCEPTION);
  }
}

代码示例来源:origin: battcn/spring-boot2-learning

/**
 * Consumes a {@code Book} from the manual-ack queue, logs it and acknowledges
 * the delivery. A failed ack is logged instead of being silently dropped.
 *
 * @param book    the converted message payload
 * @param message the raw AMQP message, used for its delivery tag
 * @param channel the channel the message was delivered on
 */
@RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
  public void listenerManualAck(Book book, Message message, Channel channel) {
    log.info("[listenerManualAck 监听的消息] - [{}]", book);
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
      // Tell the broker this single delivery has been consumed successfully.
      channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
      // TODO add real fault handling, e.g. move the message to another queue.
      log.error("[listenerManualAck] failed to ack message, deliveryTag={}", deliveryTag, e);
    }
  }
}

代码示例来源:origin: battcn/spring-boot2-learning

/**
 * Consumes a {@code Book} from the register queue, logs it together with the
 * consumption time and acknowledges the delivery. A failed ack is logged instead
 * of being silently dropped.
 *
 * @param book    the converted message payload
 * @param message the raw AMQP message, used for its delivery tag
 * @param channel the channel the message was delivered on
 */
@RabbitListener(queues = {RabbitConfig.REGISTER_QUEUE_NAME})
public void listenerDelayQueue(Book book, Message message, Channel channel) {
  log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), book);
  final long deliveryTag = message.getMessageProperties().getDeliveryTag();
  try {
    // Tell the broker this single delivery has been consumed successfully.
    channel.basicAck(deliveryTag, false);
  } catch (IOException e) {
    // TODO add real fault handling, e.g. move the message to another queue.
    log.error("[listenerDelayQueue] failed to ack message, deliveryTag={}", deliveryTag, e);
  }
}

代码示例来源:origin: yidao620c/SpringBootBucket

/**
 * First listener on the FANOUT broadcast queue: acks the delivery
 * (with {@code multiple = true}) and then logs the message body at debug level.
 *
 * @param message the message
 * @param channel the channel
 * @throws IOException if the ack fails; this exception still needs proper handling
 */
@RabbitListener(queues = {"FANOUT_QUEUE_A"})
public void on(Message message, Channel channel) throws IOException {
  final long deliveryTag = message.getMessageProperties().getDeliveryTag();
  channel.basicAck(deliveryTag, true);
  final String text = new String(message.getBody());
  log.debug("FANOUT_QUEUE_A " + text);
}

代码示例来源:origin: yidao620c/SpringBootBucket

/**
 * Second listener on the FANOUT broadcast queue: acks the delivery
 * (with {@code multiple = true}) and then logs the message body at debug level.
 *
 * @param message the message
 * @param channel the channel
 * @throws IOException if the ack fails; this exception still needs proper handling
 */
@RabbitListener(queues = {"FANOUT_QUEUE_B"})
public void t(Message message, Channel channel) throws IOException {
  final long deliveryTag = message.getMessageProperties().getDeliveryTag();
  channel.basicAck(deliveryTag, true);
  final String text = new String(message.getBody());
  log.debug("FANOUT_QUEUE_B " + text);
}

代码示例来源:origin: heibaiying/spring-samples-for-all

/**
 * Prints the received message body, prefixed with the first queue's name,
 * and then acknowledges this single delivery.
 *
 * @param message the received message
 * @param channel the channel the message was delivered on
 * @throws Exception if the ack fails
 */
@Override
  public void onMessage(Message message, Channel channel) throws Exception {
    // The message properties expose additional attributes such as the delivery tag.
    final MessageProperties props = message.getMessageProperties();
    // Raw message body.
    final byte[] payload = message.getBody();
    final String line = firstQueue().getName() + "收到消息:" + new String(payload);
    System.out.println(line);
    // The second argument states whether several deliveries are acknowledged at once.
    final long deliveryTag = props.getDeliveryTag();
    channel.basicAck(deliveryTag, false);
  }
});

代码示例来源:origin: spring-projects/spring-amqp

/**
 * Completes an asynchronous invocation successfully: hands the actual result
 * {@code r} to {@code doHandleResult} (keeping the send-to and return type of
 * {@code resultArg}) and then acknowledges the request message. A failed ack is
 * logged and not propagated.
 *
 * @param resultArg the original invocation result, source of send-to and return type
 * @param request   the request message whose delivery is acknowledged
 * @param channel   the channel used for the result handling and the ack
 * @param source    passed through to {@code doHandleResult}
 * @param r         the asynchronously produced result value
 */
private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object r) {
  doHandleResult(new InvocationResult(r, resultArg.getSendTo(), resultArg.getReturnType()), request,
      channel, source);
  try {
    channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
  }
  catch (IOException e) {
    // This is the ack path, so the log must not claim a nack failed.
    this.logger.error("Failed to ack message", e);
  }
}

代码示例来源:origin: spring-projects/spring-amqp

/**
 * Logs the message body, acknowledges this single delivery and counts the
 * latch down, whether or not the ack succeeded.
 *
 * @param message the received message
 * @param channel the channel the message was delivered on
 * @throws Exception if the ack fails
 */
@Override
  public void onMessage(Message message, Channel channel) throws Exception {
    final String text = new String(message.getBody());
    try {
      logger.debug("Acking: " + text);
      final long deliveryTag = message.getMessageProperties().getDeliveryTag();
      channel.basicAck(deliveryTag, false);
    }
    finally {
      // Always release the waiter, even when logging or acking threw.
      latch.countDown();
    }
  }
}

相关文章

微信公众号

最新文章

更多