如何从com.ibm.mq.mqexception恢复:mqje001:完成代码“2”,原因“2009”

33qvvth1  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(583)

我正在编写一个消费者来读取来自mq的分段消息。我对其他队列使用了springjms/spring集成。我知道ibmq不支持jms中的消息分段:(这里是相关问题)。如何在spring集成中组装mq消息段)
下面是我为java和spring提供的使用IBMMQ类的方法。
mq对象的bean定义。

@Bean
    public MQGetMessageOptions mqGetMessageOptions() {
        MQGetMessageOptions getOptions = new MQGetMessageOptions();
        getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
        getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
                + CMQC.MQGMO_COMPLETE_MSG;
        return getOptions;
    }

    @Bean
    public MQQueueManager mqQueueManager() throws Exception {
        Hashtable<String, Object> properties = new Hashtable<String, Object>();
        properties.put(CMQC.CHANNEL_PROPERTY, channel);
        properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
        properties.put(CMQC.PORT_PROPERTY, new Integer(port));
        MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
        return qMgr;
    }

    @Bean
    public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
        int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
        MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
        return inboundQueue;
    }

    @Bean
    public MessageChannel queueConsumerChannel() {
        // return new DirectChannel();
        return new ExecutorChannel(Executors.newFixedThreadPool(5));
    }

消费者代码:

@Component
@Slf4j
public class MyQueueConsumer {

    @Autowired
    MQQueueManager qMgr;

    @Autowired
    MQGetMessageOptions mqGetMessageOptions;

    @Autowired
    MQQueue inboundQueue;

    @Autowired
    MessageChannel queueConsumerChannel;

    @Autowired
    MessageSaveService messageSaveService;

    @EventListener(ApplicationReadyEvent.class)
    public void consume() {
        boolean getMore = true;
        MQMessage receiveMsg = null;
        while (getMore) {
            try {
                receiveMsg = new MQMessage();
                log.info("Waiting to consume mesages from ....");
                inboundQueue.get(receiveMsg, mqGetMessageOptions);
                byte[] b = new byte[receiveMsg.getMessageLength()];
                receiveMsg.readFully(b);
                String fileName = getFileName();
                Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
                queueConsumerChannel.send(outMessage);
                log.info("Message consumed and sent to processng channel");
                // qMgr.commit();
            } catch (MQException e) {
                if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
                    log.error("Bottom of the queue reached.");

                    getMore = false;
                } else {
                    log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
                    log.info("Is Connected :" + qMgr.isConnected());
                    log.info("Is open : " + qMgr.isOpen());
                    e.printStackTrace();
                    getMore = false;
                }
            } 
        }

    }

    @PreDestroy
    public void closeMQObjects() {
        System.out.println("Closing MQ objects ");
        try {
            if (inboundQueue != null)
                inboundQueue.close();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
        try {
            if (qMgr != null)
                qMgr.disconnect();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
    }
}

通过此配置,使用者可以按需工作,它将所有分段的消息组合起来,作为一条完整的消息读取,并在队列中等待下一条消息到达。但我面临的挑战是我明白了 com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2009' 每隔一段时间。我把旗子弄错了,让它从while循环中出来。到目前为止,我们还不能找出这个例外的确切原因。如何从该异常中恢复并继续等待队列并在消息到达时使用它们?我使用spring和ibmq的方法有什么缺陷吗

puruo6ea

puruo6ea1#

而不是定义 inboundQueue 作为bean,在while循环中分配它并 close 当您成功地结束循环或遇到可重试的异常时,它会自动关闭。
很遗憾,我不能给你一个可重试例外的清单,但是 2009 绝对是其中之一。
延迟重试是一种很好的做法,例如,请参见“指数后退”。

相关问题