org.jgroups.Message.copy()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(145)

本文整理了Java中org.jgroups.Message.copy()方法的一些代码示例,展示了Message.copy()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.copy()方法的具体详情如下:
包路径:org.jgroups.Message
类名称:Message
方法名:copy

Message.copy介绍

[英]Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will contain only the subset offset and length point to, copying the subset into the new copy.
[中]创建消息的副本。如果使用了偏移量和长度(用于引用另一个缓冲区),则副本只包含偏移量和长度所指向的子集,并将该子集复制到新副本中。

代码示例

代码示例来源:origin: wildfly/wildfly

/**
 * Creates a copy of this message. If offset and length are used (to refer to a region of
 * another buffer), the copy contains only the region they point to.
 *
 * @param copy_buffer forwarded unchanged as the first argument of {@code copy(boolean, boolean)}
 * @return the result of {@code copy(copy_buffer, true)}
 */
public Message copy(boolean copy_buffer) {
  // second argument is always true here; presumably it selects header copying - confirm
  // against copy(boolean, boolean)
  final boolean second_arg=true;
  return copy(copy_buffer, second_arg);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Copies this message, keeping only the headers whose ID is {@code >= starting_id}.
 *
 * @param copy_buffer forwarded unchanged to {@code copy(boolean, short, short...)}
 * @param starting_id lowest header ID that is still copied
 * @return a message with the headers whose IDs are {@code >= starting_id}
 */
public Message copy(boolean copy_buffer, short starting_id) {
  // no additional header IDs are whitelisted: the varargs array is passed as null
  final short[] no_extra_ids=null;
  return copy(copy_buffer, starting_id, no_extra_ids);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Copies the message, but only the headers above the current protocol (RELAY), or headers
 * related to RpcDispatcher.
 *
 * @param msg the message to copy
 * @return the copy produced by {@code msg.copy(true, Global.BLOCKS_START_ID, prots_above)}
 */
protected Message copy(Message msg) {
  final short first_copied_id=Global.BLOCKS_START_ID;
  return msg.copy(true, first_copied_id, this.prots_above);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Creates a copy of this message by delegating to {@code copy(true)}.
 *
 * @return the copied message
 */
public Message copy() {
  final boolean copy_buffer=true;
  return copy(copy_buffer);
}

代码示例来源:origin: wildfly/wildfly

MessageInfo(MessageID messageID, Message message, long sequenceNumber) {
  if (messageID == null) {
    throw new NullPointerException("Message ID can't be null");
  }
  this.messageID = messageID;
  this.message = message.copy(true, true);
  this.sequenceNumber = sequenceNumber;
  this.readyToDeliver = false;
  this.message.setSrc(messageID.getAddress());
}

代码示例来源:origin: wildfly/wildfly

/**
 * Copies this message, carrying over only the headers whose ID is {@code >= starting_id} or
 * whose ID appears in {@code copy_only_ids}.
 *
 * @param copy_buffer   forwarded unchanged to {@code copy(boolean, boolean)}
 * @param starting_id   lowest header ID that is always carried over
 * @param copy_only_ids additional header IDs to carry over regardless of {@code starting_id}
 * @return the copy, holding only the selected headers
 */
public Message copy(boolean copy_buffer, short starting_id, short ... copy_only_ids) {
  // start from a copy made with 'false' as second argument, then add the selected headers
  final Message result=copy(copy_buffer, false);
  for(Map.Entry<Short,Header> e: getHeaders().entrySet()) {
    final short hdr_id=e.getKey();
    if(hdr_id < starting_id && !Util.containsId(hdr_id, copy_only_ids))
      continue; // neither above the threshold nor explicitly listed
    result.putHeader(hdr_id, e.getValue());
  }
  return result;
}

代码示例来源:origin: wildfly/wildfly

/**
 * Adds {@code incoming_copies} duplicates of every eligible message to the batch, then passes
 * the batch up if it is not empty. A message is eligible when it is a multicast (null
 * destination) and {@code copy_multicast_msgs} is set, or a unicast and
 * {@code copy_unicast_msgs} is set.
 *
 * @param batch the incoming batch; duplicates are appended to it
 */
public void up(MessageBatch batch) {
  if((copy_multicast_msgs || copy_unicast_msgs) && incoming_copies > 0) {
    // collect first, append afterwards, so the batch is not modified while iterating it
    final List<Message> duplicates=new ArrayList<>();
    for(Message m: batch) {
      final boolean is_mcast=m.getDest() == null;
      final boolean eligible=is_mcast? copy_multicast_msgs : copy_unicast_msgs;
      if(!eligible)
        continue;
      for(int n=0; n < incoming_copies; n++)
        duplicates.add(m.copy(true));
    }
    for(Message dup: duplicates)
      batch.add(dup);
  }
  if(!batch.isEmpty())
    up_prot.up(batch);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Sends a separate copy of {@code msg} down the stack to each destination.
 *
 * @param destinations addresses to send to
 * @param msg          message to copy for every destination
 * @param sendToMyself when false, the destination equal to {@code localAddress} is skipped
 */
private void send(Collection<Address> destinations, Message msg, boolean sendToMyself) {
  if (log.isTraceEnabled())
    log.trace("sending anycast total order message %s to %s", msg, destinations);

  for (Address target : destinations) {
    if (sendToMyself || !target.equals(localAddress)) {
      // each destination gets its own copy, as the destination is set on the message itself
      final Message unicast = msg.copy();
      unicast.setDest(target);
      down_prot.down(unicast);
    }
  }
}

代码示例来源:origin: wildfly/wildfly

/**
 * Sends {@code num_copies} duplicates of {@code msg} up or down the stack, provided copying is
 * enabled for its kind: multicast (null destination) with {@code copy_multicast_msgs}, or
 * unicast with {@code copy_unicast_msgs}.
 *
 * @param msg        message to duplicate
 * @param num_copies number of duplicates to send
 * @param direction  whether the duplicates go to {@code up_prot} or {@code down_prot}
 */
private void copy(Message msg, int num_copies, Direction direction) {
  final boolean is_mcast=msg.getDest() == null;
  final boolean enabled=is_mcast? copy_multicast_msgs : copy_unicast_msgs;
  if(!enabled)
    return;

  for(int n=0; n < num_copies; n++) {
    final Message dup=msg.copy(true);
    switch(direction) {
      case UP:
        up_prot.up(dup);
        break;
      case DOWN:
        down_prot.down(dup);
        break;
    }
  }
}
}

代码示例来源:origin: wildfly/wildfly

/**
 * Decrypts an incoming encrypted message and passes the result up the stack.
 *
 * @param msg     the encrypted message
 * @param version key version the message was encrypted with
 * @return the result of passing the decrypted message up, or null if the version did not
 *         match {@code sym_version} or the message could not be decrypted
 * @throws Exception declared by the signature; propagated from the calls made here
 */
protected Object handleEncryptedMessage(Message msg, byte[] version) throws Exception {
  // a differing version is handed to versionMismatch(); nothing is delivered in that case
  final boolean same_version=Arrays.equals(sym_version, version);
  if(!same_version) {
    versionMismatch(msg);
    return null;
  }

  // Decrypt a copy: decryption modifies the buffer, and the original may still be needed
  // for retransmissions (http://jira.jboss.com/jira/browse/JGRP-538)
  final Message decrypted=decryptMessage(null, msg.copy());
  if(decrypted == null) {
    log.warn("%s: unrecognized cipher; discarding message from %s", local_addr, msg.src());
    return null;
  }
  return up_prot.up(decrypted);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Serializes the whole message and sends the serialized form as the payload of a new message.
 *
 * @param msg the message to send; its source is set to {@code local_addr} if it has none
 * @return whatever the protocol below returns
 */
public Object down(Message msg) {
  final boolean has_src=msg.getSrc() != null;
  if(!has_src)
    msg.setSrc(local_addr);

  final Buffer marshalled=Util.streamableToBuffer(msg);

  // The existing headers are left out of the wrapper: they are part of the serialized form
  // and will be seen again when the receiver unmarshals the message
  final Message wrapper=msg.copy(false, false).setBuffer(marshalled);
  return down_prot.down(wrapper);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Loops a message back up the local stack.
 * <p>
 * When {@code loopback_copy} is set, a copy of the message is delivered instead of the original.
 * Delivery happens on the caller's thread unless {@code loopback_separate_thread} is set, in
 * which case the message is handed to {@code msg_processing_policy}.
 *
 * @param msg       the message to loop back
 * @param multicast passed through to {@code passMessageUp} on the same-thread path
 */
protected void loopback(Message msg, final boolean multicast) {
  final Message copy=loopback_copy? msg.copy() : msg;
  if(is_trace)
    log.trace("%s: looping back message %s, headers are %s", local_addr, copy, copy.printHeaders());
  if(!loopback_separate_thread) {
    passMessageUp(copy, null, false, multicast, false);
    return;
  }
  // changed to fix http://jira.jboss.com/jira/browse/JGRP-506
  boolean internal=msg.isFlagSet(Message.Flag.INTERNAL);
  boolean oob=msg.isFlagSet(Message.Flag.OOB);
  // Hand over 'copy', not 'msg': otherwise the copy requested via loopback_copy is discarded
  // on this path and the original message object is delivered while the sender may still use it
  msg_processing_policy.loopback(copy, oob, internal);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Sender loop: takes messages off {@code send_queue}, tags a copy with an {@code ABPHeader}
 * carrying the current {@code bit}, and sends the copy down. Returns when the thread is
 * interrupted while polling the queue.
 */
public void run() {
  for(;;) {
    final Message outgoing;
    synchronized(this) {
      final Message queued;
      try {
        queued=send_queue.poll(1000, TimeUnit.MILLISECONDS);
        if(queued == null) {
          // nothing to send: pause, then poll again
          Util.sleep(1000);
          continue;
        }
      }
      catch(InterruptedException e) {
        return;
      }
      // the header is attached to a copy, so the queued message itself is left untouched
      outgoing=queued.copy().putHeader(id, new ABPHeader(Type.data, bit));
    }
    log.trace("%s: --> %s.msg(%d). Msg: %s", local_addr, outgoing.dest(), bit, outgoing.printHeaders());
    down_prot.down(outgoing);
  }
}
}

代码示例来源:origin: wildfly/wildfly

/**
 * Sends a multicast message to {@code next} only, tagged with a {@code DaisyHeader} carrying
 * a TTL. Unicast messages, and multicasts sent while {@code next} is still null, are passed
 * down unchanged.
 *
 * @param msg the message to send
 * @return whatever the protocol below returns
 */
public Object down(Message msg) {
  // Only multicasts are processed here; and while no view has been received yet
  // ('next' is null) the normal transport is used
  if(msg.getDest() != null || next == null)
    return down_prot.down(msg);

  // Work on a copy: setting the destination to 'next' on the original would make the next
  // retransmission of it use 'next' as destination, too
  final Message forwarded=msg.copy(true);
  final short ttl;
  if(loopback)
    ttl=(short)(view_size - 1); // with loopback, one hop fewer is requested
  else
    ttl=(short)view_size;
  final DaisyHeader daisy_hdr=new DaisyHeader(ttl);
  forwarded.setDest(next);
  forwarded.putHeader(getId(), daisy_hdr);
  msgs_sent++;

  if(loopback) {
    if(log.isTraceEnabled())
      log.trace(new StringBuilder("looping back message ").append(msg));
    if(msg.getSrc() == null)
      msg.setSrc(local_addr);
    // the original message is delivered locally on default_pool
    default_pool.execute(() -> up_prot.up(msg));
  }
  return down_prot.down(forwarded);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Sends message {@code msg} to the requester of a retransmission. Unless the original
 * multicast is simply re-sent, a copy is sent whose header type is changed to XMIT_RSP, so
 * that the original message's properties (such as src and headers) are preserved.
 *
 * @param dest the requester the response is addressed to
 * @param msg  the message to retransmit; ignored if null
 */
protected void sendXmitRsp(Address dest, Message msg) {
  if(msg == null)
    return;
  if(stats)
    xmit_rsps_sent.increment();
  if(msg.getSrc() == null)
    msg.setSrc(local_addr);

  if(!use_mcast_xmit) {
    // unicast response: copy payload and headers, then address the copy to the requester
    final Message rsp=msg.copy(true, true).dest(dest);
    final NakAckHeader2 orig_hdr=rsp.getHeader(id);
    final NakAckHeader2 rsp_hdr=orig_hdr.copy();
    // the type is changed on a header copy only (MSG --> XMIT_RSP)
    rsp_hdr.type=NakAckHeader2.XMIT_RSP;
    rsp.putHeader(id, rsp_hdr);
    down_prot.down(rsp);
  }
  else {
    // multicast retransmission: the original multicast message is simply sent again
    down_prot.down(msg);
  }
}

代码示例来源:origin: wildfly/wildfly

/**
 * Dispatches an incoming message that carries a {@code RelayHeader}, based on the header type.
 *
 * @param msg the received message
 * @param hdr the relay header found on the message
 * @return the result of passing the message up (DISSEMINATE) or of {@code installView} (VIEW);
 *         null for FORWARD and BROADCAST_VIEW
 * @throws IllegalArgumentException if the header type is none of the handled types
 */
protected Object handleUpEvent(Message msg, RelayHeader hdr) {
  switch(hdr.type) {
    case DISSEMINATE: {
      // deliver a copy, with the source replaced by the original sender if one is recorded
      final Message delivered=msg.copy();
      if(hdr.original_sender != null)
        delivered.setSrc(hdr.original_sender);
      return up_prot.up(delivered);
    }
    case FORWARD: {
      // only acted upon when is_coord is set
      if(is_coord)
        forward(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
      return null;
    }
    case VIEW:
      return installView(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
    case BROADCAST_VIEW:
      return null;
    default:
      throw new IllegalArgumentException(hdr.type + " is not a valid type");
  }
}

代码示例来源:origin: wildfly/wildfly

/**
 * Wraps the message and sends it to the current coordinator. If this member is the
 * coordinator, the serialized message is passed to {@code forward} directly. Nothing is sent
 * while {@code coord} is null. Failures are logged, not propagated.
 *
 * @param msg the message to forward
 */
protected void forwardToCoord(Message msg) {
  // only headers from building blocks are copied
  final Message stripped=msg.copy(true, Global.BLOCKS_START_ID);
  if(stripped.getSrc() == null)
    stripped.setSrc(local_addr);

  try {
    final byte[] payload=Util.streamableToByteBuffer(stripped);
    if(coord == null)
      return;

    // optimization: if I'm the coord, simply relay to the remote cluster via the bridge
    if(coord.equals(local_addr)) {
      forward(payload, 0, payload.length);
      return;
    }

    final Message wrapper=new Message(coord, payload, 0, payload.length)
      .putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD));
    down_prot.down(wrapper);
  }
  catch(Exception e) {
    log.error(Util.getMessage("FailedForwardingUnicastMessageToCoord"), e);
  }
}

代码示例来源:origin: wildfly/wildfly

/**
 * Handles an incoming message. Messages carrying a {@code RelayHeader} go to
 * {@code handleUpEvent}. Otherwise, when {@code is_coord} and {@code relay} are set, a
 * multicast message without the NO_RELAY flag is serialized and passed to {@code forward}
 * before the original is passed up.
 *
 * @param msg the received message
 * @return the result of {@code handleUpEvent}, or of passing the message up
 */
public Object up(Message msg) {
  final Address dest=msg.getDest();
  final RelayHeader relay_hdr=msg.getHeader(getId());
  if(relay_hdr != null)
    return handleUpEvent(msg, relay_hdr);

  final boolean should_relay=is_coord && relay && dest == null && !msg.isFlagSet(Message.Flag.NO_RELAY);
  if(should_relay) {
    // only headers from building blocks are copied
    final Message stripped=msg.copy(true, Global.BLOCKS_START_ID);
    try {
      final byte[] payload=Util.streamableToByteBuffer(stripped);
      forward(payload, 0, payload.length);
    }
    catch(Exception e) {
      // a failed relay is logged only; the message is still delivered locally below
      log.warn("failed relaying message", e);
    }
  }
  return up_prot.up(msg);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Encrypts the payload of {@code msg} into a copy tagged with an {@code EncryptHeader}, and
 * sends the copy down the stack.
 *
 * @param msg the message to encrypt; it is not modified
 * @throws Exception declared by the signature; propagated from the calls made here
 */
protected void encryptAndSend(Message msg) throws Exception {
  final EncryptHeader enc_hdr=new EncryptHeader(EncryptHeader.ENCRYPT, symVersion());

  // A copy is needed because the same message (object) may be retransmitted; encrypting the
  // original would lead to double encryption
  final Message encrypted=msg.copy(false).putHeader(this.id, enc_hdr);

  if(msg.getLength() <= 0) {
    // empty buffers are not encrypted (https://issues.jboss.org/browse/JGRP-2153);
    // a non-null buffer is carried over as is
    final byte[] raw=msg.getRawBuffer();
    if(raw != null)
      encrypted.setBuffer(raw, msg.getOffset(), msg.getLength());
  }
  else
    encrypted.setBuffer(code(msg.getRawBuffer(),msg.getOffset(),msg.getLength(),false));

  down_prot.down(encrypted);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Handles an incoming message. A message carrying a {@code DaisyHeader} is forwarded to
 * {@code next} with a decremented TTL (as long as that TTL is still positive), and is then
 * passed up with its destination reset to null. Messages without the header are passed up
 * unchanged.
 *
 * @param msg the received message
 * @return whatever the protocol above returns
 */
public Object up(Message msg) {
  final DaisyHeader daisy_hdr=msg.getHeader(getId());
  if(daisy_hdr == null)
    return up_prot.up(msg);

  // 1. Forward a copy of the message to the next in line if the decremented ttl is > 0
  short ttl=daisy_hdr.getTTL();
  if(log.isTraceEnabled())
    log.trace(local_addr + ": received message from " + msg.getSrc() + " with ttl=" + ttl);
  ttl--;
  if(ttl > 0) {
    final Message forwarded=msg.copy(true);
    forwarded.setDest(next);
    forwarded.putHeader(getId(), new DaisyHeader(ttl));
    msgs_forwarded++;
    if(log.isTraceEnabled())
      log.trace(local_addr + ": forwarding message to " + next + " with ttl=" + ttl);
    down_prot.down(forwarded);
  }

  // 2. Pass up, with the destination cleared
  msg.setDest(null);
  return up_prot.up(msg);
}

相关文章