io.reactivex.Single.create()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.0k)|赞(0)|评价(0)|浏览(201)

本文整理了Java中io.reactivex.Single.create()方法的一些代码示例,展示了Single.create()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Single.create()方法的具体详情如下:
包路径:io.reactivex.Single
类名称:Single
方法名:create

Single.create介绍

[英]Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.

Example:

Single.<Event>create(emitter -> { 
Callback listener = new Callback() { 
@Override 
public void onEvent(Event e) { 
emitter.onSuccess(e); 
} 
@Override 
public void onFailure(Exception e) { 
emitter.onError(e); 
} 
}; 
AutoCloseable c = api.someMethod(listener); 
emitter.setCancellable(c::close); 
});

Scheduler: create does not operate by default on a particular Scheduler.
[中]提供一个API(通过一个cold Completable),将反应式世界与回调式世界连接起来。
例子:

Single.<Event>create(emitter -> { 
Callback listener = new Callback() { 
@Override 
public void onEvent(Event e) { 
emitter.onSuccess(e); 
} 
@Override 
public void onFailure(Exception e) { 
emitter.onError(e); 
} 
}; 
AutoCloseable c = api.someMethod(listener); 
emitter.setCancellable(c::close); 
});

调度程序:默认情况下,创建不会在特定调度程序上运行。

代码示例

代码示例来源:origin: pwittchen/ReactiveNetwork

@Override public Single<Boolean> checkInternetConnectivity(final String host, final int port,
  final int timeoutInMs, final int httpResponse, final ErrorHandler errorHandler) {
 checkGeneralPreconditions(host, port, timeoutInMs, httpResponse, errorHandler);
 return Single.create(new SingleOnSubscribe<Boolean>() {
  @Override public void subscribe(@NonNull SingleEmitter<Boolean> emitter) {
   emitter.onSuccess(isConnected(host, port, timeoutInMs, httpResponse, errorHandler));
  }
 });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createConsumerThrowsOnError() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      try {
        s.onError(new IOException());
        fail("Should have thrown");
      } catch (TestException ex) {
        // expected
      }
    }
  })
  .subscribe(new SingleObserver<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onSuccess(Object value) {
    }
    @Override
    public void onError(Throwable e) {
      throw new TestException();
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createConsumerThrows() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      try {
        s.onSuccess(1);
        fail("Should have thrown");
      } catch (TestException ex) {
        // expected
      }
    }
  })
  .subscribe(new SingleObserver<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onSuccess(Object value) {
      throw new TestException();
    }
    @Override
    public void onError(Throwable e) {
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void nullArgument() {
  Single.create(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createConsumerThrowsResourceOnError() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      Disposable d = Disposables.empty();
      s.setDisposable(d);
      try {
        s.onError(new IOException());
        fail("Should have thrown");
      } catch (TestException ex) {
        // expected
      }
      assertTrue(d.isDisposed());
    }
  })
  .subscribe(new SingleObserver<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onSuccess(Object value) {
    }
    @Override
    public void onError(Throwable e) {
      throw new TestException();
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createConsumerThrowsResource() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      Disposable d = Disposables.empty();
      s.setDisposable(d);
      try {
        s.onSuccess(1);
        fail("Should have thrown");
      } catch (TestException ex) {
        // expected
      }
      assertTrue(d.isDisposed());
    }
  })
  .subscribe(new SingleObserver<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onSuccess(Object value) {
      throw new TestException();
    }
    @Override
    public void onError(Throwable e) {
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      s.onSuccess(1);
    }
  }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createNullSuccess() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      s.onSuccess(null);
      s.onSuccess(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void emitterHasToString() {
    Single.create(new SingleOnSubscribe<Object>() {
      @Override
      public void subscribe(SingleEmitter<Object> emitter) throws Exception {
        assertTrue(emitter.toString().contains(SingleCreate.Emitter.class.getSimpleName()));
      }
    }).test().assertEmpty();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createCallbackThrows() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void createNullError() {
  Single.create(new SingleOnSubscribe<Object>() {
    @Override
    public void subscribe(SingleEmitter<Object> s) throws Exception {
      s.onError(null);
      s.onError(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
  @Test
  public void noSubsequentSubscriptionIterable() {
    final int[] calls = { 0 };

    Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
      @Override
      public void subscribe(SingleEmitter<Integer> s) throws Exception {
        calls[0]++;
        s.onSuccess(1);
      }
    });

    Single.concat(Arrays.asList(source, source)).firstElement()
    .test()
    .assertResult(1);

    assertEquals(1, calls[0]);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void tryOnError() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    final Boolean[] response = { null };
    Single.create(new SingleOnSubscribe<Object>() {
      @Override
      public void subscribe(SingleEmitter<Object> e) throws Exception {
        e.onSuccess(1);
        response[0] = e.tryOnError(new TestException());
      }
    })
    .test()
    .assertResult(1);
    assertFalse(response[0]);
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void deferredResultSubscriberWithError() throws Exception {
  IllegalStateException ex = new IllegalStateException();
  // Mono
  MonoProcessor<String> mono = MonoProcessor.create();
  testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex);
  // RxJava 1 Single
  AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
  Single<String> single = Single.fromEmitter(ref::set);
  testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onError(ex), ex);
  // RxJava 2 Single
  AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
  io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
  testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
      () -> ref2.get().onError(ex), ex);
}

代码示例来源:origin: line/armeria

private static Single<String> single(String input) {
  RequestContext.current();
  return Single.create(emitter -> {
    RequestContext.current();
    emitter.onSuccess(input);
  });
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscription() {
  final int[] calls = { 0 };
  Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onSuccess(1);
    }
  });
  Single.concatArray(source, source).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void basicWithError() {
  final Disposable d = Disposables.empty();
  Single.<Integer>create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> e) throws Exception {
      e.setDisposable(d);
      e.onError(new TestException());
      e.onSuccess(2);
      e.onError(new TestException());
    }
  })
  .test()
  .assertFailure(TestException.class);
  assertTrue(d.isDisposed());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void basic() {
  final Disposable d = Disposables.empty();
  Single.<Integer>create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> e) throws Exception {
      e.setDisposable(d);
      e.onSuccess(1);
      e.onError(new TestException());
      e.onSuccess(2);
      e.onError(new TestException());
    }
  })
  .test()
  .assertResult(1);
  assertTrue(d.isDisposed());
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void deferredResultSubscriberWithOneValue() throws Exception {
  // Mono
  MonoProcessor<String> mono = MonoProcessor.create();
  testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onNext("foo"), "foo");
  // Mono empty
  MonoProcessor<String> monoEmpty = MonoProcessor.create();
  testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
  // RxJava 1 Single
  AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
  Single<String> single = Single.fromEmitter(ref::set);
  testDeferredResultSubscriber(single, Single.class, forClass(String.class),
      () -> ref.get().onSuccess("foo"), "foo");
  // RxJava 2 Single
  AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
  io.reactivex.Single<String> single2 = io.reactivex.Single.create(ref2::set);
  testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
      () -> ref2.get().onSuccess("foo"), "foo");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void basicWithCancellable() {
  final Disposable d1 = Disposables.empty();
  final Disposable d2 = Disposables.empty();
  Single.<Integer>create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> e) throws Exception {
      e.setDisposable(d1);
      e.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
          d2.dispose();
        }
      });
      e.onSuccess(1);
      e.onError(new TestException());
      e.onSuccess(2);
      e.onError(new TestException());
    }
  })
  .test()
  .assertResult(1);
  assertTrue(d1.isDisposed());
  assertTrue(d2.isDisposed());
}

相关文章

微信公众号

最新文章

更多