maven Azure服务总线队列正在锁定Java 8程序中的消息或队列

r6hnlfcb  于 8个月前  发布在  Maven
关注(0)|答案(1)|浏览(110)

我有两个Azure服务总线队列:队列1和队列2。我想用Java在这两个队列之间复制或移动消息。我在Azure的Maven文件中使用这些依赖项:

<dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.2</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure</artifactId>
            <version>1.41.4</version>
        </dependency>

我使用以下代码创建我的Azure receiverClient和senderClient:

this.receiverClient = new ServiceBusClientBuilder()
                        .connectionString(props.getProperty("connection-string"))
                        .receiver()
                        .queueName(props.getProperty("queue1"))
                        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                        .buildClient();
            
            this.senderClient = new ServiceBusClientBuilder()
                    .connectionString(props.getProperty("connection-string"))
                    .sender()
                    .queueName(props.getProperty("queue2"))
                    .buildClient();

我使用一个函数将消息从queue 1复制或移动到queue 2,如下所示:

public void readMessagesFromQueue() {
        //"number of messages" is 1500 and the "duration-read-queue"  is 100
        IterableStream<ServiceBusReceivedMessage> messages = receiverClient.receiveMessages(Integer.parseInt(props.getProperty("number of messages")), 
                Duration.ofSeconds(Long.parseLong(props.getProperty("duration-read-queue"))));

        for (ServiceBusReceivedMessage serviceBusReceivedMessage : messages) {

            String messageBody = serviceBusReceivedMessage.getBody().toString();
            Map<String, Object> mapQueueProperties = serviceBusReceivedMessage.getApplicationProperties();
            
            ServiceBusMessage serviceBusMessage = new ServiceBusMessage(messageBody);
            serviceBusMessage.getApplicationProperties().putAll(mapQueueProperties);

            //write the message to the local file system as a file and also the propertys
            writeMessageToFile(serviceBusMessage.getBody().toString(), fileTeller);
            writePropertiesToFile(serviceBusMessage.getApplicationProperties(), fileTeller);

            senderClient.sendMessage(serviceBusMessage);
            
            //if it is a move from queue to queue the original message must be deleted
            //if it is a copy from queue to queue the original message must NOT be deleted
            if (props.getProperty("copy").trim().startsWith("move")) {
                receiverClient.complete(serviceBusReceivedMessage);
            } else if (props.getProperty("copy").trim().startsWith("copy")) {
                receiverClient.abandon(serviceBusReceivedMessage);
            }

        }
        
        receiverClient.close();
        senderClient.close();
    }

当我运行程序时,有时会出错。例如,当我将1300条消息从队列1移动到队列2时,它运行得很好。但是,当我在一分钟左右的时间内将消息从队列2复制回队列1时,它会给出如下错误:
交货不在接收链接上。2023-08-10 07:51:34错误MqWorkService:230 - com.azure.messaging. servebus.ServiceBusException:交货不在接收链接上。2023-08-10 07:51:34错误MqWorkService:230 -位于com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.serviceda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1478)
错误发生在以下代码行:

receiverClient.complete(serviceBusReceivedMessage);

当我等待5分钟左右,然后有时复制可以发生,有时我需要等待10分钟或15分钟。
所以这是一个非常不可靠的排队系统。
我们使用标准的Azure服务总线许可证。在Azure服务总线门户中创建的队列的消息锁定持续时间为10秒。
另外,当Java程序在成功处理消息后关闭时,应用程序将在一分钟后退出。我可以使用这样的东西:

System.exit(0)

更快的退出?
我做错了什么,为什么我会犯这些错误?
我想排队的时候一定有锁,但是为什么呢?
在错误中,还引用了ServiceBusReceiverAsync客户端。但是在我的代码中,我只使用了ServiceBusReceiverClient,而没有使用Async。

kuarbcqp

kuarbcqp1#

问题是我试图一次处理大量的消息。因此,我重新构建了解决方案,使其每批最多处理100条消息。谢谢肖恩的支持。

相关问题