org.apache.activemq.broker.Broker类的使用及代码示例

x33g5p2x  于2022-01-17 转载在 其他  
字(12.8k)|赞(0)|评价(0)|浏览(173)

本文整理了Java中org.apache.activemq.broker.Broker类的一些代码示例,展示了Broker类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Broker类的具体详情如下:
包路径:org.apache.activemq.broker.Broker
类名称:Broker

Broker介绍

[英]The Message Broker which routes messages, maintains subscriptions and connections, acknowledges messages and handles transactions.
[中]消息代理路由消息、维护订阅和连接、确认消息和处理事务。

代码示例

代码示例来源:origin: apache/activemq

int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
  super.addSubscription(context, sub);
  final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
  final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
    if (virtualDest.getActiveMQDestination().isTopic() &&
        (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
          final Message copy = message.copy();
          copy.setOriginalDestination(message.getDestination());
          copy.setDestination(newDestination);
          if (regionDest == null) {

代码示例来源:origin: apache/activemq

/**
 * Checks whether every queue whose name matches {@code queueName} is empty.
 *
 * <p>{@code queueName} is passed to {@link String#matches(String)}, so it is
 * interpreted as a regular expression that must match the whole queue name.
 * Each matching queue that still holds messages is logged at INFO level.
 *
 * @param queueName regular expression selecting the queues to inspect
 * @return {@code true} when the summed message count of all matching queues is zero
 */
public boolean checkQueueSize(String queueName) {
  long pendingTotal = 0;
  for (Map.Entry<ActiveMQDestination, Destination> mapping : regionBroker.getDestinationMap().entrySet()) {
    // Only queues are of interest; skip every other destination type.
    if (!mapping.getKey().isQueue()) {
      continue;
    }
    Destination queue = mapping.getValue();
    if (!queue.getName().matches(queueName)) {
      continue;
    }
    long pending = queue.getDestinationStatistics().getMessages().getCount();
    pendingTotal += pending;
    if (pending > 0) {
      LOG.info("Queue has pending message: {} queueSize is: {}", queue.getName(), pending);
    }
  }
  return pendingTotal == 0;
}

代码示例来源:origin: gocd/gocd

/**
 * Detaches every consumer from the named queue and then removes the queue
 * from the broker.
 *
 * <p>Any exception raised along the way is rethrown through {@code bomb(e)}.
 *
 * @param queueName name of the queue to remove
 */
public void removeQueue(String queueName) {
  try {
    ActiveMQQueue queue = new ActiveMQQueue(queueName);
    ConnectionContext ctx = BrokerSupport.getConnectionContext(broker.getBroker());
    Destination target = broker.getDestination(queue);
    // Unhook each subscription from both sides before the destination goes away.
    for (Subscription subscriber : target.getConsumers()) {
      subscriber.remove(ctx, target);
      target.removeSubscription(ctx, subscriber, 0);
    }
    broker.getBroker().removeDestination(ctx, queue, 1000);
    broker.removeDestination(queue);
  } catch (Exception e) {
    throw bomb(e);
  }
}

代码示例来源:origin: apache/activemq

Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
for (TransactionId txid : broker.getPreparedTransactions(null)) {
  if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
    try {
      if (recoveredPendingCommit.contains(txid)) {
        LOG.info("delivering pending commit outcome for tid: " + txid);
        broker.commitTransaction(null, txid, false);
        broker.forgetTransaction(null, txid);

代码示例来源:origin: apache/activemq

@Override
protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
  final Broker broker = context.getConnectionContext().getBroker();
  final Set<Destination> destinations = broker.getDestinations(destination);
  final int numDestinations = destinations.size();
      final BrokerService brokerService = broker.getBrokerService();
          brokerService.getTaskRunnerFactory().execute(new Runnable() {
            @Override
            public void run() {
        if (shouldDispatch(broker, message, dest)) {
          try {
            dest.send(context, copy(message, dest.getActiveMQDestination()));
          } catch (ResourceAllocationException e) {
            if (!dropMessageOnResourceLimit) {

代码示例来源:origin: org.apache.activemq/activemq-all

/**
 * Re-attaches message stores to the commands of a recovered transaction.
 *
 * <p>For each ack in {@code tx.acks}: a {@code LastAckCommand} is registered as
 * pending on its topic store and bound to that store; any other ack increments
 * the message counter of its destination. Each entry in {@code tx.messages} is
 * then bound to the queue or topic store of its destination.
 *
 * @param tx the recovered transaction
 */
@Override
protected void onRecovered(Tx tx) {
  for (RemoveMessageCommand ack : tx.acks) {
    if (!(ack instanceof LastAckCommand)) {
      // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
      // but the sql is non portable to match BLOB with LIKE etc
      // so we make up for it when we recover the ack
      ((JDBCPersistenceAdapter) persistenceAdapter)
          .getBrokerService()
          .getRegionBroker()
          .getDestinationMap()
          .get(ack.getMessageAck().getDestination())
          .getDestinationStatistics()
          .getMessages()
          .increment();
      continue;
    }
    LastAckCommand lastAck = (LastAckCommand) ack;
    JDBCTopicMessageStore topicStore =
        (JDBCTopicMessageStore) topicStores.get(lastAck.getMessageAck().getDestination());
    topicStore.pendingCompletion(
        lastAck.getClientId(), lastAck.getSubName(), lastAck.getSequence(), lastAck.getPriority());
    lastAck.setMessageStore(topicStore);
  }
  for (AddMessageCommand pendingAdd : tx.messages) {
    ActiveMQDestination target = pendingAdd.getMessage().getDestination();
    pendingAdd.setMessageStore(target.isQueue() ? queueStores.get(target) : topicStores.get(target));
  }
}

代码示例来源:origin: apache/activemq

/**
 * Publishes an advisory message reporting that forwarding {@code messageDispatch}
 * failed, when {@code configuration.isAdvisoryForFailedForward()} is enabled and
 * the broker exposes an {@code AdvisoryBroker} adaptor.
 *
 * <p>The advisory carries the failure text in its {@code "cause"} string property.
 * Failures while firing the advisory are logged and never propagated.
 *
 * @param messageDispatch the dispatch whose message could not be forwarded
 * @param error the failure that prevented forwarding
 */
private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
  if (!configuration.isAdvisoryForFailedForward()) {
    return;
  }
  try {
    AdvisoryBroker advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
    if (advisoryBroker != null) {
      ConnectionContext context = new ConnectionContext();
      context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
      context.setBroker(brokerService.getBroker());
      ActiveMQMessage advisoryMessage = new ActiveMQMessage();
      advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
      advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
          advisoryMessage);
    }
  } catch (Exception e) {
    // Pass the exception's text, not the Throwable itself: a lone trailing Throwable is
    // taken as the exception to print rather than as the value for the {} placeholder,
    // leaving "cause: {}" unformatted and emitting the stack trace at WARN level.
    LOG.warn("failed to fire forward failure advisory, cause: {}", e.toString());
    // The full stack trace is only of interest when debugging.
    LOG.debug("detail", e);
  }
}

代码示例来源:origin: apache/activemq

/**
 * Commits the given local transaction through the broker and clears every
 * reference to it from the connection context and the message.
 *
 * <p>Does nothing when {@code tx} is {@code null}.
 *
 * @param tx the transaction to commit, may be {@code null}
 * @param connectionContext context owning the transaction
 * @param message message whose transaction id is reset to {@code null}
 * @throws Exception if the broker fails to commit
 */
private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
  if (tx == null) {
    return;
  }
  connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
  connectionContext.getTransactions().remove(tx);
  connectionContext.setTransaction(null);
  message.setTransactionId(null);
}

代码示例来源:origin: apache/activemq

map.putAll(tempTopicRegion.getDestinationMap());
long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
long timeStamp = System.currentTimeMillis();
for (Destination d : map.values()) {
  d.markForGC(timeStamp);
  if (d.canGC()) {
    list.add(d);
    if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
  context.setBroker(this);
      log = ((BaseDestination) dest).getLog();
    log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
    try {
      getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
    } catch (Throwable e) {
      LOG.error("Failed to remove inactive destination {}", dest, e);

代码示例来源:origin: apache/activemq

/**
 * Delivers a message to every destination the broker resolves for
 * {@code destination}, which may be a wildcard. Each resolved destination
 * receives its own copy of the message.
 *
 * @param context broker context
 * @param message message to send
 * @param destination possibly wildcard destination to send the message to
 * @throws Exception on error
 */
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
  for (Destination target : context.getConnectionContext().getBroker().getDestinations(destination)) {
    target.send(context, message.copy());
  }
}

代码示例来源:origin: apache/activemq

final NetworkBridgeConfiguration config) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
  AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
  if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) {
    for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) {
      if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) {

代码示例来源:origin: apache/activemq

/**
 * Looks up the JMX name of the connection that created this subscription.
 *
 * <p>The lookup is best-effort: {@code null} is returned when the client id,
 * subscription, connection context, broker, broker service or management
 * context is missing, when the query does not match exactly one name, or
 * when the query fails with an exception.
 *
 * @return the ObjectName of the Connection that created this subscription,
 *         or {@code null} if it cannot be determined
 */
@Override
public ObjectName getConnection() {
  if (clientId == null || subscription == null) {
    return null;
  }
  ConnectionContext ctx = subscription.getContext();
  if (ctx == null || ctx.getBroker() == null || ctx.getBroker().getBrokerService() == null) {
    return null;
  }
  BrokerService service = ctx.getBroker().getBrokerService();
  ManagementContext managementCtx = service.getManagementContext();
  if (managementCtx == null) {
    return null;
  }
  try {
    ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
    Set<ObjectName> matches = managementCtx.queryNames(query, null);
    // Only an unambiguous single match is accepted.
    if (matches.size() == 1) {
      return matches.iterator().next();
    }
  } catch (Exception ignored) {
    // Deliberately swallowed: a failed lookup is reported as "no connection" (null).
  }
  return null;
}

代码示例来源:origin: pierre/meteo

/**
 * Returns the {@code ProducerBrokerExchange} registered for the given producer,
 * creating and registering one on first use.
 *
 * <p>As a side effect the {@code context} field is set to the connection context
 * associated with the returned exchange.
 *
 * @param id the producer whose exchange is requested
 * @return the existing or newly created exchange for {@code id}
 * @throws IOException declared by the original signature; presumably raised by the
 *         persistence-adapter lookup on reconnect — confirm against the adapter API
 */
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
  ProducerBrokerExchange result = producerExchanges.get(id);
  if (result == null) {
    synchronized (producerExchanges) {
      // Re-check under the lock: another thread may have registered an exchange
      // between the unsynchronized lookup above and acquiring the monitor.
      result = producerExchanges.get(id);
      if (result == null) {
        result = new ProducerBrokerExchange();
        TransportConnectionState state = lookupConnectionState(id);
        context = state.getContext();
        if (context.isReconnect()) {
          result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
        }
        result.setConnectionContext(context);
        SessionState ss = state.getSessionState(id.getParentId());
        if (ss != null) {
          // Look the producer state up once and reuse it.
          ProducerState producerState = ss.getProducerState(id);
          result.setProducerState(producerState);
          if (producerState != null && producerState.getInfo() != null) {
            ProducerInfo info = producerState.getInfo();
            // Mutable when the producer has no fixed destination or a composite one.
            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
          }
        }
        producerExchanges.put(id, result);
        return result;
      }
    }
  }
  // An exchange already existed: adopt its connection context.
  context = result.getConnectionContext();
  return result;
}

代码示例来源:origin: apache/activemq

/**
 * Removes the given destination by delegating to the broker returned by
 * {@code getBroker()}, using the context from {@code getAdminConnectionContext()}.
 *
 * <p>NOTE(review): the trailing {@code 0} is presumably the removal timeout —
 * confirm against the Broker/Region API.
 *
 * @param destination the destination to remove
 * @throws Exception if obtaining the broker or context, or the removal itself, fails
 */
public void removeDestination(ActiveMQDestination destination) throws Exception {
  getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
}

代码示例来源:origin: pierre/meteo

/**
 * Registers a wildcard queue destination with the broker when it is not yet
 * known and falls under this instance's {@code prefix}.
 *
 * <p>Nothing happens unless {@code destination} is a queue, is a pattern, and
 * the broker currently resolves no destinations for it. The destination is then
 * added only if it matches the filter built from
 * {@code prefix + DestinationFilter.ANY_DESCENDENT}.
 *
 * @param broker broker to query and to add the destination to
 * @param context connection context used for the add
 * @param destination candidate destination
 * @throws Exception if the broker lookup or add fails
 */
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
  // Same short-circuit order as a chained &&: queue check, pattern check, then broker lookup.
  if (!destination.isQueue() || !destination.isPattern() || !broker.getDestinations(destination).isEmpty()) {
    return;
  }
  DestinationFilter prefixFilter =
      DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
  if (prefixFilter.matches(destination)) {
    broker.addDestination(context, destination, false);
  }
}

代码示例来源:origin: apache/activemq

/**
 * Starts this component: runs the superclass start, resolves the default
 * storage location if none was configured, imports destinations, and then
 * records the destinations currently known to the broker.
 *
 * @throws Exception if the superclass start, the import, or the broker lookup fails
 */
@Override
public void start() throws Exception {
  super.start();
  // Default to a "destinations" entry under the broker's data directory.
  if (location == null) {
    location = new File(getBrokerService().getBrokerDataDirectory(), "destinations");
  }
  // Must run after location is resolved. NOTE(review): presumably reads from location — confirm.
  importDestinations();
  // Snapshot whatever destinations the broker reports at this point.
  destinations.addAll(Arrays.asList(getBrokerService().getBroker().getDestinations()));
}

代码示例来源:origin: apache/activemq

if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
      LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
      dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
    config.setBrokerName(broker.getBrokerName());
      dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
    Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
    Transport remoteBridgeTransport = transport;
    if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
      duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
    MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
    listener.setCreatedByDuplex(true);
    duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
        broker.getDurableDestinations()));
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
  cs.getContext().setNetworkConnection(true);

代码示例来源:origin: powertac/powertac-server

/**
 * Deletes every broker destination for which {@code destinationLimitReached}
 * reports true and returns the names of the destinations it acted on.
 *
 * <p>Exceptions raised while talking to the broker are logged and swallowed;
 * the names collected up to that point are still returned.
 *
 * @return names of the destinations that hit their limit, or {@code null}
 *         when {@code getProvider()} returns no broker service
 */
public Set<String> processQueues ()
{
  BrokerService provider = getProvider();
  if (provider == null) {
    log.debug("processQueues - JMS Server has not been started");
    return null;
  }
  Set<String> overLimit = new HashSet<String>();
  try {
    Broker jmsBroker = provider.getBroker();
    for (Map.Entry<ActiveMQDestination, Destination> mapping : jmsBroker.getDestinationMap().entrySet()) {
      Destination candidate = mapping.getValue();
      if (!destinationLimitReached(candidate)) {
        continue;
      }
      overLimit.add(candidate.getName());
      deleteDestination(jmsBroker, mapping.getKey(), candidate);
    }
  }
  catch (Exception e) {
    log.error("Encounter exception while getting jms broker", e);
  }
  return overLimit;
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-mqtt

while (!pipelineConnected) {
 Thread.sleep(1000);
 for (Connection connection : brokerService.getBroker().getClients()) {
  if (!connection.getConnectionId().isEmpty()) {
   pipelineConnected = true;

代码示例来源:origin: apache/activemq

networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
  duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
  duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {

相关文章

微信公众号

最新文章

更多

Broker类方法