com.rabbitmq.client.Channel.basicPublish()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(12.4k)|赞(0)|评价(0)|浏览(544)

本文整理了Java中com.rabbitmq.client.Channel.basicPublish()方法的一些代码示例,展示了Channel.basicPublish()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.basicPublish()方法的具体详情如下:
包路径:com.rabbitmq.client.Channel
类名称:Channel
方法名:basicPublish

Channel.basicPublish介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Called when new data arrives to the sink, and forwards it to RMQ.
 *
 * @param value
 *            The incoming data
 */
@Override
public void invoke(IN value) {
  try {
    byte[] msg = schema.serialize(value);
    if (publishOptions == null) {
      channel.basicPublish("", queueName, null, msg);
    } else {
      boolean mandatory = publishOptions.computeMandatory(value);
      boolean immediate = publishOptions.computeImmediate(value);
      Preconditions.checkState(!(returnListener == null && (mandatory || immediate)),
        "Setting mandatory and/or immediate flags to true requires a ReturnListener.");
      String rk = publishOptions.computeRoutingKey(value);
      String exchange = publishOptions.computeExchange(value);
      channel.basicPublish(exchange, rk, mandatory, immediate,
        publishOptions.computeProperties(value), msg);
    }
  } catch (IOException e) {
    if (logFailuresOnly) {
      LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
    } else {
      throw new RuntimeException("Cannot send RMQ message " + queueName + " at " + rmqConnectionConfig.getHost(), e);
    }
  }
}

代码示例来源:origin: apache/nifi

/**
 * Publishes message with provided AMQP properties (see
 * {@link BasicProperties}) to a pre-defined AMQP Exchange.
 *
 * @param bytes bytes representing a message.
 * @param properties instance of {@link BasicProperties}
 * @param exchange the name of AMQP exchange to which messages will be published.
 *            If not provided 'default' exchange will be used.
 * @param routingKey (required) the name of the routingKey to be used by AMQP-based
 *            system to route messages to its final destination (queue).
 */
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
  this.validateStringProperty("routingKey", routingKey);
  exchange = exchange == null ? "" : exchange.trim();
  if (exchange.length() == 0) {
    processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
  }
  processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
      + "' exchange with '" + routingKey + "' as a routing key.");
  final Channel channel = getChannel();
  if (channel.isOpen()) {
    try {
      channel.basicPublish(exchange, routingKey, true, properties, bytes);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
    }
  } else {
    throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
  }
}

代码示例来源:origin: zendesk/maxwell

@Override
public void push(RowMap r) throws Exception {
  if ( !r.shouldOutput(outputConfig) ) {
    context.setPosition(r.getNextPosition());
    return;
  }
  String value = r.toJSON(outputConfig);
  String routingKey = getRoutingKeyFromTemplate(r);
  channel.basicPublish(exchangeName, routingKey, props, value.getBytes());
  if ( r.isTXCommit() ) {
    context.setPosition(r.getNextPosition());
  }
  if ( LOGGER.isDebugEnabled()) {
    LOGGER.debug("->  routing key:" + routingKey + ", partition:" + value);
  }
}

代码示例来源:origin: yacy/yacy_grid_mcp

private Queue<byte[]> sendInternal(byte[] message) throws IOException {
  try {
    channel.basicPublish(DEFAULT_EXCHANGE, this.queueName, MessageProperties.PERSISTENT_BASIC, message);
  } catch (Exception e) {
    // try to reconnect and re-try once...
    connect();
    channel.basicPublish(DEFAULT_EXCHANGE, this.queueName, MessageProperties.PERSISTENT_BASIC, message);
    // if that fails, it simply throws an exception
  }
  return this;
}

代码示例来源:origin: testcontainers/testcontainers-java

