本文整理了Java中com.rabbitmq.client.Channel.basicPublish()方法的一些代码示例,展示了Channel.basicPublish()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Channel.basicPublish()方法的具体详情如下:
包路径:com.rabbitmq.client.Channel
类名称:Channel
方法名:basicPublish
暂无
代码示例来源:origin: apache/flink
/**
 * Serializes an incoming record and publishes it to RMQ.
 *
 * <p>When no publish options are configured, the message is sent with an empty exchange name
 * and the queue name as routing key. Otherwise the exchange, routing key, mandatory/immediate
 * flags and message properties are all computed per record from the publish options.
 *
 * @param value
 *            The incoming data
 * @throws RuntimeException
 *             if publishing fails with an {@link IOException} and failures are not
 *             configured to be logged only
 */
@Override
public void invoke(IN value) {
    try {
        final byte[] payload = schema.serialize(value);
        if (publishOptions != null) {
            final boolean mandatory = publishOptions.computeMandatory(value);
            final boolean immediate = publishOptions.computeImmediate(value);
            // Either flag needs a ReturnListener to be registered.
            final boolean flagRequested = mandatory || immediate;
            Preconditions.checkState(returnListener != null || !flagRequested,
                    "Setting mandatory and/or immediate flags to true requires a ReturnListener.");
            final String routingKey = publishOptions.computeRoutingKey(value);
            final String exchangeName = publishOptions.computeExchange(value);
            channel.basicPublish(exchangeName, routingKey, mandatory, immediate,
                    publishOptions.computeProperties(value), payload);
        } else {
            channel.basicPublish("", queueName, null, payload);
        }
    } catch (IOException e) {
        if (!logFailuresOnly) {
            throw new RuntimeException("Cannot send RMQ message " + queueName + " at " + rmqConnectionConfig.getHost(), e);
        }
        LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
    }
}
代码示例来源:origin: apache/nifi
/**
 * Sends a message, together with its AMQP properties (see {@link BasicProperties}),
 * to an AMQP exchange.
 *
 * @param bytes bytes representing a message.
 * @param properties instance of {@link BasicProperties}
 * @param routingKey (required) the name of the routingKey to be used by AMQP-based
 *            system to route messages to its final destination (queue).
 * @param exchange the name of AMQP exchange to which messages will be published.
 *            If {@code null} or blank, the 'default' exchange (empty name) is used.
 * @throws IllegalStateException if the channel is closed or the publish call fails
 */
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
    this.validateStringProperty("routingKey", routingKey);

    // Normalize the exchange name: null becomes the empty (default) exchange.
    if (exchange == null) {
        exchange = "";
    } else {
        exchange = exchange.trim();
    }
    if (exchange.isEmpty()) {
        processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
    }
    processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
            + "' exchange with '" + routingKey + "' as a routing key.");

    final Channel publishingChannel = getChannel();
    if (!publishingChannel.isOpen()) {
        throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
    }
    try {
        publishingChannel.basicPublish(exchange, routingKey, true, properties, bytes);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
    }
}
代码示例来源:origin: zendesk/maxwell
/**
 * Publishes a row to the configured exchange as a JSON message.
 *
 * <p>Rows that should not be output are skipped, but the stored position is still advanced
 * past them. For published rows the position is advanced only when the row commits its
 * transaction.
 *
 * @param r the row to publish
 * @throws Exception if serializing or publishing the row fails
 */
@Override
public void push(RowMap r) throws Exception {
    if ( !r.shouldOutput(outputConfig) ) {
        context.setPosition(r.getNextPosition());
        return;
    }
    String value = r.toJSON(outputConfig);
    String routingKey = getRoutingKeyFromTemplate(r);
    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
    // which would make the bytes on the wire depend on the host's locale settings.
    byte[] body = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    channel.basicPublish(exchangeName, routingKey, props, body);
    if ( r.isTXCommit() ) {
        context.setPosition(r.getNextPosition());
    }
    if ( LOGGER.isDebugEnabled()) {
        LOGGER.debug("-> routing key:" + routingKey + ", partition:" + value);
    }
}
代码示例来源:origin: yacy/yacy_grid_mcp
/**
 * Publishes a message to this queue, reconnecting and retrying once if the first attempt
 * fails for any reason.
 *
 * @param message the message body
 * @return this queue
 * @throws IOException if reconnecting or the second publish attempt fails
 */
private Queue<byte[]> sendInternal(byte[] message) throws IOException {
    try {
        publishPersistent(message);
    } catch (Exception firstFailure) {
        // The first failure is deliberately dropped: reconnect and re-try once.
        connect();
        // If this second attempt fails, its exception propagates to the caller.
        publishPersistent(message);
    }
    return this;
}

