Usage and code examples for the io.reactivex.Single.lift() method

x33g5p2x, reposted on 2022-01-29 under Other

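This article collects code examples for the io.reactivex.Single.lift() method in Java and shows how Single.lift() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of the Single.lift() method:
Fully qualified class: io.reactivex.Single
Class name: Single
Method name: lift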

Introduction to Single.lift

This method requires advanced knowledge about building operators; please consider other standard composition methods first. It returns a Single which, when subscribed to, invokes the SingleOperator.apply(SingleObserver) method of the provided SingleOperator for each individual downstream SingleObserver. This allows a custom operator to be inserted: during the subscription phase the operator receives the downstream's SingleObserver and returns a new SingleObserver, containing the custom operator's intended business logic, which is then used in the subscription process going further upstream.

Generally, such a new SingleObserver wraps the downstream's SingleObserver and forwards the onSuccess and onError events from the upstream, either directly or according to the emission pattern the custom operator's business logic requires. In addition, such an operator can intercept the flow control calls of dispose and isDisposed that would have traveled upstream and perform additional actions depending on the same business logic requirements.
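The subscription-time wiring described above can be modeled without RxJava. The following is a minimal sketch, not RxJava's implementation: Handle, MiniObserver, MiniOperator, and MiniSingle are hypothetical stand-ins (for Disposable, SingleObserver, SingleOperator, and Single) written only so that the example runs on a plain JDK, and they omit most of the real protocol. It illustrates that the operator's apply() runs when the flow is subscribed to, not when lift() is called.

```java
import java.util.ArrayList;
import java.util.List;

final class LiftSketch {

    // Hypothetical stand-in for Disposable.
    interface Handle { void dispose(); boolean isDisposed(); }

    // Hypothetical stand-in for SingleObserver.
    interface MiniObserver<T> {
        void onSubscribe(Handle h);
        void onSuccess(T value);
        void onError(Throwable e);
    }

    // Hypothetical stand-in for SingleOperator: maps the downstream observer
    // to the observer that will be subscribed to the upstream.
    interface MiniOperator<D, U> {
        MiniObserver<U> apply(MiniObserver<D> downstream);
    }

    // Hypothetical stand-in for Single.
    abstract static class MiniSingle<T> {
        abstract void subscribe(MiniObserver<T> observer);

        static <T> MiniSingle<T> just(final T value) {
            return new MiniSingle<T>() {
                @Override
                void subscribe(MiniObserver<T> observer) {
                    final boolean[] disposed = {false};
                    observer.onSubscribe(new Handle() {
                        @Override public void dispose() { disposed[0] = true; }
                        @Override public boolean isDisposed() { return disposed[0]; }
                    });
                    if (!disposed[0]) {
                        observer.onSuccess(value);
                    }
                }
            };
        }

        // lift() only stores the operator; apply() runs later, once per subscribe() call.
        <R> MiniSingle<R> lift(final MiniOperator<R, T> operator) {
            final MiniSingle<T> source = this;
            return new MiniSingle<R>() {
                @Override
                void subscribe(MiniObserver<R> downstream) {
                    source.subscribe(operator.apply(downstream));
                }
            };
        }
    }

    // Records the order in which assembly, apply() and the downstream events happen.
    static List<String> run(int value) {
        final List<String> events = new ArrayList<>();
        MiniSingle<Integer> source = MiniSingle.just(value);
        MiniOperator<String, Integer> operator = downstream -> {
            events.add("apply");
            return new MiniObserver<Integer>() {
                @Override public void onSubscribe(Handle h) { downstream.onSubscribe(h); }
                @Override public void onSuccess(Integer v) { downstream.onSuccess(String.valueOf(v)); }
                @Override public void onError(Throwable e) { downstream.onError(e); }
            };
        };
        MiniSingle<String> lifted = source.lift(operator);
        events.add("assembled");
        lifted.subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe(Handle h) { events.add("onSubscribe"); }
            @Override public void onSuccess(String s) { events.add("onSuccess:" + s); }
            @Override public void onError(Throwable e) { events.add("onError"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [assembled, apply, onSubscribe, onSuccess:5]
    }
}
```

In this model "assembled" is recorded before "apply", which mirrors the statement that the operator is invoked during the subscription phase rather than at assembly time.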

Example:

// Imports needed by the snippets below (RxJava 2.x):
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOperator;
import io.reactivex.disposables.Disposable;
import java.util.NoSuchElementException;

// Step 1: Create the consumer type that will be returned by SingleOperator.apply():
public final class CustomSingleObserver<T> implements SingleObserver<T>, Disposable {

    // The downstream's SingleObserver that will receive the onXXX events
    final SingleObserver<? super String> downstream;

    // The connection to the upstream source that will call this class' onXXX methods
    Disposable upstream;

    // The constructor takes the downstream observer and usually any other parameters
    public CustomSingleObserver(SingleObserver<? super String> downstream) {
        this.downstream = downstream;
    }

    // In the subscription phase, the upstream sends a Disposable to this class
    // and subsequently this class has to send a Disposable to the downstream.
    // Note that relaying the upstream's Disposable directly is not allowed in RxJava
    @Override
    public void onSubscribe(Disposable d) {
        if (upstream != null) {
            // A second onSubscribe call is a protocol violation: dispose the extra one
            d.dispose();
        } else {
            upstream = d;
            downstream.onSubscribe(this);
        }
    }

    // The upstream calls this with its single item; the implementation's
    // responsibility is to signal onSuccess or onError to the downstream
    // based on the intended business logic.
    @Override
    public void onSuccess(T item) {
        String str = item.toString();
        if (str.length() < 2) {
            downstream.onSuccess(str);
        } else {
            // Single is usually expected to produce one of the onXXX events
            downstream.onError(new NoSuchElementException());
        }
    }

    // Some operators may handle the upstream's error while others
    // could just forward it to the downstream.
    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    // Some operators may use their own resources which should be cleaned up if
    // the downstream disposes the flow before it completed. Operators without
    // resources can simply forward the dispose to the upstream.
    // In some cases, a disposed flag may be set by this method so that other parts
    // of this class may detect the dispose and stop sending events
    // to the downstream.
    @Override
    public void dispose() {
        upstream.dispose();
    }

    // Some operators may simply forward the call to the upstream while others
    // can return the disposed flag set in dispose().
    @Override
    public boolean isDisposed() {
        return upstream.isDisposed();
    }
}

// Step 2: Create a class that implements the SingleOperator interface and
//         returns the custom consumer type from above in its apply() method.
//         Such a class may define additional parameters to be submitted to
//         the custom consumer type.
//         SingleOperator<Downstream, Upstream>: here the downstream sees String
//         and the upstream emits T.
final class CustomSingleOperator<T> implements SingleOperator<String, T> {
    @Override
    public SingleObserver<? super T> apply(SingleObserver<? super String> downstream) {
        return new CustomSingleObserver<T>(downstream);
    }
}

// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
//         or reusing an existing one.
Single.just(5)
    .lift(new CustomSingleOperator<Integer>())
    .test()
    .assertResult("5");

Single.just(15)
    .lift(new CustomSingleOperator<Integer>())
    .test()
    .assertFailure(NoSuchElementException.class);

Creating custom operators can be complicated; consult the "Writing operators" page of the RxJava wiki for the tools, requirements, rules, considerations, and pitfalls of implementing them.

Note that implementing custom operators via lift() adds slightly more overhead, because each assembled flow requires an additional allocation and indirection. Extending the abstract Single class and creating a SingleTransformer with it is recommended instead.

Note also that it is not possible to stop the subscription phase in lift(): the apply() method must return a non-null SingleObserver instance, which is then unconditionally subscribed to the upstream Single. For example, if the operator decides there is no reason to subscribe to the upstream source, because of some optimization possibility or a failure to prepare the operator, it still has to return a SingleObserver that immediately disposes the upstream's Disposable in its onSubscribe method. Again, using a SingleTransformer and extending Single is a better option, because subscribeActual can decide not to subscribe to its upstream after all.

Scheduler: lift does not operate by default on a particular Scheduler; however, the SingleOperator may use a Scheduler to support its own asynchronous behavior.
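The difference between the two approaches to skipping the upstream can be modeled on a plain JDK. This is a rough sketch under stated assumptions: Handle, MiniObserver, MiniOperator, MiniSingle, and CountingSource are hypothetical stand-ins invented for this illustration, not RxJava types, and the "operator not ready" failure is made up. It counts how often the upstream is subscribed to when the decision is made inside a lift-style operator versus inside the source's own subscribe method (the analogue of subscribeActual).

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SkipSubscriptionSketch {

    // Hypothetical stand-ins for Disposable, SingleObserver and SingleOperator.
    interface Handle { void dispose(); boolean isDisposed(); }

    interface MiniObserver<T> {
        void onSubscribe(Handle h);
        void onSuccess(T value);
        void onError(Throwable e);
    }

    interface MiniOperator<D, U> {
        MiniObserver<U> apply(MiniObserver<D> downstream);
    }

    // Hypothetical stand-in for Single.
    abstract static class MiniSingle<T> {
        abstract void subscribe(MiniObserver<T> observer);

        // As with lift(): the observer returned by apply() is always subscribed upstream.
        <R> MiniSingle<R> lift(final MiniOperator<R, T> operator) {
            final MiniSingle<T> source = this;
            return new MiniSingle<R>() {
                @Override
                void subscribe(MiniObserver<R> downstream) {
                    source.subscribe(operator.apply(downstream));
                }
            };
        }
    }

    // A source that counts how often it is subscribed to.
    static final class CountingSource extends MiniSingle<Integer> {
        final AtomicInteger subscriptions = new AtomicInteger();

        @Override
        void subscribe(MiniObserver<Integer> observer) {
            subscriptions.incrementAndGet();
            final boolean[] disposed = {false};
            observer.onSubscribe(new Handle() {
                @Override public void dispose() { disposed[0] = true; }
                @Override public boolean isDisposed() { return disposed[0]; }
            });
            if (!disposed[0]) {
                observer.onSuccess(1);
            }
        }
    }

    // An already-disposed handle for flows that never deliver a value.
    static final Handle DISPOSED = new Handle() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    // Downstream observer that ignores every signal.
    static <T> MiniObserver<T> ignore() {
        return new MiniObserver<T>() {
            @Override public void onSubscribe(Handle h) { }
            @Override public void onSuccess(T value) { }
            @Override public void onError(Throwable e) { }
        };
    }

    // lift-style: the operator cannot prevent the upstream subscription, so its
    // observer disposes the upstream as soon as onSubscribe arrives.
    static int upstreamSubscriptionsViaLift() {
        CountingSource source = new CountingSource();
        MiniOperator<Integer, Integer> failing = downstream -> new MiniObserver<Integer>() {
            @Override public void onSubscribe(Handle upstream) {
                upstream.dispose();
                downstream.onSubscribe(DISPOSED);
                downstream.onError(new IllegalStateException("operator not ready"));
            }
            @Override public void onSuccess(Integer value) { }
            @Override public void onError(Throwable e) { }
        };
        source.lift(failing).subscribe(SkipSubscriptionSketch.<Integer>ignore());
        return source.subscriptions.get();
    }

    // Subclass-style: subscribe() itself decides, so the upstream may never be touched.
    static int upstreamSubscriptionsViaSubclass(final boolean ready) {
        final CountingSource source = new CountingSource();
        MiniSingle<Integer> guarded = new MiniSingle<Integer>() {
            @Override
            void subscribe(MiniObserver<Integer> downstream) {
                if (ready) {
                    source.subscribe(downstream);
                } else {
                    downstream.onSubscribe(DISPOSED);
                    downstream.onError(new IllegalStateException("operator not ready"));
                }
            }
        };
        guarded.subscribe(SkipSubscriptionSketch.<Integer>ignore());
        return source.subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(upstreamSubscriptionsViaLift());          // prints 1
        System.out.println(upstreamSubscriptionsViaSubclass(false)); // prints 0
    }
}
```

In this model the lift-style variant still subscribes to the upstream once before disposing it, while the subclass-style variant never subscribes when it is not ready.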

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftNull() {
  just1.lift(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void liftFunctionReturnsNull() {
  just1.lift(new SingleOperator<Object, Integer>() {
    @Override
    public SingleObserver<? super Integer> apply(SingleObserver<? super Object> observer) {
      return null;
    }
  }).blockingGet();
}

Code example source: ReactiveX/RxJava

@Test
public void normal() {
  Single.just(1).lift(new SingleOperator<Integer, Integer>() {
    @Override
    public SingleObserver<Integer> apply(final SingleObserver<? super Integer> observer) throws Exception {
      return new SingleObserver<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
          observer.onSubscribe(d);
        }

        @Override
        public void onSuccess(Integer value) {
          observer.onSuccess(value + 1);
        }

        @Override
        public void onError(Throwable e) {
          observer.onError(e);
        }
      };
    }
  })
  .test()
  .assertResult(2);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitAllEvents() {
  Single.just(1)
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertResult(1);
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Single.error(new IOException("BAM!"))
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldReleaseBulkheadOnlyOnce() {
  Single.just(Arrays.asList(1, 2, 3))
    .lift(BulkheadOperator.of(bulkhead))
    .flatMapObservable(Observable::fromIterable)
    .take(2) // together with the previous line, this triggers an extra dispose
    .test()
    .assertResult(1, 2);

  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithBulkheadFullException() {
  bulkhead.isCallPermitted();
  Single.just(1)
    .lift(BulkheadOperator.of(bulkhead))
    .test()
    .assertSubscribed()
    .assertError(BulkheadFullException.class)
    .assertNotComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Single.error(new IOException("BAM!"))
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertSingleFailedCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldPropagateError() {
  Single.error(new IOException("BAM!"))
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertSubscribed()
    .assertError(IOException.class)
    .assertNotComplete();
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitAllEvents() {
  Single.just(1)
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertResult(1);
  assertSingleSuccessfulCall();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  Single.just(1)
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertResult(1);
  assertSinglePermitUsed();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
  saturateRateLimiter();
  Single.just(1)
    .lift(RateLimiterOperator.of(rateLimiter))
    .test()
    .assertSubscribed()
    .assertError(RequestNotPermitted.class)
    .assertNotComplete();
  assertNoPermitLeft();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitErrorWithCircuitBreakerOpenException() {
  circuitBreaker.transitionToOpenState();
  Single.just(1)
    .lift(CircuitBreakerOperator.of(circuitBreaker))
    .test()
    .assertSubscribed()
    .assertError(CircuitBreakerOpenException.class)
    .assertNotComplete();
  assertNoRegisteredCall();
}

Code example source: com.salesforce.servicelibs/rxgrpc-stub

.lift(new SubscribeOnlyOnceSingleOperator<TResponse>());
} catch (Throwable throwable) {
  return Single.error(throwable);

Code example source: com.salesforce.servicelibs/rxgrpc-stub

rxProducerStreamObserver.rxSubscribe();
    }).lift(new SubscribeOnlyOnceSingleOperator<TResponse>());
} catch (Throwable throwable) {
  return Single.error(throwable);