@Test
public void simpleRabbitMqTest() throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(rabbitMq.getContainerIpAddress());
  factory.setPort(rabbitMq.getMappedPort(RABBITMQ_PORT));
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(RABBIQMQ_TEST_EXCHANGE, "direct", true);
  String queueName = channel.queueDeclare().getQueue();
  channel.queueBind(queueName, RABBIQMQ_TEST_EXCHANGE, RABBITMQ_TEST_ROUTING_KEY);
  // Set up a consumer on the queue
  final boolean[] messageWasReceived = new boolean[1];
  channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      messageWasReceived[0] = Arrays.equals(body, RABBITMQ_TEST_MESSAGE.getBytes());
    }
  });
  // post a message
  channel.basicPublish(RABBIQMQ_TEST_EXCHANGE, RABBITMQ_TEST_ROUTING_KEY, null, RABBITMQ_TEST_MESSAGE.getBytes());
  // check the message was received
  assertTrue("The message was received", Unreliables.retryUntilSuccess(5, TimeUnit.SECONDS, () -> {
    if (!messageWasReceived[0]) {
      throw new IllegalStateException("Message not received yet");
    }
    return true;
  }));
}

代码示例来源:origin: zstackio/zstack

void send() throws IOException {
  try {
    chan.basicPublish(exchange.toString(), serviceId,
        true, msg.getAMQPProperties(), data);
  } catch (ShutdownSignalException e) {
    if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) {
      // the connection is not recoverable
      throw e;
    }
    logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
        "we are doing recoverable send right now", e.getMessage()));
    if (!recoverSend()) {
      throw e;
    }
  }
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testInt2773WithOverrideToDefaultAmqpTemplateExchangeAndRoutingLey() throws IOException {
  ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
  Connection mockConnection = mock(Connection.class);
  Channel mockChannel = mock(Channel.class);
  when(connectionFactory.createConnection()).thenReturn(mockConnection);
  PublisherCallbackChannelImpl publisherCallbackChannel = new PublisherCallbackChannelImpl(mockChannel,
      mock(ExecutorService.class));
  when(mockConnection.createChannel(false)).thenReturn(publisherCallbackChannel);
  MessageChannel requestChannel = context.getBean("overrideTemplateAttributesToEmpty", MessageChannel.class);
  requestChannel.send(MessageBuilder.withPayload("test").build());
  Mockito.verify(mockChannel, Mockito.times(1)).basicPublish(Mockito.eq(""), Mockito.eq(""),
      Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testInt2773WithDefaultAmqpTemplateExchangeAndRoutingKey() throws IOException {
  ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
  Connection mockConnection = mock(Connection.class);
  Channel mockChannel = mock(Channel.class);
  when(connectionFactory.createConnection()).thenReturn(mockConnection);
  PublisherCallbackChannelImpl publisherCallbackChannel = new PublisherCallbackChannelImpl(mockChannel,
      mock(ExecutorService.class));
  when(mockConnection.createChannel(false)).thenReturn(publisherCallbackChannel);
  MessageChannel requestChannel = context.getBean("withDefaultAmqpTemplateExchangeAndRoutingKey",
      MessageChannel.class);
  requestChannel.send(MessageBuilder.withPayload("test").build());
  Mockito.verify(mockChannel, Mockito.times(1)).basicPublish(Mockito.eq(""), Mockito.eq(""),
      Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testInt2773UseDefaultAmqpTemplateExchangeAndRoutingLey() throws IOException {
  ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
  Connection mockConnection = mock(Connection.class);
  Channel mockChannel = mock(Channel.class);
  when(connectionFactory.createConnection()).thenReturn(mockConnection);
  PublisherCallbackChannelImpl publisherCallbackChannel = new PublisherCallbackChannelImpl(mockChannel,
      mock(ExecutorService.class));
  when(mockConnection.createChannel(false)).thenReturn(publisherCallbackChannel);
  MessageChannel requestChannel = context.getBean("toRabbitOnlyWithTemplateChannel", MessageChannel.class);
  requestChannel.send(MessageBuilder.withPayload("test").build());
  Mockito.verify(mockChannel, Mockito.times(1)).basicPublish(Mockito.eq("default.test.exchange"),
      Mockito.eq("default.routing.key"),
      Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
}

代码示例来源:origin: zstackio/zstack

chan.basicPublish(exchange.toString(), serviceId,
    true, msg.getAMQPProperties(), data);
return true;

代码示例来源:origin: vector4wang/spring-boot-quick

public static void main(String[] args) {
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setUsername("guest");
      factory.setPassword("guest");
      factory.setHost("60.205.191.82");
      factory.setPort(5672);
      Connection conn = factory.newConnection();
      Channel channel = conn.createChannel();

//            channel.qu
      channel.queueDeclare("hello", false, false, false, null);
      String message = "Hello World!";
      channel.basicPublish("", "hello", null, message.getBytes());
      System.out.println(" [x] Sent '" + message + "'");

      channel.close();
      conn.close();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      e.printStackTrace();
    }
  }
}

代码示例来源:origin: spring-projects/spring-amqp

@Override
public void basicPublish(String exchange, String routingKey,
    boolean mandatory, boolean immediate, BasicProperties props,
    byte[] body) throws IOException {
  this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

代码示例来源:origin: org.springframework.amqp/spring-rabbit

@Override
public void basicPublish(String exchange, String routingKey,
    boolean mandatory, boolean immediate, BasicProperties props,
    byte[] body) throws IOException {
  this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

代码示例来源:origin: spring-projects/spring-amqp

@Override
public void basicPublish(String exchange, String routingKey,
    boolean mandatory, BasicProperties props, byte[] body)
    throws IOException {
  this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testInbound() {
  @SuppressWarnings("unchecked")
  final Message<String> out = (Message<String>) new ObjectToJsonTransformer()
      .transform(new GenericMessage<Foo>(new Foo()));
  MessageProperties messageProperties = new MessageProperties();
  DefaultAmqpHeaderMapper.outboundMapper().fromHeadersToRequest(out.getHeaders(), messageProperties);
  final BasicProperties props = new DefaultMessagePropertiesConverter().fromMessageProperties(messageProperties,
      "UTF-8");
  this.rabbitTemplate.execute(channel -> {
    channel.basicPublish("", JSON_TESTQ, props, out.getPayload().getBytes());
    return null;
  });
  Object received = this.rabbitTemplate.receiveAndConvert(JSON_TESTQ);
  assertThat(received, instanceOf(Foo.class));
}

代码示例来源:origin: espertechinc/esper

public void send(byte[] bytes) {
  try {
    channel.basicPublish("", settings.getQueueName(), null, bytes);
  } catch (IOException e) {
    String message = "Failed to publish to AMQP: " + e.getMessage();
    log.error(message, e);
    throw new RuntimeException(message, e);
  }
}

代码示例来源:origin: org.apache.apex/malhar-contrib

@Override
 public void processTuple(byte[] tuple)
 {
  try {
   channel.basicPublish(exchange, "", null, tuple);
  } catch (IOException e) {
   DTThrowable.rethrow(e);
  }
 }
}

代码示例来源:origin: NationalSecurityAgency/lemongrenade

public void rePublish(GetResponse response) {
  Envelope envelope = response.getEnvelope();
  try {
    channel.basicPublish(envelope.getExchange(), envelope.getRoutingKey(), response.getProps(), response.getBody());
  } catch (IOException e) {
    e.printStackTrace();
  }
}

代码示例来源:origin: spring-projects/spring-amqp

public void processAndReply(MessagingMessageListenerAdapter listener,
    org.springframework.amqp.core.Message message, String expectedExchange, String routingKey,
    boolean mandatory, String expectedCorrelationId) throws Exception {
  Channel channel = mock(Channel.class);
  listener.onMessage(message, channel);
  ArgumentCaptor<AMQP.BasicProperties> argument = ArgumentCaptor.forClass(AMQP.BasicProperties.class);
  verify(channel).basicPublish(eq(expectedExchange), eq(routingKey), eq(mandatory),
      argument.capture(), aryEq(message.getBody()));
  assertEquals("Wrong correlationId in reply", expectedCorrelationId, argument.getValue().getCorrelationId());
}

代码示例来源:origin: spring-projects/spring-amqp

protected void doPublish(Channel channel, Address replyTo, Message message) throws IOException {
  channel.basicPublish(replyTo.getExchangeName(), replyTo.getRoutingKey(), this.mandatoryPublish,
      this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding),
      message.getBody());
}

相关文章

微信公众号

最新文章

更多