org.apache.activemq.artemis.api.core.Message.getMessageID()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(186)

本文整理了Java中org.apache.activemq.artemis.api.core.Message.getMessageID()方法的一些代码示例,展示了Message.getMessageID()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getMessageID()方法的具体详情如下:
包路径:org.apache.activemq.artemis.api.core.Message
类名称:Message
方法名:getMessageID

Message.getMessageID介绍

[英]Returns the messageID.
The messageID is set when the message is handled by the server.
[中]返回messageID。
messageID是在服务器处理消息时设置的。

代码示例

代码示例来源:origin: wildfly/wildfly

@Override
public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException {
 SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
 sessionChannel.send(messagePacket);
}

代码示例来源:origin: wildfly/wildfly

/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
default Map<String, Object> toMap() {
 Map map = toPropertyMap();
 map.put("messageID", getMessageID());
 Object userID = getUserID();
 if (getUserID() != null) {
   map.put("userID", "ID:" + userID.toString());
 }
 map.put("address", getAddress() == null ? "" : getAddress());
 map.put("durable", isDurable());
 map.put("expiration", getExpiration());
 map.put("timestamp", getTimestamp());
 map.put("priority", (int)getPriority());
 return map;
}

代码示例来源:origin: wildfly/wildfly

@Override
public void sendACK(boolean individual,
          boolean block,
          final ClientConsumer consumer,
          final Message message) throws ActiveMQException {
 PacketImpl messagePacket;
 if (individual) {
   messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
 } else {
   messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
 }
 if (block) {
   sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
 } else {
   sessionChannel.sendBatched(messagePacket);
 }
}

代码示例来源:origin: wildfly/wildfly

/**
* Acknowledges all messages received by the consumer so far.
*/
@Override
public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
 // if we're pre-acknowledging then we don't need to do anything
 if (preAcknowledge) {
   return;
 }
 checkClosed();
 if (logger.isDebugEnabled()) {
   logger.debug("client ack messageID = " + message.getMessageID());
 }
 startCall();
 try {
   sessionContext.sendACK(false, blockOnAcknowledge, consumer, message);
 } finally {
   endCall();
 }
}

代码示例来源:origin: wildfly/wildfly

/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
 buffer.writeByte((byte)1);
 buffer.writeLong(record.getMessageID());
 buffer.writeNullableSimpleString(record.getAddressSimpleString());
 record.persist(buffer);
}

代码示例来源:origin: wildfly/wildfly

@Override
public void copyHeadersAndProperties(final Message msg) {
 messageID = msg.getMessageID();
 address = msg.getAddressSimpleString();
 userID = (UUID) msg.getUserID();
 type = msg.toCore().getType();
 durable = msg.isDurable();
 expiration = msg.getExpiration();
 timestamp = msg.getTimestamp();
 priority = msg.getPriority();
 if (msg instanceof CoreMessage) {
   properties = ((CoreMessage) msg).getTypedProperties();
 }
}

代码示例来源:origin: wildfly/wildfly

default void referenceOriginalMessage(final Message original, String originalQueue) {
 String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
 if (queueOnMessage != null) {
   setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
 } else if (originalQueue != null) {
   setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
 }
 Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
 if (originalID != null) {
   setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
   setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
 } else {
   setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
   setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
 }
 // reset expiry
 setExpiration(0);
}

代码示例来源:origin: apache/activemq-artemis

@Override
public long getMessageID() {
 if (messageID < 0) {
   messageID = getPagedMessage().getMessage().getMessageID();
 }
 return messageID;
}

代码示例来源:origin: apache/activemq-artemis

@Override
public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException {
 SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
 sessionChannel.send(messagePacket);
}

代码示例来源:origin: apache/activemq-artemis

@Override
public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException {
 SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
 sessionChannel.send(messagePacket);
}

代码示例来源:origin: apache/activemq-artemis

@Override
public int sendMessage(MessageReference reference,
           Message message,
           ServerConsumer consumer,
           int deliveryCount) {
 try {
   session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
 } catch (Exception e) {
   log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
 }
 return 1;
}

代码示例来源:origin: org.apache.activemq/artemis-amqp-protocol

public void reject(Object brokerConsumer, Message message) throws Exception {
 OperationContext oldContext = recoverContext();
 try {
   ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
 } finally {
   resetContext(oldContext);
 }
}

代码示例来源:origin: apache/activemq-artemis

public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
 if (transaction == null) {
   transaction = serverSession.getCurrentTransaction();
 }
 OperationContext oldContext = recoverContext();
 try {
   ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
 } finally {
   resetContext(oldContext);
 }
}

代码示例来源:origin: apache/activemq-artemis

/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
 buffer.writeByte((byte)1);
 buffer.writeLong(record.getMessageID());
 buffer.writeNullableSimpleString(record.getAddressSimpleString());
 record.persist(buffer);
}

代码示例来源:origin: apache/activemq-artemis

public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
 OperationContext oldContext = recoverContext();
 try {
   ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
   ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
 } finally {
   resetContext(oldContext);
 }
}

代码示例来源:origin: apache/activemq-artemis

private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
 long lastSequence = -1;
 for (MessageReference ref : handler.getScheduledReferences()) {
   assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
   lastSequence = ref.getMessage().getMessageID();
 }
}

代码示例来源:origin: apache/activemq-artemis

void sendPubRelMessage(Message message) {
 int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
 session.getSessionState().getOutboundStore().publishReleasedSent(messageId, message.getMessageID());
 session.getProtocolHandler().sendPubRel(messageId);
}

代码示例来源:origin: apache/activemq-artemis

public static ICoreMessage embedAsCoreMessage(Message source) {
 if (source instanceof ICoreMessage) {
   return (ICoreMessage) source;
 } else {
   Persister persister = source.getPersister();
   CoreMessage message = new CoreMessage(source.getMessageID(), persister.getEncodeSize(source) + signature.length + CoreMessage.BODY_OFFSET).setType(Message.EMBEDDED_TYPE);
   ActiveMQBuffer buffer = message.getBodyBuffer();
   buffer.writeBytes(signature);
   persister.encode(buffer, source);
   return message;
 }
}

代码示例来源:origin: org.apache.activemq/artemis-amqp-protocol

/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
 super.encode(buffer, record);
 AMQPMessage msgEncode = (AMQPMessage)record;
 buffer.writeLong(record.getMessageID());
 buffer.writeLong(msgEncode.getMessageFormat());
 buffer.writeNullableSimpleString(record.getAddressSimpleString());
 record.persist(buffer);
}

代码示例来源:origin: apache/activemq-artemis

@Override
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception {
 ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
 readLock();
 try {
   messageJournal.appendUpdateRecordTransactional(txID, ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding);
 } finally {
   readUnLock();
 }
}

相关文章

微信公众号

最新文章

更多