使用消息队列处理rest请求

jhkqcmku  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(529)

我有两份申请如下:
spring启动应用程序—充当rest端点,将请求发布到消息队列。(Apache脉冲星)
heron(storm)拓扑—处理从消息队列(pulsar)接收的消息,并具有所有处理逻辑。
根据我的需求,我需要通过springboot应用程序为不同的用户查询提供服务,该应用程序将查询发送到消息队列,并在spout处使用。一旦spout和bolt处理了请求,bolt就会再次发布一条消息。来自bolt的响应在springboot(consumer)处理,并回复用户请求。具体如下:

为了服务于同一个请求,我现在正在内存中缓存deferredresult对象(我为发送到topology的每条消息设置了一个reqid,还维护了一个key,value对),当消息到达时,我解析请求id并将结果设置为deferedresult(我知道这是一个糟糕的设计,应该如何解决这个问题?)。
在这种情况下,从拓扑接收到的消息顺序不是连续的(因为is处理的每个请求都需要自己的时间,并且producer bolt在接收到一个请求时将触发on响应),我如何继续为相同的请求提供响应。
我有点被这个设计卡住了,不能再继续下去了。

//Controller
public DeferredResult<ResponseEntity<?>> process(//someinput) {
    DeferredResult<ResponseEntity<?>> result = new DeferredResult<>(config.getTimeout());
    CompletableFuture<String> serviceResponse = service.processAsync(inputSource);
    serviceResponse.whenComplete((response, exception) -> {
        if (!ObjectUtils.isEmpty(exception))
            result.setErrorResult(//error);
        else
            result.setResult(//complete);
    });
    return result;
}

//In Service
public CompletableFuture processAsync(//input){
    producer.send(input);
    CompletableFuture result = new CompletableFuture();
    //consumer has a listener as shown below
    //**I want to avoid below line, how can I redesign this**
    map.put(id, result);
  return result;
}

//in same service, a listener is present for consumer for reading the messages
consumerListener(Message msg){
     int reqID = msg.getRequestID();
     map.get(reqID).complete(msg.getData);
}

如上图所示,当我得到一条消息时,我就得到completablefuture对象并设置结果,它内部调用defferred result对象并将响应返回给用户。

2skhul33

2skhul331#

在这种情况下,从拓扑接收到的消息顺序不是连续的(因为is处理的每个请求都需要自己的时间,并且producer bolt在接收到一个请求时将触发on响应),我如何继续为相同的请求提供响应。
听起来像是在寻找相关标识符消息传递模式。大致来说,您可以计算/创建一个附加到发送到pulsar的消息的标识符,并安排heron将该标识符从它接收的请求复制到它发送的响应。
因此,当您的spring引导组件在步骤5使用pulsar的消息时,您将相关id与正确的http请求匹配,并返回结果。
据我所知,使用原始requestid()作为相关标识符应该可以。
为了服务于同一个请求,我现在正在内存中缓存deferredresult对象(我为发送到topology的每条消息设置了一个reqid,还维护了一个key,value对),当消息到达时,我解析请求id并将结果设置为deferedresult(我知道这是一个糟糕的设计,应该如何解决这个问题?)。
最终,你可能会在某种程度上做到这一点;也就是说,第5步中的消费者将使用相关id来查找生产者存储的内容。试图通过四个不同的流程边界传递原始请求可能会以眼泪告终。
更一般的形式是存储回调,而不是 CompletableFuture ,在Map上;但在这种情况下,回调可能只是完成了未来。
我想在设计中仔细检查的一件事是:您希望确保第5步的消费者在消息到达之前看到它应该使用的未来。换句话说,在某个地方应该有一个before内存屏障,以确保步骤5中的Map查找不会失败。

相关问题