The following code shows how to use an RxJava Observable to create a service, and how to call it both asynchronously and synchronously from the client side.
/**
 * Demo entry point: runs the simulated service twice, once consumed by the
 * asynchronous client and once by the blocking (synchronous) client, then
 * waits before logging that the demo is finished.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the main thread is interrupted during the final wait
 */
public static void main(final String[] args) throws InterruptedException {
    // A fresh Observable is created for each client, so every client
    // triggers its own run of the service.
    asyncObservableClient(createAsycService());
    syncObservableClient(createAsycService());

    LOGGER.info("main-wait for some time");
    // Ten-second pause before declaring the demo done; presumably this leaves
    // time for the asynchronous subscriber's callbacks to be logged.
    final long waitMillis = 1000 * 10;
    Thread.sleep(waitMillis);
    LOGGER.info("main-finished");
}
/**
 * Creates an Observable that simulates a slow service call.
 *
 * <p>On each subscription the service sleeps for two seconds, emits a single
 * {@code null} item and then completes. The work is moved off the
 * subscribing thread via {@code subscribeOn(Schedulers.newThread())}.
 *
 * <p>If the sleeping thread is interrupted, the interrupt flag is restored and
 * the subscriber is notified through {@code onError} instead of being told the
 * call succeeded. Nothing is emitted to a subscriber that has already
 * unsubscribed.
 *
 * @return an Observable emitting one {@code null} item followed by completion,
 *         or an error if the simulated work was interrupted
 */
private static Observable<Void> createAsycService() {
    return Observable.<Void>create(new OnSubscribe<Void>() {
        @Override
        public void call(final Subscriber<? super Void> subscriber) {
            LOGGER.info("service-started");
            try {
                // Simulated work.
                Thread.sleep(1000 * 2);
            } catch (final InterruptedException e) {
                // Restore the interrupt status so code further up the stack can see it,
                // and report the failure rather than pretending the work finished.
                Thread.currentThread().interrupt();
                LOGGER.info("service-interrupted");
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(e);
                }
                return;
            }
            LOGGER.info("service-finished");
            // Nobody is listening any more (e.g. the consumer gave up): skip emission.
            if (subscriber.isUnsubscribed()) {
                return;
            }
            // Always emit one item so a blocking client waiting for a value has something to receive.
            subscriber.onNext(null);
            // Tell the observer we are done.
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.newThread());
}
/**
 * Consumes the service synchronously: blocks the calling thread until the
 * given Observable has finished, with a five-second timeout applied.
 *
 * @param observable the service Observable to wait on
 */
private static void syncObservableClient(final Observable<Void> observable) {
    LOGGER.info("sync-client-started");
    // Convert to a blocking view and wait for the last emitted item;
    // the returned value itself is not used.
    observable
            .timeout(5, TimeUnit.SECONDS)
            .toBlocking()
            .last();
    LOGGER.info("sync-client-finished");
}
/**
 * Consumes the service asynchronously: subscribes a Subscriber that logs
 * every callback it receives and returns without waiting for the result.
 *
 * @param observable the service Observable to subscribe to
 */
private static void asyncObservableClient(final Observable<Void> observable) {
    final Subscriber<Void> loggingSubscriber = new Subscriber<Void>() {
        /** Logs the item received from the service. */
        @Override
        public void onNext(final Void item) {
            LOGGER.info("async-client-received:" + item);
        }

        /** Logs the message of the error reported by the service. */
        @Override
        public void onError(final Throwable error) {
            LOGGER.info("async-client:" + error.getMessage());
        }

        /** Logs that the service signalled completion. */
        @Override
        public void onCompleted() {
            LOGGER.info("async-client-finished");
        }
    };
    observable.subscribe(loggingSubscriber);
}
Resources: