Consuming messages from MQ and batching them in Spring JMS

Posted by dvtswwa3 on 2021-06-06 in Kafka

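I am using Spring + MQ + WebSphere Application Server.
I want to consume messages from MQ asynchronously and combine them so that n entities are persisted to the database per commit (without putting too much load on my target Oracle database).
I use a DefaultMessageListenerContainer and made the onMessage method synchronized so that it adds messages (up to the batch size). I also create a thread that waits until a condition (time/size) is met and then pushes the messages to another thread that runs the business logic and the DB persistence.
Condition for the thread to start:
Once the first message arrives in onMessage, the thread must wait up to 1000 ms for 25 messages. If 25 messages have not arrived within 1000 ms, it pushes however many messages are available to the other thread.
Problem:
I see that the thread is started only during server startup, not when onMessage is called for the first time.
Any suggestions or other approaches for collecting messages from the queue?
applicationContext.xml: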

<bean id="myMessageListener" class="org.mypackage.MyMessageListener"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name="concurrentConsumers" value="10"/>
    <property name="maxConcurrentConsumers" value="50"/>
</bean>

Listener:

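package org.mypackage;

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.mypackage.service.MyService;
import org.springframework.beans.factory.annotation.Autowired;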

public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });

    {
       thread.start();
    }

    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (messageCount == 0) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }
}

ss2ws0br1#

You also need to synchronize access to messagesFromQueue.

List messagesFromQueue = Collections.synchronizedList(new ArrayList());
      ...
  synchronized (messagesFromQueue) {
      Iterator i = messagesFromQueue.iterator(); // Must be in synchronized block
      while (i.hasNext())
      ...
  }

https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)
You will also get a NullPointerException every time processMsgsFromQueue is called!
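To make the fragment above concrete, here is a minimal, self-contained sketch of the same idea. The class name SynchronizedListDemo and its add/drain methods are made up for illustration and are not part of the question's code. The wrapper returned by Collections.synchronizedList makes single calls such as add() thread-safe, but any compound step (iterate, or copy and then clear) still has to be wrapped in a synchronized block on the list itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not taken from the question.
class SynchronizedListDemo {

    // The wrapper synchronizes each individual call (add, size, clear, ...).
    private final List<String> messagesFromQueue =
            Collections.synchronizedList(new ArrayList<String>());

    void add(String msg) {
        messagesFromQueue.add(msg);
    }

    // Copy-and-clear is a compound action, so it needs an explicit lock
    // on the list; otherwise an add() could slip in between the two steps.
    List<String> drain() {
        synchronized (messagesFromQueue) {
            List<String> copy = new ArrayList<String>(messagesFromQueue);
            messagesFromQueue.clear();
            return copy;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final SynchronizedListDemo demo = new SynchronizedListDemo();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            final int id = t;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    demo.add("msg-" + id + "-" + i);
                }
            });
            producers[t].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        System.out.println(demo.drain().size()); // prints 4000
    }
}
```
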

private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()/*messageFromQueue is null!!*/) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }

It is better to persist the messages first and, once the commit has succeeded, clear the list and reset the counter.

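package org.mypackage;

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.mypackage.service.MyService;
import org.springframework.beans.factory.annotation.Autowired;
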
public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });

    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
       messageCount = 0;
       messagesFromQueue =  null;
   }
}
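
One caveat with any single-Thread design: a java.lang.Thread can be started only once (a second start() throws IllegalThreadStateException), and the run loop above exits after handling one batch, so later batches never get a worker. As a possible alternative, here is a rough sketch that uses a ScheduledExecutorService as the timer instead. MessageBatcher, its constructor parameters and the sink callback are hypothetical names invented for this example; in the listener, the sink would be something like myService::insertMsgsFromQueueToDB, and onMessage would just call add(tm.getText()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: flushes when batchSize is reached or when
// maxWaitMillis has passed since the first message of the batch.
class MessageBatcher {
    private final int batchSize;
    private final long maxWaitMillis;
    private final Consumer<List<String>> sink;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Object lock = new Object();
    private List<String> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingFlush;

    MessageBatcher(int batchSize, long maxWaitMillis, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sink = sink;
    }

    void add(String msg) {
        List<String> full = null;
        synchronized (lock) {
            buffer.add(msg);
            if (buffer.size() == 1) {
                // First message of a new batch: start the time limit.
                pendingFlush = timer.schedule(this::flush, maxWaitMillis,
                        TimeUnit.MILLISECONDS);
            }
            if (buffer.size() >= batchSize) {
                full = takeBuffer();
            }
        }
        if (full != null) {
            sink.accept(full); // called outside the lock, on the caller's thread
        }
    }

    // Called by the timer thread, or manually. If a cancelled timer task was
    // already running, the worst case is an early flush of a smaller batch.
    void flush() {
        List<String> batch;
        synchronized (lock) {
            batch = takeBuffer();
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    // Caller must hold the lock. Swaps in a fresh buffer and cancels the timer.
    private List<String> takeBuffer() {
        if (pendingFlush != null) {
            pendingFlush.cancel(false);
            pendingFlush = null;
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        return batch;
    }

    void shutdown() {
        flush();
        timer.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        MessageBatcher batcher = new MessageBatcher(25, 1000,
                batch -> System.out.println("persisting " + batch.size() + " messages"));
        for (int i = 0; i < 60; i++) {
            batcher.add("msg-" + i);
        }
        Thread.sleep(1500); // give the time limit a chance to fire for the rest
        batcher.shutdown();
    }
}
```

With 60 messages, the demo hands two full batches of 25 to the sink right away and the remaining 10 after roughly one second. Keep in mind that with this kind of in-memory buffering the listener container has normally already acknowledged a message by the time onMessage returns, so anything still sitting in the buffer is lost if the server goes down or the insert fails. If that matters, the batch would have to be received and committed inside one transacted session instead.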