/** Publishes the payload on the current channel with persistent basic properties. */
private void publishPersistent(byte[] payload) throws IOException {
    channel.basicPublish(DEFAULT_EXCHANGE, this.queueName, MessageProperties.PERSISTENT_BASIC, payload);
}
代码示例来源:origin: testcontainers/testcontainers-java
/**
 * Declares an exchange and a bound queue on the containerized broker, publishes one message
 * and verifies that a consumer on the queue receives the same bytes.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
@Test
public void simpleRabbitMqTest() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(rabbitMq.getContainerIpAddress());
    factory.setPort(rabbitMq.getMappedPort(RABBITMQ_PORT));
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(RABBIQMQ_TEST_EXCHANGE, "direct", true);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, RABBIQMQ_TEST_EXCHANGE, RABBITMQ_TEST_ROUTING_KEY);
        // Encode once with an explicit charset so the bytes do not depend on the platform default.
        final byte[] expectedBody = RABBITMQ_TEST_MESSAGE.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // Set up a consumer on the queue
        final boolean[] messageWasReceived = new boolean[1];
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                messageWasReceived[0] = Arrays.equals(body, expectedBody);
            }
        });
        // post a message
        channel.basicPublish(RABBIQMQ_TEST_EXCHANGE, RABBITMQ_TEST_ROUTING_KEY, null, expectedBody);
        // check the message was received
        assertTrue("The message was received", Unreliables.retryUntilSuccess(5, TimeUnit.SECONDS, () -> {
            if (!messageWasReceived[0]) {
                throw new IllegalStateException("Message not received yet");
            }
            return true;
        }));
    } finally {
        // Release the connection whether the test passed or failed; previously it was never
        // closed. The isOpen() guard avoids closing a connection that has already shut down.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
代码示例来源:origin: zstackio/zstack
/**
 * Publishes the message and, if the channel reports a shutdown, falls back to a recoverable
 * send when the connection can recover.
 *
 * @throws IOException if publishing fails
 */
void send() throws IOException {
    try {
        chan.basicPublish(exchange.toString(), serviceId, true, msg.getAMQPProperties(), data);
    } catch (ShutdownSignalException e) {
        // Recovery is only attempted on an auto-recovering connection, with more than one
        // server IP, while the platform is still running.
        final boolean recoverable = conn instanceof AutorecoveringConnection
                && serverIps.size() > 1
                && Platform.IS_RUNNING;
        if (!recoverable) {
            throw e;
        }

        logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
                "we are doing recoverable send right now", e.getMessage()));

        final boolean resent = recoverSend();
        if (!resent) {
            throw e;
        }
    }
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Sends a message through the "overrideTemplateAttributesToEmpty" channel and verifies that
 * exactly one publish reaches the underlying channel with an empty exchange and an empty
 * routing key.
 */
