io.reactivex.subjects.Subject.share()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(1.6k)|赞(0)|评价(0)|浏览(80)

本文整理了Java中io.reactivex.subjects.Subject.share()方法的一些代码示例,展示了Subject.share()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Subject.share()方法的具体详情如下:
包路径:io.reactivex.subjects.Subject
类名称:Subject
方法名:share

Subject.share介绍

暂无

代码示例

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) {
    
    Subject<Integer> rp = PublishSubject.<Integer>create().toSerialized();

    Observable<Integer> share = rp.share();

    int numPrevious = 2;
    
    share.buffer(numPrevious, 1)
    .filter(b -> b.size() < numPrevious || b.get(numPrevious - 1) >= 3)
    .map(v -> v.get(0))
    .subscribe(System.out::println);
    
    Ix.range(1, 10).foreach(rp::onNext);
    rp.onComplete();
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test4() throws Exception {
  TestScheduler sch = new TestScheduler();
  Subject<Long> subject = PublishSubject.create();
  Observable<Long> initialObservable = subject.share()
  .map(value -> {
    System.out.println("Received value " + value);
    //new Exception().printStackTrace(System.out);
    return value;
  });
  Observable<Long> timeoutObservable = initialObservable.map(value -> {
    System.out.println("Timeout received value " + value);
    return value;
  });
  TestObserver<Long> subscriber = new TestObserver<>();
  initialObservable
  .doOnDispose(() -> { 
    System.out.println("Unsubscribed"); 
    new Exception().printStackTrace(System.out);
  })
  .timeout(1, TimeUnit.SECONDS, sch, timeoutObservable).subscribe(subscriber);
  subject.onNext(5L);
  sch.advanceTimeBy(2, TimeUnit.SECONDS);
  subject.onNext(10L);
  subject.onComplete();
  subscriber.awaitTerminalEvent();
  subscriber.assertNoErrors();
  subscriber.assertValues(5L, 10L);
}

相关文章