org.apache.activemq.artemis.api.core.Message.copy()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(129)

本文整理了Java中org.apache.activemq.artemis.api.core.Message.copy()方法的一些代码示例,展示了Message.copy()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.copy()方法的具体详情如下:
包路径:org.apache.activemq.artemis.api.core.Message
类名称:Message
方法名:copy

Message.copy介绍

[英]It will generate a new instance of the message encode, being a deep copy, new properties, new everything
[中]它将生成消息编码的一个新实例,即深度副本、新属性和新的一切

代码示例

代码示例来源:origin: apache/activemq-artemis

Message msg = ref.getMessage().copy();

代码示例来源:origin: apache/activemq-artemis

@Override
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
 // We make a copy of the message, then we strip out the unwanted routing id headers and leave
 // only
 // the one pertinent for the address node - this is important since different queues on different
 // nodes could have same queue ids
 // Note we must copy since same message may get routed to other nodes which require different headers
 Message messageCopy = message.copy();
 if (logger.isTraceEnabled()) {
   logger.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
 }
 // TODO - we can optimise this
 Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
 byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
 if (queueIds == null) {
   // Sanity check only
   ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
   throw new IllegalStateException("no queueIDs defined");
 }
 for (SimpleString propName : propNames) {
   if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
    messageCopy.removeProperty(propName);
   }
 }
 messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
 messageCopy = super.beforeForward(messageCopy, forwardingAddress);
 return messageCopy;
}

代码示例来源:origin: apache/activemq-artemis

originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
Message message = originalMessage.copy();

代码示例来源:origin: apache/activemq-artemis

private Message makeCopy(final MessageReference ref,
            final boolean expiry,
            final boolean copyOriginalHeaders) throws Exception {
 if (ref == null) {
   ActiveMQServerLogger.LOGGER.nullRefMessage();
   throw new ActiveMQNullRefException("Reference to message is null");
 }
 Message message = ref.getMessage();
 /*
  We copy the message and send that to the dla/expiry queue - this is
  because otherwise we may end up with a ref with the same message id in the
  queue more than once which would barf - this might happen if the same message had been
  expire from multiple subscriptions of a topic for example
  We set headers that hold the original message address, expiry time
  and original message id
 */
 long newID = storageManager.generateID();
 Message copy = message.copy(newID);
 if (copyOriginalHeaders) {
   copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
 }
 copy.setExpiration(0);
 if (expiry) {
   copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
 }
 copy.reencode();
 return copy;
}

代码示例来源:origin: apache/activemq-artemis

Message message = messageRef.getMessage().copy();

代码示例来源:origin: apache/activemq-artemis

Message copyRedistribute = message.copy(storageManager.generateID());
copyRedistribute.setAddress(originatingQueue.getAddress());

代码示例来源:origin: apache/activemq-artemis

copy = message.copy(id);

代码示例来源:origin: apache/activemq-artemis

/**
* FIXME
* Retained messages should be handled in the core API.  There is currently no support for retained messages
* at the time of writing.  Instead we handle retained messages here.  This method will create a new queue for
* every address that is used to store retained messages.  THere should only ever be one message in the retained
* message queue.  When a new subscription is created the queue should be browsed and the message copied onto
* the subscription queue for the consumer.  When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
 SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
 Queue queue = session.getServer().locateQueue(retainAddress);
 if (queue == null) {
   queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
 }
 try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
   synchronized (queue) {
    if (iterator.hasNext()) {
      MessageReference ref = iterator.next();
      iterator.remove();
      queue.acknowledge(tx, ref);
    }
    if (!reset) {
      sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
    }
   }
 }
}

代码示例来源:origin: org.apache.activemq/artemis-mqtt-protocol

/**
* FIXME
* Retained messages should be handled in the core API.  There is currently no support for retained messages
* at the time of writing.  Instead we handle retained messages here.  This method will create a new queue for
* every address that is used to store retained messages.  THere should only ever be one message in the retained
* message queue.  When a new subscription is created the queue should be browsed and the message copied onto
* the subscription queue for the consumer.  When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
 SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
 Queue queue = session.getServer().locateQueue(retainAddress);
 if (queue == null) {
   queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
 }
 try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
   synchronized (queue) {
    if (iterator.hasNext()) {
      MessageReference ref = iterator.next();
      iterator.remove();
      queue.acknowledge(tx, ref);
    }
    if (!reset) {
      sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
    }
   }
 }
}

代码示例来源:origin: apache/activemq-artemis

final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);

代码示例来源:origin: apache/activemq-artemis

void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
 // The address filter that matches all retained message queues.
 String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
 BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
 // Iterate over all matching retain queues and add the queue
 Transaction tx = session.getServerSession().newTransaction();
 try {
   synchronized (queue) {
    for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
      Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
      try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
       if (i.hasNext()) {
         Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
         sendToQueue(message, queue, tx);
       }
      }
    }
   }
 } catch (Throwable t) {
   tx.rollback();
   throw t;
 }
 tx.commit();
}

代码示例来源:origin: org.apache.activemq/artemis-mqtt-protocol

void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
 // The address filter that matches all retained message queues.
 String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
 BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
 // Iterate over all matching retain queues and add the queue
 Transaction tx = session.getServerSession().newTransaction();
 try {
   synchronized (queue) {
    for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
      Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
      try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
       if (i.hasNext()) {
         Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
         sendToQueue(message, queue, tx);
       }
      }
    }
   }
 } catch (Throwable t) {
   tx.rollback();
   throw t;
 }
 tx.commit();
}

相关文章

微信公众号

最新文章

更多