@Test
public void testInt2773WithOverrideToDefaultAmqpTemplateExchangeAndRoutingLey() throws IOException {
    // Mocked Rabbit channel wrapped in a publisher-callback channel.
    Channel rabbitChannel = mock(Channel.class);
    PublisherCallbackChannelImpl callbackChannel =
            new PublisherCallbackChannelImpl(rabbitChannel, mock(ExecutorService.class));
    Connection rabbitConnection = mock(Connection.class);

    ConnectionFactory factory = context.getBean(ConnectionFactory.class);
    when(factory.createConnection()).thenReturn(rabbitConnection);
    when(rabbitConnection.createChannel(false)).thenReturn(callbackChannel);

    MessageChannel input = context.getBean("overrideTemplateAttributesToEmpty", MessageChannel.class);
    input.send(MessageBuilder.withPayload("test").build());

    Mockito.verify(rabbitChannel, Mockito.times(1)).basicPublish(
            Mockito.eq(""),
            Mockito.eq(""),
            Mockito.anyBoolean(),
            Mockito.any(BasicProperties.class),
            Mockito.any(byte[].class));
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Sends a message through the "withDefaultAmqpTemplateExchangeAndRoutingKey" channel and
 * verifies that exactly one publish reaches the underlying channel with an empty exchange and
 * an empty routing key.
 */
@Test
public void testInt2773WithDefaultAmqpTemplateExchangeAndRoutingKey() throws IOException {
    // Mocked Rabbit channel wrapped in a publisher-callback channel.
    Channel rabbitChannel = mock(Channel.class);
    PublisherCallbackChannelImpl callbackChannel =
            new PublisherCallbackChannelImpl(rabbitChannel, mock(ExecutorService.class));
    Connection rabbitConnection = mock(Connection.class);

    ConnectionFactory factory = context.getBean(ConnectionFactory.class);
    when(factory.createConnection()).thenReturn(rabbitConnection);
    when(rabbitConnection.createChannel(false)).thenReturn(callbackChannel);

    MessageChannel input =
            context.getBean("withDefaultAmqpTemplateExchangeAndRoutingKey", MessageChannel.class);
    input.send(MessageBuilder.withPayload("test").build());

    Mockito.verify(rabbitChannel, Mockito.times(1)).basicPublish(
            Mockito.eq(""),
            Mockito.eq(""),
            Mockito.anyBoolean(),
            Mockito.any(BasicProperties.class),
            Mockito.any(byte[].class));
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Sends a message through the "toRabbitOnlyWithTemplateChannel" channel and verifies that
 * exactly one publish reaches the underlying channel with exchange "default.test.exchange"
 * and routing key "default.routing.key".
 */
@Test
public void testInt2773UseDefaultAmqpTemplateExchangeAndRoutingLey() throws IOException {
    // Mocked Rabbit channel wrapped in a publisher-callback channel.
    Channel rabbitChannel = mock(Channel.class);
    PublisherCallbackChannelImpl callbackChannel =
            new PublisherCallbackChannelImpl(rabbitChannel, mock(ExecutorService.class));
    Connection rabbitConnection = mock(Connection.class);

    ConnectionFactory factory = context.getBean(ConnectionFactory.class);
    when(factory.createConnection()).thenReturn(rabbitConnection);
    when(rabbitConnection.createChannel(false)).thenReturn(callbackChannel);

    MessageChannel input = context.getBean("toRabbitOnlyWithTemplateChannel", MessageChannel.class);
    input.send(MessageBuilder.withPayload("test").build());

    Mockito.verify(rabbitChannel, Mockito.times(1)).basicPublish(
            Mockito.eq("default.test.exchange"),
            Mockito.eq("default.routing.key"),
            Mockito.anyBoolean(),
            Mockito.any(BasicProperties.class),
            Mockito.any(byte[].class));
}
代码示例来源:origin: zstackio/zstack
// Fragment of a larger method (its header is not part of this excerpt).
// Publishes the data under serviceId with the boolean publish flag set to true
// (presumably the "mandatory" flag of the five-argument overload; confirm against the
// Channel API), then returns true to the caller.
chan.basicPublish(exchange.toString(), serviceId,
true, msg.getAMQPProperties(), data);
return true;
代码示例来源:origin: vector4wang/spring-boot-quick
/**
 * Minimal producer example: connects to the broker, declares the "hello" queue and publishes
 * a single "Hello World!" message to it through the default exchange.
 *
 * <p>Failures are printed to standard error. The connection is closed on every path, including
 * when declaring the queue or publishing throws.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setHost("60.205.191.82");
    factory.setPort(5672);

    Connection conn = null;
    try {
        conn = factory.newConnection();
        Channel channel = conn.createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        String message = "Hello World!";
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        channel.basicPublish("", "hello", null, message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    } finally {
        // Previously the connection leaked whenever a call above threw before conn.close().
        // Closing the connection also releases any channel still open on it.
        if (conn != null && conn.isOpen()) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Publishes a message by forwarding the call to the wrapped delegate.
 *
 * <p>The {@code immediate} argument is accepted but not passed on: the delegate is invoked
 * with the exchange, routing key, {@code mandatory} flag, properties and body only.
 *
 * @param exchange the exchange to publish to
 * @param routingKey the routing key
 * @param mandatory forwarded to the delegate unchanged
 * @param immediate ignored by this implementation
 * @param props the message properties
 * @param body the message body
 * @throws IOException if the delegate's publish fails
 */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate, BasicProperties props,
byte[] body) throws IOException {
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}
代码示例来源:origin: org.springframework.amqp/spring-rabbit
/**
 * Publishes a message by forwarding the call to the wrapped delegate.
 *
 * <p>The {@code immediate} argument is accepted but not passed on: the delegate is invoked
 * with the exchange, routing key, {@code mandatory} flag, properties and body only.
 *
 * @param exchange the exchange to publish to
 * @param routingKey the routing key
 * @param mandatory forwarded to the delegate unchanged
 * @param immediate ignored by this implementation
 * @param props the message properties
 * @param body the message body
 * @throws IOException if the delegate's publish fails
 */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate, BasicProperties props,
byte[] body) throws IOException {
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Publishes a message by forwarding all arguments, unchanged, to the wrapped delegate.
 *
 * @param exchange the exchange to publish to
 * @param routingKey the routing key
 * @param mandatory forwarded to the delegate unchanged
 * @param props the message properties
 * @param body the message body
 * @throws IOException if the delegate's publish fails
 */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, BasicProperties props, byte[] body)
