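This article collects code examples for the Java class org.springframework.amqp.core.Message and shows how the Message class is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Message class are as follows:
Package: org.springframework.amqp.core
Class name: Message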
The 0-8 and 0-9-1 AMQP specifications do not define a Message class or interface. Instead, when performing an operation such as basicPublish, the content is passed as a byte-array argument and additional properties are passed in as separate arguments. Spring AMQP defines a Message class as part of a more general AMQP domain model representation. The purpose of the Message class is simply to encapsulate the body and properties within a single instance so that the rest of the AMQP API can in turn be simpler.
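The contrast described above can be sketched without any messaging dependency. The block below is only an illustration: `EnvelopeDemo`, `Envelope`, `describeRaw`, `describe`, and `sample` are hypothetical names invented here, not Spring AMQP or RabbitMQ client classes. It shows one call style that takes the body and properties as separate arguments and another that takes a single object bundling them.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class EnvelopeDemo {

    // Hypothetical stand-in, NOT org.springframework.amqp.core.Message:
    // it only mirrors the idea of bundling a body with its properties.
    static final class Envelope {
        private final byte[] body;
        private final String contentType;
        private final Map<String, Object> headers;

        Envelope(byte[] body, String contentType, Map<String, Object> headers) {
            this.body = body;
            this.contentType = contentType;
            this.headers = headers;
        }

        byte[] getBody() { return body; }
        String getContentType() { return contentType; }
        Map<String, Object> getHeaders() { return headers; }
    }

    // "Raw" style: the body and each property travel as separate arguments.
    static String describeRaw(byte[] body, String contentType, Map<String, Object> headers) {
        return contentType + ", " + headers.size() + " header(s), " + body.length + " bytes";
    }

    // "Single instance" style: callers hand over one object.
    static String describe(Envelope envelope) {
        return describeRaw(envelope.getBody(), envelope.getContentType(), envelope.getHeaders());
    }

    // Builds a small sample envelope with one header and a five-byte body.
    static Envelope sample() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("foo", "bar");
        return new Envelope("hello".getBytes(StandardCharsets.UTF_8), "text/plain", headers);
    }

    public static void main(String[] args) {
        System.out.println(describe(sample()));
    }
}
```

The real Message class pairs a byte[] body with a MessageProperties object in the same spirit, as the constructor calls in the examples that follow show.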
Code example source: qiurunze123/miaosha
@RabbitListener(queues=MQConfig.MIAOSHATEST)
public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
    log.info("Received message: {}", message);
    // Decode the raw body bytes as UTF-8 text
    String messRegister = new String(message.getBody(), "UTF-8");
    // Note: the ack is sent before the message is processed, and multiple=true
    // also acknowledges all earlier unacknowledged deliveries on this channel
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    MiaoShaMessageVo msm = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
    messageService.insertMs(msm);
}
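The listener above decodes the body with an explicit "UTF-8" charset, whereas several of the later examples call `getBytes()` or `new String(bytes)` with no charset, which falls back to the platform default. The following is a minimal sketch of keeping both sides explicit; `BodyCodec` and its methods are hypothetical helper names used only for illustration, and the byte array stands in for what `Message.getBody()` would return.

```java
import java.nio.charset.StandardCharsets;

final class BodyCodec {

    // Encode text into body bytes with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode body bytes with the same charset the producer used.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u00e9" (e with acute accent) takes two bytes in UTF-8,
        // so this five-character string encodes to six bytes.
        byte[] body = encode("h\u00e9llo");
        System.out.println(body.length);
        System.out.println(decode(body).equals("h\u00e9llo"));
    }
}
```

Using the StandardCharsets constant rather than the "UTF-8" string also avoids the checked UnsupportedEncodingException that the string-based overload declares.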
Code example source: FlowCI/flow-platform
public PriorityMessage(Message message) {
    super(message.getBody(), message.getMessageProperties());
    this.timestamp = System.nanoTime();
}
Code example source: spring-projects/spring-amqp
@Test
public void stringToMessage() throws Exception {
    SimpleMessageConverter converter = new SimpleMessageConverter();
    Message message = converter.toMessage("test", new MessageProperties());
    String contentType = message.getMessageProperties().getContentType();
    String content = new String(message.getBody(),
            message.getMessageProperties().getContentEncoding());
    assertEquals("text/plain", contentType);
    assertEquals("test", content);
}
Code example source: spring-projects/spring-integration
Object[] args = invocation.getArguments();
org.springframework.amqp.core.Message amqpRequestMessage = (org.springframework.amqp.core.Message) args[2];
MessageProperties properties = amqpRequestMessage.getMessageProperties();
assertNull(properties.getHeaders().get("foo"));
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setHeader("foobar", "foobar");
amqpProperties.setHeader("bar", "bar");
return new org.springframework.amqp.core.Message("hello".getBytes(), amqpProperties);
})
.when(amqpTemplate).sendAndReceive(Mockito.any(String.class), Mockito.any(String.class),
Code example source: spring-projects/spring-integration
@Test
public void withHeaderMapperDefaultMapping() throws Exception {
    AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperDefaultMapping",
            AmqpInboundChannelAdapter.class);
    AbstractMessageListenerContainer mlc =
            TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
    ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
            ChannelAwareMessageListener.class);
    MessageProperties amqpProperties = new MessageProperties();
    amqpProperties.setAppId("test.appId");
    amqpProperties.setClusterId("test.clusterId");
    amqpProperties.setContentEncoding("test.contentEncoding");
    amqpProperties.setContentLength(99L);
    amqpProperties.setContentType("test.contentType");
    amqpProperties.setHeader("foo", "foo");
    amqpProperties.setHeader("bar", "bar");
    Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
    listener.onMessage(amqpMessage, null);
    QueueChannel requestChannel = context.getBean("requestChannel", QueueChannel.class);
    org.springframework.messaging.Message<?> siMessage = requestChannel.receive(0);
    assertNotNull(siMessage.getHeaders().get("bar"));
    assertNotNull(siMessage.getHeaders().get("foo"));
    assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_ENCODING));
    assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CLUSTER_ID));
    assertNotNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
    assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}
Code example source: spring-projects/spring-integration
Object[] args = invocation.getArguments();
Message amqpReplyMessage = (Message) args[2];
MessageProperties properties = amqpReplyMessage.getMessageProperties();
assertEquals("bar", properties.getHeaders().get("bar"));
return null;
}).when(amqpTemplate).send(Mockito.any(String.class), Mockito.any(String.class),
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
ChannelAwareMessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
amqpProperties.setContentEncoding("test.contentEncoding");
amqpProperties.setHeader("foo", "foo");
amqpProperties.setHeader("bar", "bar");
Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
listener.onMessage(amqpMessage, null);
Code example source: spring-projects/spring-integration
@Test
public void adapterWithContentType() throws Exception {
    RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
    template.setDefaultReceiveQueue(this.queue.getName());
    while (template.receive() != null) {
        // drain
    }
    Message<?> message = MessageBuilder.withPayload("hello")
            .setHeader(AmqpHeaders.CONTENT_TYPE, "application/json")
            .build();
    this.ctRequestChannel.send(message);
    org.springframework.amqp.core.Message m = receive(template);
    assertNotNull(m);
    assertEquals("\"hello\"", new String(m.getBody(), "UTF-8"));
    assertEquals("application/json", m.getMessageProperties().getContentType());
    assertEquals("java.lang.String",
            m.getMessageProperties().getHeaders().get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "")));
    message = MessageBuilder.withPayload("hello")
            .build();
    this.ctRequestChannel.send(message);
    m = receive(template);
    assertNotNull(m);
    assertEquals("hello", new String(m.getBody(), "UTF-8"));
    assertEquals("text/plain", m.getMessageProperties().getContentType());
    while (template.receive() != null) {
        // drain
    }
}
Code example source: spring-projects/spring-amqp
@Test
public void messagingMessageReturned() {
    Message message = org.springframework.amqp.core.MessageBuilder.withBody("\"messaging\"".getBytes())
            .andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build()).build();
    message = this.rabbitTemplate.sendAndReceive("test.messaging.message", message);
    assertThat(message, is(notNullValue()));
    assertThat(new String(message.getBody()), equalTo("{\"field\":\"MESSAGING\"}"));
    assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
}
Code example source: openzipkin/brave
/**
 * MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
 * Message)}
 */
@Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
    Message message = (Message) methodInvocation.getArguments()[1];
    TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearHeaders(message);
    // named for BlockingQueueConsumer.nextMessage, which we can't currently see
    Span consumerSpan = tracer.nextSpan(extracted);
    Span listenerSpan = tracer.newChild(consumerSpan.context());
    if (!consumerSpan.isNoop()) {
        setConsumerSpan(consumerSpan, message.getMessageProperties());
        // incur timestamp overhead only once
        long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
        consumerSpan.start(timestamp);
        long consumerFinish = timestamp + 1L; // save a clock reading
        consumerSpan.finish(consumerFinish);
        // not using scoped span as we want to start with a pre-configured time
        listenerSpan.name("on-message").start(consumerFinish);
    }
    try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
        return methodInvocation.proceed();
    } catch (Throwable t) {
        listenerSpan.error(t);
        throw t;
    } finally {
        listenerSpan.finish();
    }
}
Code example source: spring-projects/spring-integration
if (reply != null) {
Address replyTo;
String replyToProperty = message.getMessageProperties().getReplyTo();
if (replyToProperty != null) {
replyTo = new Address(replyToProperty);
MessageProperties messageProperties = message1.getMessageProperties();
String contentEncoding = messageProperties.getContentEncoding();
long contentLength = messageProperties.getContentLength();
String contentType = messageProperties.getContentType();
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
Code example source: macrozheng/mall
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // Set the delay, in milliseconds, as the message expiration
        message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
        return message;
    }
});
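The post-processor above passes the delay to `setExpiration` as a String holding a number of milliseconds. As a small sketch of producing that string from a typed duration, the helper below converts a `java.time.Duration`; `ExpirationFormat` and `toExpiration` are hypothetical names introduced for illustration and are not part of Spring AMQP.

```java
import java.time.Duration;

final class ExpirationFormat {

    // Converts a duration to a millisecond count rendered as a decimal string,
    // the shape the post-processor above passes to setExpiration.
    static String toExpiration(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        return Long.toString(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toExpiration(Duration.ofSeconds(5))); // prints 5000
    }
}
```

Under that assumption, the call in the example could be written as `setExpiration(ExpirationFormat.toExpiration(Duration.ofMillis(delayTimes)))`, which keeps the unit visible at the call site.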
Code example source: spring-projects/spring-amqp
    @SuppressWarnings("unused")
    public Message handleMessage(Message message) {
        replyToWas.set(message.getMessageProperties().getReplyTo());
        return new Message(new String(message.getBody()).toUpperCase().getBytes(),
                message.getMessageProperties());
    }
});
Code example source: spring-projects/spring-integration
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
    Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
    Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
            .toHeadersFromRequest(message.getMessageProperties());
    if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
            == AcknowledgeMode.MANUAL) {
        headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
        headers.put(AmqpHeaders.CHANNEL, channel);
    }
    if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
        headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
    }
    final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
            .withPayload(payload)
            .copyHeaders(headers)
            .build();
    return messagingMessage;
}
Code example source: spring-projects/spring-amqp
@Test
public void testInferredTypeInfo() {
    byte[] bytes = "{\"name\" : \"foo\" }".getBytes();
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/json");
    messageProperties.setInferredArgumentType(Foo.class);
    Message message = new Message(bytes, messageProperties);
    Object foo = this.converter.fromMessage(message);
    assertThat(foo, instanceOf(Foo.class));
}
Code example source: spring-projects/spring-amqp
@Test
public void testConvert4Args() throws Exception {
    ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive("", this.requests.getName(), "foo",
            message -> {
                String body = new String(message.getBody());
                return new Message((body + "bar").getBytes(), message.getMessageProperties());
            });
    checkConverterResult(future, "FOOBAR");
}
Code example source: prontera/spring-cloud-rest-tcc
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    final String failedMessage = new String(message.getBody(), Charsets.UTF_8);
    try {
        final String guid = Jacksons.getMapper().readTree(failedMessage).get("guid").asText();
        final EventPublisher publisher = new EventPublisher();
        publisher.setGuid(guid);
        if (EventStatus.NO_ROUTE.name().equalsIgnoreCase(replyText)) {
            publisher.setEventStatus(EventStatus.NO_ROUTE);
        } else {
            logReturnedFault(replyCode, replyText, exchange, routingKey, failedMessage);
            publisher.setEventStatus(EventStatus.ERROR);
        }
        // Because basic.ack is invoked after basic.return, I believe the NO_ROUTE status
        // could be wrongly converted to NOT_FOUND, so there is no need to consider a race here
        publisherMapper.updateByGuidSelective(publisher);
    } catch (IOException e) {
        logReturnedFault(replyCode, replyText, exchange, routingKey, failedMessage);
    }
}
Code example source: spring-projects/spring-amqp
@Test
public void messageToString() {
    SimpleMessageConverter converter = new SimpleMessageConverter();
    Message message = new Message("test".getBytes(), new MessageProperties());
    message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    Object result = converter.fromMessage(message);
    assertEquals(String.class, result.getClass());
    assertEquals("test", result);
}
Code example source: openzipkin/brave
TraceContextOrSamplingFlags extractAndClearHeaders(Message message) {
    MessageProperties messageProperties = message.getMessageProperties();
    TraceContextOrSamplingFlags extracted = extractor.extract(messageProperties);
    Map<String, Object> headers = messageProperties.getHeaders();
    clearHeaders(headers);
    return extracted;
}
Code example source: spring-projects/spring-amqp
@Test
public void testDefaultTypeConfig() {
    byte[] bytes = "<root><name>foo</name></root>".getBytes();
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/xml");
    Message message = new Message(bytes, messageProperties);
    Object foo = xmlConverterWithDefaultType.fromMessage(message);
    assertThat(foo).isInstanceOf(Foo.class);
}
Code example source: spring-projects/spring-amqp
@Test
public void testSimpleBatchTimeout() throws Exception {
    BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 50);
    BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
    template.setConnectionFactory(this.connectionFactory);
    MessageProperties props = new MessageProperties();
    Message message = new Message("foo".getBytes(), props);
    template.send("", ROUTE, message);
    message = receive(template);
    assertEquals("foo", new String(message.getBody()));
}