Usage and code examples for the org.springframework.amqp.core.Message class

x33g5p2x, reposted on 2022-01-25 under Other

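This article collects code examples for the Java class org.springframework.amqp.core.Message and shows how the class is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Message class:
Package: org.springframework.amqp.core
Class name: Message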

About Message

The 0-8 and 0-9-1 AMQP specifications do not define a Message class or interface. Instead, when performing an operation such as basicPublish, the content is passed as a byte-array argument and additional properties are passed in as separate arguments. Spring AMQP defines a Message class as part of a more general AMQP domain model representation. The purpose of the Message class is simply to encapsulate the body and properties within a single instance so that the rest of the AMQP API can in turn be simpler.
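The encapsulation idea described above can be illustrated with a small JDK-only sketch. This is not Spring AMQP code: Envelope, EnvelopeSketch, and both describe methods are hypothetical names invented for this illustration, and a plain Map stands in for the real MessageProperties. It only contrasts passing body and properties as separate arguments with passing one object that holds both.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; NOT the Spring AMQP Message class.
public class EnvelopeSketch {

    /** Bundles a payload and its properties in a single object. */
    static final class Envelope {
        private final byte[] body;
        private final Map<String, Object> properties;

        Envelope(byte[] body, Map<String, Object> properties) {
            this.body = body;
            this.properties = properties;
        }

        byte[] getBody() { return body; }
        Map<String, Object> getProperties() { return properties; }
    }

    // Low-level style: the body and each property travel as separate arguments.
    static String describe(byte[] body, String contentType, String appId) {
        return appId + " sent " + body.length + " bytes of " + contentType;
    }

    // Encapsulated style: a single argument carries the body and all properties.
    static String describe(Envelope envelope) {
        return describe(envelope.getBody(),
                (String) envelope.getProperties().get("contentType"),
                (String) envelope.getProperties().get("appId"));
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("contentType", "text/plain");
        props.put("appId", "demo");
        Envelope envelope = new Envelope("hello".getBytes(StandardCharsets.UTF_8), props);
        System.out.println(describe(envelope)); // prints "demo sent 5 bytes of text/plain"
    }
}
```

In Spring AMQP itself the same role is played by new Message(byte[] body, MessageProperties properties) together with getBody() and getMessageProperties(), as the examples below show.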

Code examples

Example source: qiurunze123/miaosha

@RabbitListener(queues = MQConfig.MIAOSHATEST)
public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
  log.info("Received message: {}", message);
  // Decode the raw AMQP body as UTF-8 text
  String messRegister = new String(message.getBody(), "UTF-8");
  // multiple=true acknowledges every unacknowledged delivery up to and including this tag
  channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  MiaoShaMessageVo msm = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
  messageService.insertMs(msm);
}

Example source: FlowCI/flow-platform

public PriorityMessage(Message message) {
  super(message.getBody(), message.getMessageProperties());
  this.timestamp = System.nanoTime();
}

Example source: spring-projects/spring-amqp

@Test
public void stringToMessage() throws Exception {
  SimpleMessageConverter converter = new SimpleMessageConverter();
  Message message = converter.toMessage("test", new MessageProperties());
  String contentType = message.getMessageProperties().getContentType();
  String content = new String(message.getBody(),
      message.getMessageProperties().getContentEncoding());
  assertEquals("text/plain", contentType);
  assertEquals("test", content);
}

Example source: spring-projects/spring-integration

Object[] args = invocation.getArguments();
  org.springframework.amqp.core.Message amqpRequestMessage = (org.springframework.amqp.core.Message) args[2];
  MessageProperties properties = amqpRequestMessage.getMessageProperties();
  assertNull(properties.getHeaders().get("foo"));
  MessageProperties amqpProperties = new MessageProperties();
  amqpProperties.setAppId("test.appId");
  amqpProperties.setHeader("foobar", "foobar");
  amqpProperties.setHeader("bar", "bar");
  return new org.springframework.amqp.core.Message("hello".getBytes(), amqpProperties);
})
    .when(amqpTemplate).sendAndReceive(Mockito.any(String.class), Mockito.any(String.class),

Example source: spring-projects/spring-integration

@Test
public void withHeaderMapperDefaultMapping() throws Exception {
  AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperDefaultMapping",
      AmqpInboundChannelAdapter.class);
  AbstractMessageListenerContainer mlc =
      TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
  ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
      ChannelAwareMessageListener.class);
  MessageProperties amqpProperties = new MessageProperties();
  amqpProperties.setAppId("test.appId");
  amqpProperties.setClusterId("test.clusterId");
  amqpProperties.setContentEncoding("test.contentEncoding");
  amqpProperties.setContentLength(99L);
  amqpProperties.setContentType("test.contentType");
  amqpProperties.setHeader("foo", "foo");
  amqpProperties.setHeader("bar", "bar");
  Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
  listener.onMessage(amqpMessage, null);
  QueueChannel requestChannel = context.getBean("requestChannel", QueueChannel.class);
  org.springframework.messaging.Message<?> siMessage = requestChannel.receive(0);
  assertNotNull(siMessage.getHeaders().get("bar"));
  assertNotNull(siMessage.getHeaders().get("foo"));
  assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_ENCODING));
  assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CLUSTER_ID));
  assertNotNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
  assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}

Example source: spring-projects/spring-integration

Object[] args = invocation.getArguments();
  Message amqpReplyMessage = (Message) args[2];
  MessageProperties properties = amqpReplyMessage.getMessageProperties();
  assertEquals("bar", properties.getHeaders().get("bar"));
  return null;
}).when(amqpTemplate).send(Mockito.any(String.class), Mockito.any(String.class),
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
    ChannelAwareMessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
amqpProperties.setContentEncoding("test.contentEncoding");
amqpProperties.setHeader("foo", "foo");
amqpProperties.setHeader("bar", "bar");
Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
listener.onMessage(amqpMessage, null);

Example source: spring-projects/spring-integration

@Test
public void adapterWithContentType() throws Exception {
  RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
  template.setDefaultReceiveQueue(this.queue.getName());
  while (template.receive() != null) {
    // drain
  }
  Message<?> message = MessageBuilder.withPayload("hello")
      .setHeader(AmqpHeaders.CONTENT_TYPE, "application/json")
      .build();
  this.ctRequestChannel.send(message);
  org.springframework.amqp.core.Message m = receive(template);
  assertNotNull(m);
  assertEquals("\"hello\"", new String(m.getBody(), "UTF-8"));
  assertEquals("application/json", m.getMessageProperties().getContentType());
  assertEquals("java.lang.String",
      m.getMessageProperties().getHeaders().get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "")));
  message = MessageBuilder.withPayload("hello")
      .build();
  this.ctRequestChannel.send(message);
  m = receive(template);
  assertNotNull(m);
  assertEquals("hello", new String(m.getBody(), "UTF-8"));
  assertEquals("text/plain", m.getMessageProperties().getContentType());
  while (template.receive() != null) {
    // drain
  }
}

Example source: spring-projects/spring-amqp

@Test
public void messagingMessageReturned() {
  Message message = org.springframework.amqp.core.MessageBuilder.withBody("\"messaging\"".getBytes())
    .andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build()).build();
  message = this.rabbitTemplate.sendAndReceive("test.messaging.message", message);
  assertThat(message, is(notNullValue()));
  assertThat(new String(message.getBody()), equalTo("{\"field\":\"MESSAGING\"}"));
  assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
}

Example source: openzipkin/brave

/**
 * MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
 * Message)}
 */
@Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
 Message message = (Message) methodInvocation.getArguments()[1];
 TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearHeaders(message);
 // named for BlockingQueueConsumer.nextMessage, which we can't currently see
 Span consumerSpan = tracer.nextSpan(extracted);
 Span listenerSpan = tracer.newChild(consumerSpan.context());
 if (!consumerSpan.isNoop()) {
  setConsumerSpan(consumerSpan, message.getMessageProperties());
  // incur timestamp overhead only once
  long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
  consumerSpan.start(timestamp);
  long consumerFinish = timestamp + 1L; // save a clock reading
  consumerSpan.finish(consumerFinish);
  // not using scoped span as we want to start with a pre-configured time
  listenerSpan.name("on-message").start(consumerFinish);
 }
 try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
  return methodInvocation.proceed();
 } catch (Throwable t) {
  listenerSpan.error(t);
  throw t;
 } finally {
  listenerSpan.finish();
 }
}

Example source: spring-projects/spring-integration

if (reply != null) {
  Address replyTo;
  String replyToProperty = message.getMessageProperties().getReplyTo();
  if (replyToProperty != null) {
    replyTo = new Address(replyToProperty);
        MessageProperties messageProperties = message1.getMessageProperties();
        String contentEncoding = messageProperties.getContentEncoding();
        long contentLength = messageProperties.getContentLength();
        String contentType = messageProperties.getContentType();
        AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),

Example source: macrozheng/mall

@Override
  public Message postProcessMessage(Message message) throws AmqpException {
    // Set the message expiration, used here as the delay, in milliseconds
    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
    return message;
  }
});

Example source: spring-projects/spring-amqp

@SuppressWarnings("unused")
  public Message handleMessage(Message message) {
    replyToWas.set(message.getMessageProperties().getReplyTo());
    return new Message(new String(message.getBody()).toUpperCase().getBytes(),
        message.getMessageProperties());
  }
});

Example source: spring-projects/spring-integration

private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
  Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
  Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
      .toHeadersFromRequest(message.getMessageProperties());
  if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
      == AcknowledgeMode.MANUAL) {
    headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
    headers.put(AmqpHeaders.CHANNEL, channel);
  }
  if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
    headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
  }
  final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
      .withPayload(payload)
      .copyHeaders(headers)
      .build();
  return messagingMessage;
}

Example source: spring-projects/spring-amqp

@Test
public void testInferredTypeInfo() {
  byte[] bytes = "{\"name\" : \"foo\" }".getBytes();
  MessageProperties messageProperties = new MessageProperties();
  messageProperties.setContentType("application/json");
  messageProperties.setInferredArgumentType(Foo.class);
  Message message = new Message(bytes, messageProperties);
  Object foo = this.converter.fromMessage(message);
  assertThat(foo, instanceOf(Foo.class));
}

Example source: spring-projects/spring-amqp

@Test
public void testConvert4Args() throws Exception {
  ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive("", this.requests.getName(), "foo",
      message -> {
        String body = new String(message.getBody());
        return new Message((body + "bar").getBytes(), message.getMessageProperties());
      });
  checkConverterResult(future, "FOOBAR");
}

Example source: prontera/spring-cloud-rest-tcc

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  final String failedMessage = new String(message.getBody(), Charsets.UTF_8);
  try {
    final String guid = Jacksons.getMapper().readTree(failedMessage).get("guid").asText();
    final EventPublisher publisher = new EventPublisher();
    publisher.setGuid(guid);
    if (EventStatus.NO_ROUTE.name().equalsIgnoreCase(replyText)) {
      publisher.setEventStatus(EventStatus.NO_ROUTE);
    } else {
      logReturnedFault(replyCode, replyText, exchange, routingKey, failedMessage);
      publisher.setEventStatus(EventStatus.ERROR);
    }
    // basic.ack is called after basic.return, so in my view the NO_ROUTE status could be wrongly converted to NOT_FOUND; race conditions therefore need not be considered here
    publisherMapper.updateByGuidSelective(publisher);
  } catch (IOException e) {
    logReturnedFault(replyCode, replyText, exchange, routingKey, failedMessage);
  }
}

Example source: spring-projects/spring-amqp

@Test
public void messageToString() {
  SimpleMessageConverter converter = new SimpleMessageConverter();
  Message message = new Message("test".getBytes(), new MessageProperties());
  message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
  Object result = converter.fromMessage(message);
  assertEquals(String.class, result.getClass());
  assertEquals("test", result);
}

Example source: openzipkin/brave

TraceContextOrSamplingFlags extractAndClearHeaders(Message message) {
 MessageProperties messageProperties = message.getMessageProperties();
 TraceContextOrSamplingFlags extracted = extractor.extract(messageProperties);
 Map<String, Object> headers = messageProperties.getHeaders();
 clearHeaders(headers);
 return extracted;
}

Example source: spring-projects/spring-amqp

@Test
public void testDefaultTypeConfig() {
  byte[] bytes = "<root><name>foo</name></root>".getBytes();
  MessageProperties messageProperties = new MessageProperties();
  messageProperties.setContentType("application/xml");
  Message message = new Message(bytes, messageProperties);
  Object foo = xmlConverterWithDefaultType.fromMessage(message);
  assertThat(foo).isInstanceOf(Foo.class);
}

Example source: spring-projects/spring-amqp

@Test
public void testSimpleBatchTimeout() throws Exception {
  BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 50);
  BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
  template.setConnectionFactory(this.connectionFactory);
  MessageProperties props = new MessageProperties();
  Message message = new Message("foo".getBytes(), props);
  template.send("", ROUTE, message);
  message = receive(template);
  assertEquals("foo", new String(message.getBody()));
}