throws IOException {
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Publishes a JSON-transformed {@code Foo} message directly on a channel, with its headers
 * mapped to AMQP properties, and verifies that receiving from the queue converts it back
 * into a {@code Foo}.
 */
@Test
public void testInbound() {
    @SuppressWarnings("unchecked")
    final Message<String> jsonMessage = (Message<String>) new ObjectToJsonTransformer()
            .transform(new GenericMessage<Foo>(new Foo()));

    // Map the message headers onto AMQP message properties, then to client properties.
    final MessageProperties amqpProperties = new MessageProperties();
    DefaultAmqpHeaderMapper.outboundMapper().fromHeadersToRequest(jsonMessage.getHeaders(), amqpProperties);
    final BasicProperties props =
            new DefaultMessagePropertiesConverter().fromMessageProperties(amqpProperties, "UTF-8");

    this.rabbitTemplate.execute(channel -> {
        channel.basicPublish("", JSON_TESTQ, props, jsonMessage.getPayload().getBytes());
        return null;
    });

    final Object converted = this.rabbitTemplate.receiveAndConvert(JSON_TESTQ);
    assertThat(converted, instanceOf(Foo.class));
}
代码示例来源:origin: espertechinc/esper
/**
 * Publishes the given bytes to the configured queue with an empty exchange name and no
 * message properties.
 *
 * @param bytes the message body
 * @throws RuntimeException wrapping the {@link IOException} if publishing fails; the failure
 *         is also logged at error level
 */
public void send(byte[] bytes) {
    try {
        channel.basicPublish("", settings.getQueueName(), null, bytes);
    } catch (IOException ioFailure) {
        final String errorText = "Failed to publish to AMQP: " + ioFailure.getMessage();
        log.error(errorText, ioFailure);
        throw new RuntimeException(errorText, ioFailure);
    }
}
代码示例来源:origin: org.apache.apex/malhar-contrib
/**
 * Publishes one tuple to the configured exchange with an empty routing key and no message
 * properties.
 *
 * @param tuple the message body to publish
 */
@Override
public void processTuple(byte[] tuple)
{
try {
channel.basicPublish(exchange, "", null, tuple);
} catch (IOException e) {
// Hand the checked exception to DTThrowable.rethrow (presumably rethrows it unchecked;
// confirm against DTThrowable).
DTThrowable.rethrow(e);
}
}
}
代码示例来源:origin: NationalSecurityAgency/lemongrenade
/**
 * Publishes a previously fetched message again, using the exchange and routing key recorded
 * in its envelope together with its original properties and body.
 *
 * <p>An {@link IOException} from the publish is printed to standard error and not rethrown.
 *
 * @param response the fetched message to publish again
 */
public void rePublish(GetResponse response) {
    final Envelope origin = response.getEnvelope();
    try {
        final String targetExchange = origin.getExchange();
        final String targetRoutingKey = origin.getRoutingKey();
        channel.basicPublish(targetExchange, targetRoutingKey, response.getProps(), response.getBody());
    } catch (IOException ioFailure) {
        ioFailure.printStackTrace();
    }
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Delivers a message to the listener on a mocked channel and verifies the reply it publishes:
 * the exchange, routing key, mandatory flag and body must match, and the reply properties
 * must carry the expected correlation id.
 *
 * @param listener the listener adapter under test
 * @param message the inbound message; its body is also the expected reply body
 * @param expectedExchange the exchange the reply must be published to
 * @param routingKey the routing key the reply must use
 * @param mandatory the mandatory flag the reply must use
 * @param expectedCorrelationId the correlation id expected in the reply properties
 */
public void processAndReply(MessagingMessageListenerAdapter listener,
        org.springframework.amqp.core.Message message, String expectedExchange, String routingKey,
        boolean mandatory, String expectedCorrelationId) throws Exception {
    final Channel replyChannel = mock(Channel.class);
    listener.onMessage(message, replyChannel);

    final ArgumentCaptor<AMQP.BasicProperties> replyProps =
            ArgumentCaptor.forClass(AMQP.BasicProperties.class);
    verify(replyChannel).basicPublish(
            eq(expectedExchange),
            eq(routingKey),
            eq(mandatory),
            replyProps.capture(),
            aryEq(message.getBody()));

    final String actualCorrelationId = replyProps.getValue().getCorrelationId();
    assertEquals("Wrong correlationId in reply", expectedCorrelationId, actualCorrelationId);
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Publishes a reply message to the exchange and routing key named by the reply-to address.
 *
 * @param channel the channel to publish on
 * @param replyTo the address supplying the exchange name and routing key
 * @param message the message whose properties (after conversion) and body are published
 * @throws IOException if the publish fails
 */
protected void doPublish(Channel channel, Address replyTo, Message message) throws IOException {
    final String targetExchange = replyTo.getExchangeName();
    final String targetRoutingKey = replyTo.getRoutingKey();
    channel.basicPublish(
            targetExchange,
            targetRoutingKey,
            this.mandatoryPublish,
            this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding),
            message.getBody());
}
内容来源于网络,如有侵权,请联系作者删除!