java—如何获取在flux中导致异常的元素?

1szpjjfi  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(543)

假设我有一个ID数组: [9, 8, 7, 6] .
我做了一些处理,一个元素导致抛出一个异常。我想用我自己的方式处理这种情况(比如说记录),让其他元素随波逐流。
我怎么知道是哪一个?我需要在我的生活中有这个元素 onError 处理。

Flux.fromArray(myArray)
  .flatMap(element -> {
    var foo = processMyEl(element);  
    return anotherProcess(foo); // this returns Mono
  })
  .onErrorOperator(element -> handleMyError(element)) // this line is what I need

所以,我看到的,这几乎是美好的 .onErrorContinue((error, obj) -> 它发出一个错误和一个对象。
但是这个 obj 不是 element 导致异常的对象。它发生在我的处理方法中,不必每次都是同一类型的对象。 .onErrorReturn(...) -不是我想要的 .doOnError(error -> -没有我的元素的信息 .onErrorResume(error -> -同上
有人建议我可以创建自己的异常并将元素传递到那里,然后从异常中检索它。但是我应该如何抛出异常呢?
我是不是应该用一种古老的方法来试抓:

Flux.fromArray(myArray)
  .flatMap(el -> {
    try {
      var foo = processMyEl(el);  
      return anotherProcess(foo); // this returns Mono
    } catch (Exception e) {
      return Mono.error(new MyException(el));
     }
    })
  .onErrorOperator(error -> handleMyError(error.getElement()))

看起来不太好
编辑:
不仅看起来不好,而且也不管用。异常根本没有被捕获,而是直接触发 doOnTerminate() 阻止了整条河流
更新:
多亏了我用的@jey .onErrorResume() 内部 flatMap .
我还通过 Mono.defer(() -> Mono.just(processMyEl(el))) .
注意:使用 Mono.defer() 允许我使用 onErrorResumeMono.just() 无法发送错误信号。
最终代码如下所示:

Flux.fromArray(myArray)
    .flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
        .onErrorResume(th -> handleMyError(element, th))
    )
    .flatMap(foo -> anotherProcess(foo)
        .onErrorResume(th -> handleMyError(foo, th)
    )

哪里:

private Mono<> handleMyError(el, th) {
  // handling code
  return Mono.empty()
}
ccgok5k5

ccgok5k51#

根据@kamil的要求,我将添加我的评论作为答案:
您应该只处理flatmap中的错误,并返回mono.empty()以丢弃它,方法如下:

Flux.fromArray(myArray)
  .flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))

处理错误如下:

Mono<Void> handleError(Throwable th, Object element) {
    LOG.error("An error occurred on {}", element, th);
    return Mono.empty()
}

或者如果您想做一些需要异步的更复杂的事情:

Mono<Void> handleError(Throwable th, Object element) {
    return doSomethingThaReturnFluxOrMono(element).then();
}
xlpyo6sf

xlpyo6sf2#

} catch (Exception e) {
    throw new MyException(el, e);
}

相关问题