我有两个Azure服务总线队列:队列1和队列2。我想用Java在这两个队列之间复制或移动消息。我在Azure的Maven文件中使用这些依赖项:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.2</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure</artifactId>
<version>1.41.4</version>
</dependency>
我使用以下代码创建我的Azure receiverClient和senderClient:
this.receiverClient = new ServiceBusClientBuilder()
.connectionString(props.getProperty("connection-string"))
.receiver()
.queueName(props.getProperty("queue1"))
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.buildClient();
this.senderClient = new ServiceBusClientBuilder()
.connectionString(props.getProperty("connection-string"))
.sender()
.queueName(props.getProperty("queue2"))
.buildClient();
我使用一个函数将消息从queue 1复制或移动到queue 2,如下所示:
public void readMessagesFromQueue() {
//"number of messages" is 1500 and the "duration-read-queue" is 100
IterableStream<ServiceBusReceivedMessage> messages = receiverClient.receiveMessages(Integer.parseInt(props.getProperty("number of messages")),
Duration.ofSeconds(Long.parseLong(props.getProperty("duration-read-queue"))));
for (ServiceBusReceivedMessage serviceBusReceivedMessage : messages) {
String messageBody = serviceBusReceivedMessage.getBody().toString();
Map<String, Object> mapQueueProperties = serviceBusReceivedMessage.getApplicationProperties();
ServiceBusMessage serviceBusMessage = new ServiceBusMessage(messageBody);
serviceBusMessage.getApplicationProperties().putAll(mapQueueProperties);
//write the message to the local file system as a file and also the propertys
writeMessageToFile(serviceBusMessage.getBody().toString(), fileTeller);
writePropertiesToFile(serviceBusMessage.getApplicationProperties(), fileTeller);
senderClient.sendMessage(serviceBusMessage);
//if it is a move from queue to queue the original message must be deleted
//if it is a copy from queue to queue the original message must NOT be deleted
if (props.getProperty("copy").trim().startsWith("move")) {
receiverClient.complete(serviceBusReceivedMessage);
} else if (props.getProperty("copy").trim().startsWith("copy")) {
receiverClient.abandon(serviceBusReceivedMessage);
}
}
receiverClient.close();
senderClient.close();
}
当我运行程序时,有时会出错。例如,当我将1300条消息从队列1移动到队列2时,它运行得很好。但是,当我在一分钟左右的时间内将消息从队列2复制回队列1时,它会给出如下错误:
交货不在接收链接上。2023-08-10 07:51:34错误MqWorkService:230 - com.azure.messaging. servebus.ServiceBusException:交货不在接收链接上。2023-08-10 07:51:34错误MqWorkService:230 -位于com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.serviceda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1478)
错误发生在以下代码行:
receiverClient.complete(serviceBusReceivedMessage);
当我等待5分钟左右,然后有时复制可以发生,有时我需要等待10分钟或15分钟。
所以这是一个非常不可靠的排队系统。
我们使用标准的Azure服务总线许可证。在Azure服务总线门户中创建的队列的消息锁定持续时间为10秒。
另外,当Java程序在成功处理消息后关闭时,应用程序将在一分钟后退出。我可以使用这样的东西:
System.exit(0)
更快的退出?
我做错了什么,为什么我会犯这些错误?
我想排队的时候一定有锁,但是为什么呢?
在错误中,还引用了ServiceBusReceiverAsync客户端。但是在我的代码中,我只使用了ServiceBusReceiverClient,而没有使用Async。
1条答案
按热度按时间kuarbcqp1#
问题是我试图一次处理大量的消息。因此,我重新构建了解决方案,使其每批最多处理100条消息。谢谢肖恩的支持。