本文整理了Java中org.apache.activemq.broker.Broker
类的一些代码示例,展示了Broker
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Broker
类的具体详情如下:
包路径:org.apache.activemq.broker.Broker
类名称:Broker
[英]The Message Broker which routes messages, maintains subscriptions and connections, acknowledges messages and handles transactions.
[中]消息代理路由消息、维护订阅和连接、确认消息和处理事务。
代码示例来源:origin: apache/activemq
int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
super.addSubscription(context, sub);
final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
代码示例来源:origin: apache/activemq
public boolean checkQueueSize(String queueName) {
long count = 0;
long queueSize = 0;
Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
if (entry.getKey().isQueue()) {
if (entry.getValue().getName().matches(queueName)) {
queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
count += queueSize;
if (queueSize > 0) {
LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
}
}
}
}
return count == 0;
}
代码示例来源:origin: gocd/gocd
public void removeQueue(String queueName) {
try {
ActiveMQQueue destination = new ActiveMQQueue(queueName);
ConnectionContext connectionContext = BrokerSupport.getConnectionContext(broker.getBroker());
Destination brokerDestination = broker.getDestination(destination);
List<Subscription> consumers = brokerDestination.getConsumers();
for (Subscription consumer : consumers) {
consumer.remove(connectionContext, brokerDestination);
brokerDestination.removeSubscription(connectionContext, consumer, 0);
}
broker.getBroker().removeDestination(connectionContext, destination, 1000);
broker.removeDestination(destination);
} catch (Exception e) {
throw bomb(e);
}
}
代码示例来源:origin: apache/activemq
Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
if (recoveredPendingCommit.contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
broker.forgetTransaction(null, txid);
代码示例来源:origin: apache/activemq
@Override
protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
final Broker broker = context.getConnectionContext().getBroker();
final Set<Destination> destinations = broker.getDestinations(destination);
final int numDestinations = destinations.size();
final BrokerService brokerService = broker.getBrokerService();
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
if (shouldDispatch(broker, message, dest)) {
try {
dest.send(context, copy(message, dest.getActiveMQDestination()));
} catch (ResourceAllocationException e) {
if (!dropMessageOnResourceLimit) {
代码示例来源:origin: org.apache.activemq/activemq-all
@Override
protected void onRecovered(Tx tx) {
for (RemoveMessageCommand removeMessageCommand: tx.acks) {
if (removeMessageCommand instanceof LastAckCommand) {
LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
lastAckCommand.setMessageStore(jdbcTopicMessageStore);
} else {
// when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
// but the sql is non portable to match BLOB with LIKE etc
// so we make up for it when we recover the ack
((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
}
}
for (AddMessageCommand addMessageCommand : tx.messages) {
ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination));
}
}
代码示例来源:origin: apache/activemq
private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
if (configuration.isAdvisoryForFailedForward()) {
AdvisoryBroker advisoryBroker = null;
try {
advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (advisoryBroker != null) {
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(brokerService.getBroker());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
advisoryMessage);
}
} catch (Exception e) {
LOG.warn("failed to fire forward failure advisory, cause: {}", e);
LOG.debug("detail", e);
}
}
}
代码示例来源:origin: apache/activemq
private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
if (tx != null) {
connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
connectionContext.getTransactions().remove(tx);
connectionContext.setTransaction(null);
message.setTransactionId(null);
}
}
代码示例来源:origin: apache/activemq
map.putAll(tempTopicRegion.getDestinationMap());
long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
long timeStamp = System.currentTimeMillis();
for (Destination d : map.values()) {
d.markForGC(timeStamp);
if (d.canGC()) {
list.add(d);
if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
context.setBroker(this);
log = ((BaseDestination) dest).getLog();
log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
try {
getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
} catch (Throwable e) {
LOG.error("Failed to remove inactive destination {}", dest, e);
代码示例来源:origin: apache/activemq
/**
* Sends a message to the given destination which may be a wildcard
*
* @param context broker context
* @param message message to send
* @param destination possibly wildcard destination to send the message to
* @throws Exception on error
*/
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set<Destination> destinations = broker.getDestinations(destination);
for (Destination dest : destinations) {
dest.send(context, message.copy());
}
}
代码示例来源:origin: apache/activemq
final NetworkBridgeConfiguration config) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) {
for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) {
if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) {
代码示例来源:origin: apache/activemq
/**
* @returns the ObjectName of the Connection that created this subscription
*/
@Override
public ObjectName getConnection() {
ObjectName result = null;
if (clientId != null && subscription != null) {
ConnectionContext ctx = subscription.getContext();
if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
BrokerService service = ctx.getBroker().getBrokerService();
ManagementContext managementCtx = service.getManagementContext();
if (managementCtx != null) {
try {
ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
Set<ObjectName> names = managementCtx.queryNames(query, null);
if (names.size() == 1) {
result = names.iterator().next();
}
} catch (Exception e) {
}
}
}
}
return result;
}
代码示例来源:origin: pierre/meteo
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {
synchronized (producerExchanges) {
result = new ProducerBrokerExchange();
TransportConnectionState state = lookupConnectionState(id);
context = state.getContext();
if (context.isReconnect()) {
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
}
result.setConnectionContext(context);
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) {
result.setProducerState(ss.getProducerState(id));
ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() != null) {
ProducerInfo info = producerState.getInfo();
result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
}
}
producerExchanges.put(id, result);
}
} else {
context = result.getConnectionContext();
}
return result;
}
代码示例来源:origin: apache/activemq
public void removeDestination(ActiveMQDestination destination) throws Exception {
getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
}
代码示例来源:origin: pierre/meteo
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
if (filter.matches(destination)) {
broker.addDestination(context, destination, false);
}
}
}
代码示例来源:origin: apache/activemq
@Override
public void start() throws Exception {
super.start();
if (location == null) {
location = new File(getBrokerService().getBrokerDataDirectory(), "destinations");
}
importDestinations();
destinations.addAll(Arrays.asList(getBrokerService().getBroker().getDestinations()));
}
代码示例来源:origin: apache/activemq
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
config.setBrokerName(broker.getBrokerName());
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
Transport remoteBridgeTransport = transport;
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
broker.getDurableDestinations()));
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true);
代码示例来源:origin: powertac/powertac-server
public Set<String> processQueues ()
{
BrokerService brokerService = getProvider();
if (brokerService == null) {
log.debug("processQueues - JMS Server has not been started");
return null;
}
Set<String> badQueues = new HashSet<String>();
try {
Broker broker = brokerService.getBroker();
Map<ActiveMQDestination, Destination> dstMap = broker.getDestinationMap();
for (Map.Entry<ActiveMQDestination, Destination> entry: dstMap.entrySet()) {
ActiveMQDestination amqDestination = entry.getKey();
Destination destination = entry.getValue();
if (destinationLimitReached(destination)) {
badQueues.add(destination.getName());
deleteDestination(broker, amqDestination, destination);
}
}
}
catch (Exception e) {
log.error("Encounter exception while getting jms broker", e);
}
return badQueues;
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-io-mqtt
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (!connection.getConnectionId().isEmpty()) {
pipelineConnected = true;
代码示例来源:origin: apache/activemq
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
内容来源于网络,如有侵权,请联系作者删除!