javax.jms.Connection类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(17.5k)|赞(0)|评价(0)|浏览(255)

本文整理了Java中javax.jms.Connection类的一些代码示例,展示了Connection类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Connection类的具体详情如下:
包路径:javax.jms.Connection
类名称:Connection

Connection介绍

[英]A Connection object is a client's active connection to its JMS provider. It typically allocates provider resources outside the Java virtual machine (JVM).

Connections support concurrent use.

A connection serves several purposes:

  • It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a client and the service provider software.
  • Its creation is where client authentication takes place.
  • It can specify a unique client identifier.
  • It provides a ConnectionMetaData object.
  • It supports an optional ExceptionListener object.

Because the creation of a connection involves setting up authentication and communication, a connection is a relatively heavyweight object. Most clients will do all their messaging with a single connection. Other more advanced applications may use several connections. The JMS API does not architect a reason for using multiple connections; however, there may be operational reasons for doing so.

A JMS client typically creates a connection, one or more sessions, and a number of message producers and consumers. When a connection is created, it is in stopped mode. That means that no messages are being delivered.

It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers have been created). At that point, the client calls the connection's start method, and messages begin arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from asynchronous message delivery while the client is still in the process of setting itself up.

A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up.

A message producer can send messages while a connection is stopped.
[中]连接对象是客户端与其JMS提供程序的活动连接。它通常在Java虚拟机(JVM)之外分配提供程序资源。
连接支持并发使用。
连接有多种用途:
*它封装了与JMS提供程序的开放连接。它通常表示客户端和服务提供商软件之间的开放TCP/IP套接字。
*它的创建是客户端身份验证发生的地方。
*它可以指定唯一的客户端标识符。
*它提供了一个ConnectionMetaData对象。
*它支持可选的ExceptionListener对象。
因为创建连接需要设置身份验证和通信,所以连接是一个相对较重的对象。大多数客户端将通过一个连接完成所有消息传递。其他更高级的应用程序可能会使用多个连接。JMS API没有设计使用多个连接的理由;然而,这样做可能有操作上的原因。
JMS客户机通常创建一个连接、一个或多个会话以及许多消息生产者和消费者。创建连接时,连接处于停止模式。这意味着没有消息被传递。
通常会将连接保持在停止模式,直到安装完成(即,直到创建了所有消息使用者)。此时,客户端调用连接的start方法,消息开始到达连接的使用者。此设置约定最大限度地减少了在客户端仍在进行自身设置时,异步消息传递可能导致的任何客户端混淆。
可以立即启动连接,然后再进行设置。执行此操作的客户端必须准备好在设置过程中处理异步消息传递。
消息生成器可以在连接停止时发送消息。

代码示例

代码示例来源:origin: kiegroup/jbpm

