org.apache.qpid.proton.message.Message.decode()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(108)

本文整理了Java中org.apache.qpid.proton.message.Message.decode()方法的一些代码示例,展示了Message.decode()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.decode()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:decode

Message.decode介绍

[英]Decodes the Message from the given ReadableBuffer.

If the buffer given does not contain the fully encoded Message bytes for decode this method will throw an exception to indicate the buffer underflow condition and the message object will be left in an undefined state.
[中]解码来自给定ReadableBuffer的消息。
如果给定的缓冲区不包含用于解码的完全编码的消息字节,此方法将引发异常,以指示缓冲区下溢的情况,消息对象将处于未定义状态。

代码示例

代码示例来源:origin: org.apache.qpid/proton-hawtdispatch

static Message decode(Buffer buffer) {
  Message msg = Message.Factory.create();
  int offset = buffer.offset;
  int len = buffer.length;
  while( len > 0 ) {
    int decoded = msg.decode(buffer.data, offset, len);
    assert decoded > 0: "Make progress decoding the message";
    offset += decoded;
    len -= decoded;
  }
  return msg;
}

代码示例来源:origin: org.apache.qpid/qpid-jms-client

/**
 * Given a byte buffer that represents an encoded AMQP Message instance,
 * decode and return the Message.
 *
 * @param encodedBytes
 *      the bytes that represent an encoded AMQP Message.
 *
 * @return a new Message instance with the decoded data.
 */
public static Message decodeMessage(ByteBuf encodedBytes) {
  // For now we must fully decode the message to get at the annotations.
  Message protonMessage = Message.Factory.create();
  protonMessage.decode(encodedBytes.array(), 0, encodedBytes.readableBytes());
  return protonMessage;
}

代码示例来源:origin: apache/qpid-jms

/**
 * Given a byte buffer that represents an encoded AMQP Message instance,
 * decode and return the Message.
 *
 * @param encodedBytes
 *      the bytes that represent an encoded AMQP Message.
 *
 * @return a new Message instance with the decoded data.
 */
public static Message decodeMessage(ByteBuf encodedBytes) {
  // For now we must fully decode the message to get at the annotations.
  Message protonMessage = Message.Factory.create();
  protonMessage.decode(encodedBytes.array(), 0, encodedBytes.readableBytes());
  return protonMessage;
}

代码示例来源:origin: apache/activemq-artemis

protected Message decodeIncomingMessage(Delivery incoming) {
 int count;
 byte[] chunk = new byte[2048];
 ByteArrayOutputStream stream = new ByteArrayOutputStream();
 while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) {
   stream.write(chunk, 0, count);
 }
 byte[] messageBytes = stream.toByteArray();
 try {
   Message protonMessage = Message.Factory.create();
   protonMessage.decode(messageBytes, 0, messageBytes.length);
   return protonMessage;
 } finally {
   try {
    stream.close();
   } catch (IOException e) {
   }
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

@Override
 public Message decode(InputStream inStream) throws CoderException, IOException {
  Message message = Message.Factory.create();
  int bytesToRead = VarInt.decodeInt(inStream);
  byte[] encodedMessage = new byte[bytesToRead];
  ByteStreams.readFully(inStream, encodedMessage);
  message.decode(encodedMessage, 0, encodedMessage.length);
  return message;
 }
}

代码示例来源:origin: Azure/azure-service-bus-java

static Message readMessageFromDelivery(Receiver receiveLink, Delivery delivery)
{
  int msgSize = delivery.pending();
  byte[] buffer = new byte[msgSize];
  
  int read = receiveLink.recv(buffer, 0, msgSize);
  
  Message message = Proton.message();
  message.decode(buffer, 0, read);
  return message;	    
}

代码示例来源:origin: org.apache.activemq/activemq-all

public Message decode() throws Exception {
  Message amqp = Message.Factory.create();
  int offset = getArrayOffset();
  int len = getLength();
  while (len > 0) {
    final int decoded = amqp.decode(getArray(), offset, len);
    assert decoded > 0 : "Make progress decoding the message";
    offset += decoded;
    len -= decoded;
  }
  return amqp;
}

代码示例来源:origin: org.apache.activemq/activemq-osgi

