org.apache.qpid.proton.message.Message类的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(149)

本文整理了Java中org.apache.qpid.proton.message.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message

Message介绍

[英]Represents a Message within Proton. Create instances of Message using a MessageFactory implementation.
[中]表示 Proton 中的一条消息(Message)。请使用 MessageFactory 的实现来创建 Message 实例。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Connects to an AMQP peer on localhost:5672 and prints the body of every
 * message that arrives on {@code address}.
 * <p>
 * A failed connection attempt is reported on standard output and nothing
 * else is set up.
 */
@Override
 public void start() throws Exception {
  ProtonClient protonClient = ProtonClient.create(vertx);

  protonClient.connect("localhost", 5672, connectResult -> {
   if (connectResult.succeeded()) {
    ProtonConnection conn = connectResult.result();
    conn.open();

    conn.createReceiver(address).handler((delivery, msg) -> {
     // The body is expected to be an AmqpValue section wrapping a String.
     AmqpValue section = (AmqpValue) msg.getBody();
     String content = (String) section.getValue();
     System.out.println("Received message with content: " + content);

     // Unless another disposition has been applied, the receiver accepts
     // (and settles) the delivery automatically once this handler returns.
     // Call setAutoAccept on the receiver to manage dispositions manually.
    }).open();
   } else {
    System.out.println("Connect failed: " + connectResult.cause());
   }
  });
 }
}

代码示例来源:origin: org.apache.qpid/proton-j-impl

/**
 * Overwrites the address of the given message with the value held in
 * {@code _original} (a field declared outside this excerpt).
 *
 * @param m the message whose address is reset
 */
private void restoreMessage(Message m)
{
  m.setAddress(_original);
}

代码示例来源:origin: eclipse/hono

/**
 * Gets the content type of the AMQP 1.0 message.
 * <p>
 * Delegates directly to {@code getContentType()} on the {@code message}
 * field, which is declared outside this excerpt.
 *
 * @return The content type of the AMQP 1.0 message, exactly as reported by
 *         the wrapped message.
 */
String getMessageContentType() {
  return message.getContentType();
}

代码示例来源:origin: vert-x3/vertx-examples