MessageProducer producer = null;
try {
  queueConnection = connectionFactory.createConnection();
  queueSession = queueConnection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
  TextMessage message = queueSession.createTextMessage(eventXml);
  message.setStringProperty("LogType", "Task");
  producer = queueSession.createProducer(queue);  
  producer.setPriority(priority);
  producer.send(message);
} catch (Exception e) {
  throw new RuntimeException("Error when sending JMS message with working memory event", e);
  if (producer != null) {
    try {
      producer.close();
    } catch (JMSException e) {
      logger.warn("Error when closing producer", e);
      queueSession.close();
    } catch (JMSException e) {
      logger.warn("Error when closing queue session", e);
      queueConnection.close();
    } catch (JMSException e) {
      logger.warn("Error when closing queue connection", e);

代码示例来源:origin: kiegroup/jbpm

public List<Message> receive(Queue queue) throws Exception {
    List<Message> messages = new ArrayList<Message>();
    
    Connection qconnetion = factory.createConnection();
    Session qsession = qconnetion.createSession(true, QueueSession.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = qsession.createConsumer(queue);
    qconnetion.start();
    
    Message m = null;
    
    while ((m = consumer.receiveNoWait()) != null) {
      messages.add(m);
    }
    consumer.close();            
    qsession.close();            
    qconnetion.close();
    
    return messages;
  }
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Prepare the given Connection before it is exposed.
 * <p>The default implementation applies ExceptionListener and client id.
 * Can be overridden in subclasses.
 * @param con the Connection to prepare
 * @throws JMSException if thrown by JMS API methods
 * @see #setExceptionListener
 * @see #setReconnectOnException
 */
protected void prepareConnection(Connection con) throws JMSException {
  if (getClientId() != null) {
    con.setClientID(getClientId());
  }
  if (this.aggregatedExceptionListener != null) {
    con.setExceptionListener(this.aggregatedExceptionListener);
  }
  else if (getExceptionListener() != null || isReconnectOnException()) {
    ExceptionListener listenerToUse = getExceptionListener();
    if (isReconnectOnException()) {
      this.aggregatedExceptionListener = new AggregatedExceptionListener();
      this.aggregatedExceptionListener.delegates.add(this);
      if (listenerToUse != null) {
        this.aggregatedExceptionListener.delegates.add(listenerToUse);
      }
      listenerToUse = this.aggregatedExceptionListener;
    }
    con.setExceptionListener(listenerToUse);
  }
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Close the given Connection.
 * @param con the Connection to close
 */
protected void closeConnection(Connection con) {
  if (logger.isDebugEnabled()) {
    logger.debug("Closing shared JMS Connection: " + con);
  }
  try {
    try {
      if (this.startedCount > 0) {
        con.stop();
      }
    }
    finally {
      con.close();
    }
  }
  catch (javax.jms.IllegalStateException ex) {
    logger.debug("Ignoring Connection state exception - assuming already closed: " + ex);
  }
  catch (Throwable ex) {
    logger.debug("Could not close shared JMS Connection", ex);
  }
}

代码示例来源:origin: apache/storm

ConnectionFactory cf = jmsProvider.connectionFactory();
Destination dest = jmsProvider.destination();
this.connection = cf.createConnection();
this.session = connection.createSession(false, jmsAcknowledgeMode);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(this);
this.connection.start();

代码示例来源:origin: apache/activemq-artemis

@Test
public void testCreateProducerOnNullQueue() throws Exception {
 Connection conn = getConnectionFactory().createConnection();
 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 Message m = sess.createTextMessage("something");
 MessageProducer p = sess.createProducer(null);
 p.send(queue1, m);
 MessageConsumer c = sess.createConsumer(queue1);
 conn.start();
 // receiveNoWait is not guaranteed to return message immediately
 TextMessage rm = (TextMessage) c.receive(1000);
 ProxyAssertSupport.assertEquals("something", rm.getText());
 conn.close();
}

代码示例来源:origin: mercyblitz/segmentfault-lessons

private static void receiveMessage() throws Exception {
  // 创建 ActiveMQ 链接,设置 Broker URL
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  // 创造 JMS 链接
  Connection connection = connectionFactory.createConnection();
  // 启动连接
  connection.start();
  // 创建会话 Session
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  // 创建消息目的 - Queue 名称为 "TEST"
  Destination destination = session.createQueue("TEST");
  // 创建消息消费者
  MessageConsumer messageConsumer = session.createConsumer(destination);
  // 获取消息
  Message message = messageConsumer.receive(100);
  if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    System.out.println("消息消费内容:" + textMessage.getText());
  }
  // 关闭消息消费者
  messageConsumer.close();
  // 关闭会话
  session.close();
  // 关闭连接
  connection.stop();
  connection.close();
}

代码示例来源:origin: mercyblitz/segmentfault-lessons

private static void sendMessage() throws Exception {
    // 创建 ActiveMQ 链接,设置 Broker URL
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    // 创造 JMS 链接
    Connection connection = connectionFactory.createConnection();
    // 创建会话 Session
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 创建消息目的 - Queue 名称为 "TEST"
    Destination destination = session.createQueue("TEST");
    // 创建消息生产者
    MessageProducer producer = session.createProducer(destination);
    // 创建消息 - 文本消息
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    message.setText("Hello,World");
    // 发送文本消息
    producer.send(message);

    // 关闭消息生产者
    producer.close();
    // 关闭会话
    session.close();
    // 关闭连接
    connection.close();
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void testContextRefreshedEventStartsTheConnectionByDefault() throws Exception {
  MessageConsumer messageConsumer = mock(MessageConsumer.class);
  Session session = mock(Session.class);
  // Queue gets created in order to create MessageConsumer for that Destination...
  given(session.createQueue(DESTINATION_NAME)).willReturn(QUEUE_DESTINATION);
  // and then the MessageConsumer gets created...
  given(session.createConsumer(QUEUE_DESTINATION, null)).willReturn(messageConsumer);  // no MessageSelector...
  Connection connection = mock(Connection.class);
  // session gets created in order to register MessageListener...
  given(connection.createSession(this.container.isSessionTransacted(),
      this.container.getSessionAcknowledgeMode())).willReturn(session);
  // and the connection is start()ed after the listener is registered...
  ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
  given(connectionFactory.createConnection()).willReturn(connection);
  this.container.setConnectionFactory(connectionFactory);
  this.container.setDestinationName(DESTINATION_NAME);
  this.container.setMessageListener(new TestMessageListener());
  this.container.afterPropertiesSet();
  GenericApplicationContext context = new GenericApplicationContext();
  context.getBeanFactory().registerSingleton("messageListenerContainer", this.container);
  context.refresh();
  verify(connection).setExceptionListener(this.container);
  verify(connection).start();
}

代码示例来源:origin: apache/rocketmq-externals

Connection connection = createConnection(producerId, consumerId);
Connection anotherConnection = createConnection(producerId, consumerId +"other");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageProducer messageProducer = session.createProducer(destinationA);
  connection.start();
  anotherConnection.start();
    TextMessage message = session.createTextMessage(text + i);
    Assert.assertNull(message.getJMSMessageID());
    messageProducer.send(message);
    Assert.assertNotNull(message.getJMSMessageID());
    TextMessage message = session.createTextMessage(text + i);
    Assert.assertNull(message.getJMSMessageID());
    messageProducer.send(destinationB, message);
    Assert.assertNotNull(message.getJMSMessageID());
  connection.close();
  anotherConnection.close();

代码示例来源:origin: spring-projects/spring-framework

@Test
public void testConverter() throws Exception {
  JmsTemplate template = createTemplate();
  template.setConnectionFactory(this.connectionFactory);
  template.setMessageConverter(new SimpleMessageConverter());
  String s = "Hello world";
  MessageProducer messageProducer = mock(MessageProducer.class);
  TextMessage textMessage = mock(TextMessage.class);
  given(this.session.createProducer(this.queue)).willReturn(messageProducer);
  given(this.session.createTextMessage("Hello world")).willReturn(textMessage);
  template.convertAndSend(this.queue, s);
  verify(messageProducer).send(textMessage);
  verify(messageProducer).close();
  if (useTransactedTemplate()) {
    verify(this.session).commit();
  }
  verify(this.session).close();
  verify(this.connection).close();
}

代码示例来源:origin: kiegroup/jbpm

void receiveAndProcess(Queue queue, EntityManagerFactory entityManagerFactory, long waitTime, int countDown) throws Exception {
  Connection qconnetion = factory.createConnection();
  Session qsession = qconnetion.createSession(true, QueueSession.AUTO_ACKNOWLEDGE);
  MessageConsumer consumer = qsession.createConsumer(queue);
  qconnetion.start();
  consumer.setMessageListener(rec);
  Assertions.assertThat(latch.await(waitTime, TimeUnit.MILLISECONDS)).isTrue();
  consumer.close();            
  qsession.close();            
  qconnetion.close();

代码示例来源:origin: spring-projects/spring-framework

@Test
public void testDestroyClosesConsumersSessionsAndConnectionInThatOrder() throws Exception {
  MessageConsumer messageConsumer = mock(MessageConsumer.class);
  Session session = mock(Session.class);
  // Queue gets created in order to create MessageConsumer for that Destination...
  given(session.createQueue(DESTINATION_NAME)).willReturn(QUEUE_DESTINATION);
  // and then the MessageConsumer gets created...
  given(session.createConsumer(QUEUE_DESTINATION, null)).willReturn(messageConsumer);  // no MessageSelector...
  Connection connection = mock(Connection.class);
  // session gets created in order to register MessageListener...
  given(connection.createSession(this.container.isSessionTransacted(),
      this.container.getSessionAcknowledgeMode())).willReturn(session);
  // and the connection is start()ed after the listener is registered...
  ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
  given(connectionFactory.createConnection()).willReturn(connection);
  this.container.setConnectionFactory(connectionFactory);
  this.container.setDestinationName(DESTINATION_NAME);
  this.container.setMessageListener(new TestMessageListener());
  this.container.afterPropertiesSet();
  this.container.start();
  this.container.destroy();
  verify(messageConsumer).close();
  verify(session).close();
  verify(connection).setExceptionListener(this.container);
  verify(connection).start();
  verify(connection).close();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void testTransactionCommitWithMessageProducer() throws JMSException {
  Destination dest = new StubQueue();
  ConnectionFactory cf = mock(ConnectionFactory.class);
  Connection con = mock(Connection.class);
  Session session = mock(Session.class);
  MessageProducer producer = mock(MessageProducer.class);
  final Message message = mock(Message.class);
  given(cf.createConnection()).willReturn(con);
  given(con.createSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(session);
  given(session.createProducer(dest)).willReturn(producer);
  given(session.getTransacted()).willReturn(true);
  JmsTransactionManager tm = new JmsTransactionManager(cf);
  TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
  JmsTemplate jt = new JmsTemplate(cf);
  jt.send(dest, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
      return message;
    }
  });
  tm.commit(ts);
  verify(producer).send(message);
  verify(session).commit();
  verify(producer).close();
  verify(session).close();
  verify(con).close();
}

代码示例来源:origin: apache/activemq

ActiveMQDestination dest = destination.getActiveMQDestination();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = null;
try {
  connection = cf.createConnection(userName, password);
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageProducer producer = session.createProducer(dest);
  ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
  producer.setDeliveryMode(msg.getJMSDeliveryMode());
  producer.setPriority(msg.getPriority());
  long ttl = 0;
  if (msg.getExpiration() != 0) {
  producer.setTimeToLive(ttl > 0 ? ttl : 0);
  producer.send(msg);
    connection.close();

代码示例来源:origin: spring-projects/spring-framework

TemporaryQueue replyDestination = mock(TemporaryQueue.class);
MessageProducer messageProducer = mock(MessageProducer.class);
given(localSession.createProducer(this.queue)).willReturn(messageProducer);
given(localSession.createTemporaryQueue()).willReturn(replyDestination);
given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer);
  given(messageConsumer.receiveNoWait()).willReturn(reply);
  given(messageConsumer.receive()).willReturn(reply);
  given(messageConsumer.receive(timeout)).willReturn(reply);
verify(request).setJMSReplyTo(replyDestination);
assertSame("Reply message not received", reply, message);
verify(this.connection).start();
verify(this.connection).close();
verify(localSession).close();
verify(messageConsumer).close();
verify(messageProducer).close();

代码示例来源:origin: apache/activemq-artemis

private void checkDestination(String name) throws Exception {
 ConnectionFactory cf = (ConnectionFactory) namingContext.lookup("/someCF");
 Connection conn = cf.createConnection();
 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 Destination dest = (Destination) namingContext.lookup(name);
 conn.start();
 MessageConsumer cons = sess.createConsumer(dest);
 MessageProducer prod = sess.createProducer(dest);
 prod.send(sess.createMessage());
 assertNotNull(cons.receiveNoWait());
 conn.close();
}

代码示例来源:origin: apache/activemq

@Before
public void setUp() throws Exception {
  brokerService = new BrokerService();
  brokerService.setPersistent(false);
  brokerService.setUseJmx(true);
  String connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
  brokerService.start();
  brokerService.waitUntilStarted();
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
  final Connection conn = connectionFactory.createConnection();
  try {
    conn.start();
    final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Destination queue = session.createQueue(testQueueName);
    final Message toSend = session.createMessage();
    toSend.setStringProperty("foo", "bar");
    final MessageProducer producer = session.createProducer(queue);
    producer.send(queue, toSend);
  } finally {
    conn.close();
  }
}

代码示例来源:origin: apache/activemq-artemis

@Test
  public void testCreateTopic() throws Exception {
   conn = cf.createConnection();

   Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

   Topic topic = createTopic("TestTopic");

   String topicName = topic.getTopicName();

//      assertFalse(topicName.startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX));

   Topic replyTopic = session.createTopic(topicName);

   MessageConsumer consumer = session.createConsumer(replyTopic);

   conn.start();

   MessageProducer producer = session.createProducer(replyTopic);

   producer.send(session.createMessage());

   assertNotNull(consumer.receive(10000));
  }

代码示例来源:origin: apache/activemq-artemis

private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, int i) throws JMSException {
 try (Connection connection = producerConnectionSupplier.createConnection();
    Session session = connection.createSession();
    MessageProducer producer = session.createProducer(session.createQueue(queueName))) {
   TextMessage message1 = session.createTextMessage();
   message1.setText(Integer.toString(i));
   producer.send(message1);
 }
}

相关文章