public Message decode() throws Exception {
  Message amqp = Message.Factory.create();
  int offset = getArrayOffset();
  int len = getLength();
  while (len > 0) {
    final int decoded = amqp.decode(getArray(), offset, len);
    assert decoded > 0 : "Make progress decoding the message";
    offset += decoded;
    len -= decoded;
  }
  return amqp;
}

代码示例来源:origin: org.apache.qpid/proton-jms

public Message decode() throws Exception {
  Message amqp = Message.Factory.create();
  int offset = getArrayOffset();
  int len = getLength();
  while( len > 0 ) {
    final int decoded = amqp.decode(getArray(), offset, len);
    assert decoded > 0: "Make progress decoding the message";
    offset += decoded;
    len -= decoded;
  }
  return amqp;
}

代码示例来源:origin: Azure/azure-event-hubs-java

@Override
public void onReceiveComplete(Delivery delivery) {
  final Message response = Proton.message();
  final int msgSize = delivery.pending();
  final byte[] buffer = new byte[msgSize];
  final int read = receiveLink.recv(buffer, 0, msgSize);
  response.decode(buffer, 0, read);
  delivery.settle();
  final OperationResult<Message, Exception> responseCallback = inflightRequests.remove(response.getCorrelationId());
  if (responseCallback != null)
    responseCallback.onComplete(response);
}

代码示例来源:origin: Azure/azure-service-bus-java

peekedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
peekedMessages.add(peekedMessage);

代码示例来源:origin: strimzi/strimzi-kafka-bridge

@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
  
  Message message = Proton.message();
  message.setAddress(address);
  
  message.decode(record.value(), 0, record.value().length);
  
  // put message annotations about partition, offset and key (if not null)
  Map<Symbol, Object> map = new HashMap<>();
  map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  
  MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  message.setMessageAnnotations(messageAnnotations);
  
  return message;
}

代码示例来源:origin: Azure/azure-event-hubs-java

@Override
public void onReceiveComplete(Delivery delivery) {
  int msgSize = delivery.pending();
  byte[] buffer = new byte[msgSize];
  int read = receiveLink.recv(buffer, 0, msgSize);
  Message message = Proton.message();
  message.decode(buffer, 0, read);
  delivery.settle();
  this.prefetchedMessages.add(message);
  this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
  this.receiveWork.onEvent();
}

代码示例来源:origin: Azure/azure-service-bus-java

receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
UUID lockToken = ClientConstants.ZEROLOCKTOKEN;
if(((Map)message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN))

代码示例来源:origin: org.apache.qpid/proton

public Message get()
{
  for (Connector c : _driver.connectors())
  {
    Connection connection = c.getConnection();
    _logger.log(Level.FINE, "Attempting to get message from " + connection);
    Delivery delivery = connection.getWorkHead();
    while (delivery != null)
    {
      if (delivery.isReadable())
      {
        _logger.log(Level.FINE, "Readable delivery found: " + delivery);
        int size = read((Receiver) delivery.getLink());
        Message message = new MessageImpl();
        message.decode(_buffer, 0, size);
        _incoming.add(delivery);
        _distributed--;
        delivery.getLink().advance();
        return message;
      }
      else
      {
        _logger.log(Level.FINE, "Delivery not readable: " + delivery);
        delivery = delivery.getWorkNext();
      }
    }
  }
  return null;
}

代码示例来源:origin: org.apache.qpid/proton-j-impl

public Message get()
{
  StoreEntry entry = _incomingStore.get( null );
  if (entry != null)
  {
    Message message = Proton.message();
    message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
    _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
                      _incomingStore.trackEntry(entry));
    _incomingStore.freeEntry( entry );
    return message;
  }
  return null;
}

代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot

public Message get()
{
  StoreEntry entry = _incomingStore.get( null );
  if (entry != null)
  {
    Message message = Proton.message();
    message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
    _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
                      _incomingStore.trackEntry(entry));
    _incomingStore.freeEntry( entry );
    return message;
  }
  return null;
}

代码示例来源:origin: org.apache.activemq/activemq-all

final int decoded = message.decode(deliveryBytes.data, offset, len);
assert decoded > 0 : "Make progress decoding the message";
offset += decoded;

代码示例来源:origin: Azure/azure-iot-sdk-java

msg.decode(buffer, 0, read);

代码示例来源:origin: Azure/azure-iot-sdk-java

msg.decode(buffer, 0, read);

相关文章

微信公众号

最新文章

更多