// NOTE(review): this excerpt is truncated - the two `if` blocks opened below are
// never closed here, so the real nesting must be checked against the full source.
// Start from the remote target's address; when it is null, use the address
// carried by the message instead.
String address = remoteTarget.getAddress() ;
if (address == null) {
 address = msg.getAddress();
Section body = msg.getBody();
// Only AmqpValue bodies are printed; the wrapped value is cast to String
// without a prior type check.
if (body instanceof AmqpValue) {
 String content = (String) ((AmqpValue) body).getValue();
 System.out.println("Received message for address: " + address + ", body: " + content);

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Round-trips a message with a 32 MiB body through {@code AmqpMessageCoder}
 * and verifies that the decoded body matches the original.
 * <p>
 * The comparison result used to be discarded, so the test could never fail;
 * it is now enforced explicitly.
 *
 * @throws Exception if encoding or decoding the message fails
 */
@Test
 public void encodeDecodeLargeMessage() throws Exception {
  Message message = Message.Factory.create();
  message.setAddress("address");
  message.setSubject("subject");
  String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
  message.setBody(new AmqpValue(body));

  AmqpMessageCoder coder = AmqpMessageCoder.of();

  Message clone = CoderUtils.clone(coder, message);

  String expectedBody = message.getBody().toString();
  String actualBody = clone.getBody().toString();
  // Fail with the lengths only: putting two 32 MiB strings into the failure
  // message would make the report unreadable.
  if (!expectedBody.equals(actualBody)) {
   throw new AssertionError("Decoded body differs from the original (expected length "
     + expectedBody.length() + ", actual length " + actualBody.length() + ")");
  }
 }
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Sends a {@code QUERY} management request for the given entity type and
 * returns the {@code "results"} entry of the reply.
 *
 * @param client the request/response client used to send the query
 * @param entityType value placed in the {@code entityType} application property
 * @param attributeNames attribute names placed in the request body
 * @return the value stored under {@code "results"} in the response body map
 * @throws java.util.concurrent.TimeoutException if no response is returned by the client
 * @throws Exception if the request fails for any other reason
 */
private static List<List<String>> collectRouter(SyncRequestClient client, String entityType, List<String> attributeNames) throws Exception {
    Map<String, Object> properties = new LinkedHashMap<>();
    properties.put("operation", "QUERY");
    properties.put("entityType", entityType);
    Map<String, Object> body = new LinkedHashMap<>();

    body.put("attributeNames", attributeNames);

    Message message = Proton.message();
    message.setApplicationProperties(new ApplicationProperties(properties));
    message.setBody(new AmqpValue(body));

    Message response = client.request(message, 10, TimeUnit.SECONDS);
    // A null response presumably means the request timed out (verify against
    // SyncRequestClient); report it clearly instead of failing with a
    // NullPointerException on the next line.
    if (response == null) {
        throw new java.util.concurrent.TimeoutException(
                "No response received within 10 seconds for QUERY on entity type " + entityType);
    }
    AmqpValue value = (AmqpValue) response.getBody();
    Map<?,?> values = (Map<?,?>) value.getValue();

    @SuppressWarnings("unchecked")
    List<List<String>> results = (List<List<String>>) values.get("results");
    return results;
  }
}

代码示例来源:origin: Azure/azure-event-hubs-java

/**
 * Converts this instance to an AMQP message and records the partition key
 * in its message annotations.
 * <p>
 * When the converted message carries no annotations yet, a new annotation
 * section backed by an empty map is created first.
 *
 * @param partitionKey value stored under {@code AmqpConstants.PARTITION_KEY}
 * @return the converted message with the partition key annotation applied
 */
Message toAmqpMessage(final String partitionKey) {
  final Message amqpMessage = this.toAmqpMessage();

  MessageAnnotations annotations = amqpMessage.getMessageAnnotations();
  if (annotations == null) {
    annotations = new MessageAnnotations(new HashMap<>());
  }

  annotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
  amqpMessage.setMessageAnnotations(annotations);
  return amqpMessage;
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Round-trips a small message through {@code AmqpMessageCoder} and checks
 * that body, address and subject survive the clone.
 *
 * @throws Exception if encoding or decoding the message fails
 */
@Test
public void encodeDecode() throws Exception {
 // Message under test: value body plus address and subject.
 Message original = Message.Factory.create();
 original.setBody(new AmqpValue("body"));
 original.setAddress("address");
 original.setSubject("test");

 AmqpMessageCoder messageCoder = AmqpMessageCoder.of();
 Message decoded = CoderUtils.clone(messageCoder, original);

 assertEquals("AmqpValue{body}", decoded.getBody().toString());
 assertEquals("address", decoded.getAddress());
 assertEquals("test", decoded.getSubject());
}

代码示例来源:origin: Azure/azure-service-bus-java

/**
 * Reads the error condition from the application properties of a response
 * message.
 * <p>
 * The value stored under {@code ClientConstants.REQUEST_RESPONSE_ERROR_CONDITION}
 * is preferred; when it is absent, the value stored under
 * {@code ClientConstants.REQUEST_RESPONSE_LEGACY_ERROR_CONDITION} is returned.
 *
 * @param responseMessage the response to inspect
 * @return the error condition, or {@code null} when neither property is present
 */
public static Symbol getResponseErrorCondition(Message responseMessage)
{
  Symbol current = (Symbol)responseMessage.getApplicationProperties().getValue().get(ClientConstants.REQUEST_RESPONSE_ERROR_CONDITION);
  if(current != null)
  {
    return current;
  }
  // Fall back to the legacy property name.
  return (Symbol)responseMessage.getApplicationProperties().getValue().get(ClientConstants.REQUEST_RESPONSE_LEGACY_ERROR_CONDITION);
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Sends a request message and blocks until a reply is available or the
 * timeout elapses.
 * <p>
 * The message's application properties are replaced by a fresh section
 * holding a copy of the existing entries (or an empty map when there were
 * none), and {@code replyTo} is filled in when the message does not specify
 * one. The send itself is scheduled via {@code context.runOnContext}.
 *
 * @param message the request to send; it is modified as described above
 * @param timeout how long to wait for a reply
 * @param timeUnit unit of {@code timeout}
 * @return whatever {@code replies.poll} yields - presumably {@code null} when
 *         no reply arrives in time (confirm against the type of {@code replies})
 * @throws RuntimeException wrapping the {@link InterruptedException} if the
 *         waiting thread is interrupted; the interrupt flag is restored first
 */
public Message request(Message message, long timeout, TimeUnit timeUnit) {
  Map<String, Object> properties = new HashMap<>();
  if (message.getApplicationProperties() != null) {
    properties.putAll(message.getApplicationProperties().getValue());
  }
  message.setApplicationProperties(new ApplicationProperties(properties));
  if (message.getReplyTo() == null) {
    message.setReplyTo(replyTo);
  }
  context.runOnContext(h -> sender.send(message));
  try {
    return replies.poll(timeout, timeUnit);
  } catch (InterruptedException e) {
    // Restore the interrupt status so callers further up the stack can still
    // observe the interruption after it is wrapped in an unchecked exception.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}

代码示例来源:origin: org.apache.qpid/proton-hawtdispatch

/**
 * Creates a new message whose body is an {@code AmqpValue} section wrapping
 * the given text.
 *
 * @param value the text to place in the body
 * @return the newly created message
 */
public Message createTextMessage(String value) {
  Message textMessage = Message.Factory.create();
  textMessage.setBody(new AmqpValue(value));
  return textMessage;
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

/**
 * Builds an AMQP message from a Kafka record: the record value becomes a
 * {@code Data} body and the record coordinates become message annotations.
 *
 * @param address address set on the resulting message
 * @param record the Kafka record to convert
 * @return the newly created AMQP message
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {

  Message amqpMessage = Proton.message();
  amqpMessage.setAddress(address);

  // Partition, offset, key and topic travel as message annotations. The key
  // is stored as returned by the record, without a null check.
  Map<Symbol, Object> annotations = new HashMap<>();
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  amqpMessage.setMessageAnnotations(new MessageAnnotations(annotations));

  // The raw record value is wrapped as binary payload.
  Binary payload = new Binary(record.value());
  amqpMessage.setBody(new Data(payload));

  return amqpMessage;
}

代码示例来源:origin: org.apache.qpid/proton-hawtdispatch

/**
 * Creates a new message whose body is a {@code Data} section built from a
 * slice of the given byte array.
 *
 * @param value the array holding the payload
 * @param offset offset passed to the {@code Binary} constructor
 * @param len length passed to the {@code Binary} constructor
 * @return the newly created message
 */
public Message createBinaryMessage(byte[] value, int offset, int len) {
    Message binaryMessage = Message.Factory.create();
    Binary slice = new Binary(value, offset, len);
    binaryMessage.setBody(new Data(slice));
    return binaryMessage;
  }
}

代码示例来源:origin: Azure/azure-service-bus-java

/**
 * Creates a request message with an {@code AmqpValue} body and the
 * application properties describing the operation.
 * <p>
 * The operation name and the timeout in milliseconds are always set; the
 * associated link name is added only when it is neither null nor empty.
 *
 * @param operation value stored under {@code ClientConstants.REQUEST_RESPONSE_OPERATION_NAME}
 * @param valueBody object wrapped in the {@code AmqpValue} body
 * @param timeout timeout whose millisecond value is stored in the properties
 * @param associatedLinkName optional link name
 * @return the newly created request message
 */
private static Message createRequestMessageFromValueBody(String operation, Object valueBody, Duration timeout, String associatedLinkName)
{
  // NOTE(review): raw HashMap kept as in the original; the key and value types
  // of the ClientConstants entries are not visible from this excerpt.
  HashMap requestProperties = new HashMap();
  requestProperties.put(ClientConstants.REQUEST_RESPONSE_OPERATION_NAME, operation);
  requestProperties.put(ClientConstants.REQUEST_RESPONSE_TIMEOUT, timeout.toMillis());
  boolean hasLinkName = !StringUtil.isNullOrEmpty(associatedLinkName);
  if(hasLinkName)
  {
    requestProperties.put(ClientConstants.REQUEST_RESPONSE_ASSOCIATED_LINK_NAME, associatedLinkName);
  }

  Message request = Message.Factory.create();
  request.setBody(new AmqpValue(valueBody));
  request.setApplicationProperties(new ApplicationProperties(requestProperties));
  return request;
}

代码示例来源:origin: eclipse/hono

/**
 * Set the payload of the message using a {@link Data} section.
 * <p>
 * Content type and payload are handled independently of each other: the
 * content type is set whenever {@code contentType} is not {@code null}, and
 * the body is set whenever {@code payload} is not {@code null}. A
 * {@code null} payload therefore leaves the body untouched but does not
 * prevent the content type from being set.
 * </p>
 *
 * @param message The message to update.
 * @param contentType An optional content type.
 * @param payload The optional message payload.
 *
 * @throws NullPointerException If the parameter {@code message} was {@code null}.
 */
public static void setPayload(final Message message, final String contentType, final byte[] payload) {
  Objects.requireNonNull(message);
  if (contentType != null) {
    message.setContentType(contentType);
  }
  if (payload != null) {
    message.setBody(new Data(new Binary(payload)));
  }
}

代码示例来源:origin: io.vertx/vertx-amqp-bridge

/**
 * Verifies that a JSON object carrying a byte-array body marked as
 * {@code BODY_TYPE_DATA} is converted to an AMQP message whose body is a
 * {@code Data} section holding the same bytes.
 */
@Test
public void testJSON_to_AMQP_VerifyDataBody() {
 String testContent = "myTestContent";
 JsonObject jsonObject = new JsonObject();
 jsonObject.put(AmqpConstants.BODY, testContent.getBytes(StandardCharsets.UTF_8));
 jsonObject.put(AmqpConstants.BODY_TYPE, AmqpConstants.BODY_TYPE_DATA);
 Message protonMsg = translator.convertToAmqpMessage(jsonObject);
 assertNotNull("Expected converted msg", protonMsg);
 Section body = protonMsg.getBody();
 // Check for a missing body before checking its type: after the instanceof
 // assertion the null check could never fail, and a missing body would be
 // reported as a wrong type.
 assertNotNull("Unexpected body content", body);
 assertTrue("Unexpected body type", body instanceof Data);
 assertEquals("Unexpected message body value", new Binary(testContent.getBytes(StandardCharsets.UTF_8)),
   ((Data) body).getValue());
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Tells whether the message annotations contain the {@code replicated} key
 * (declared outside this excerpt).
 *
 * @param message the message to inspect
 * @return {@code true} if annotations are present and contain the key,
 *         {@code false} otherwise
 */
private static boolean isMessageReplicated(Message message) {
    MessageAnnotations messageAnnotations = message.getMessageAnnotations();
    if (messageAnnotations == null) {
        return false;
    }
    return messageAnnotations.getValue().containsKey(replicated);
  }
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Wraps each string in an AMQP message addressed to {@code address} and
 * hands the resulting list to the message-based {@code sendMessages}
 * overload.
 *
 * @param address target address, also set on every created message
 * @param messages the texts to send, each used as an {@code AmqpValue} body
 * @param predicate passed through unchanged to the delegate overload
 * @return the future returned by the delegate overload
 */
public Future<Integer> sendMessages(String address, List<String> messages, Predicate<Message> predicate) {
  // Keep the explicitly typed local: it pins which overload is selected below.
  List<Message> amqpMessages = messages.stream()
      .map(text -> {
        Message wrapped = Message.Factory.create();
        wrapped.setBody(new AmqpValue(text));
        wrapped.setAddress(address);
        return wrapped;
      })
      .collect(Collectors.toList());

  return sendMessages(address, amqpMessages, predicate);
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

/**
 * Rebuilds an AMQP message from the raw encoded bytes stored as the value
 * of a Kafka record and attaches the record coordinates as message
 * annotations.
 *
 * @param address address set on the message before decoding
 * @param record the Kafka record whose value holds the encoded message
 * @return the decoded message with the new annotations applied
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {

  Message amqpMessage = Proton.message();
  amqpMessage.setAddress(address);

  // NOTE(review): the address is set before decode(); whether decode() keeps
  // or replaces it depends on the proton-j implementation - confirm.
  byte[] encoded = record.value();
  amqpMessage.decode(encoded, 0, encoded.length);

  // Partition, offset, key and topic travel as message annotations. The key
  // is stored as returned by the record, without a null check.
  Map<Symbol, Object> annotations = new HashMap<>();
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  annotations.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  amqpMessage.setMessageAnnotations(new MessageAnnotations(annotations));

  return amqpMessage;
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

// Pick the Kafka topic: kafkaTopic when the message has no address, otherwise
// the message address with every '/' replaced by '.'.
String topic = (message.getAddress() == null) ?
    kafkaTopic :
    message.getAddress().replace('/', '.');
// Encode the message into buffer and copy out the first `encoded` bytes
// (presumably the number of bytes written - confirm against Message.encode).
int encoded = message.encode(buffer, 0, AmqpRawMessageConverter.BUFFER_SIZE);
value = Arrays.copyOfRange(buffer, 0, encoded);
// NOTE(review): messageAnnotations is dereferenced below without a visible
// null check; the extra indentation suggests a guard was elided from this
// excerpt - confirm against the full source.
MessageAnnotations messageAnnotations = message.getMessageAnnotations();
  partition = messageAnnotations.getValue().get(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_ANNOTATION));
  key = messageAnnotations.getValue().get(Symbol.getSymbol(AmqpBridge.AMQP_KEY_ANNOTATION));

相关文章

微信公众号

最新文章

更多