Go Reactive - RxJava Example


The following code shows how to use RxJava Observable to create service , and how to call it asynchronously and synchronously in client side.
    public static void main(final String[] args) throws InterruptedException {
        Observable<Void> observable = createAsycService();
        asyncObservableClient(observable);

        observable = createAsycService();
        syncObservableClient(observable);

        LOGGER.info("main-wait for some time");
        Thread.sleep(1000 * 10);
        LOGGER.info("main-finished");
    }

    private static Observable<Void> createAsycService() {
        final Observable<Void> observable = Observable.<Void>create(new OnSubscribe<Void>() {
            @Override
            public void call(final Subscriber<? super Void> subscriber) {
                LOGGER.info("service-started");
                try {
                    Thread.sleep(1000 * 2);
                } catch (final InterruptedException e) {
                    e.printStackTrace();
                }
                LOGGER.info("service-finished");
                // we should always call onNext(), so sync client can handle it easily.
                subscriber.onNext(null);
                // tell observer we are done
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread());
        return observable;
    }

    private static void syncObservableClient(final Observable<Void> observable) {
        LOGGER.info("sync-client-started");

        final BlockingObservable<Void> blockingObservable = observable.timeout(5, TimeUnit.SECONDS).toBlocking();
        blockingObservable.last();
        LOGGER.info("sync-client-finished");
    }

    private static void asyncObservableClient(final Observable<Void> observable) {
        observable.subscribe(new Subscriber<Void>() {
            @Override
            public void onCompleted() {
                LOGGER.info("async-client-finished");
            }

            @Override
            public void onError(final Throwable e) {
                LOGGER.info("async-client:" + e.getMessage());
            }

            @Override
            public void onNext(final Void t) {
                LOGGER.info("async-client-received:" + t);
            }
        });
    }
Resources:

Labels

adsense (5) Algorithm (69) Algorithm Series (35) Android (7) ANT (6) bat (8) Big Data (7) Blogger (14) Bugs (6) Cache (5) Chrome (19) Code Example (29) Code Quality (7) Coding Skills (5) Database (7) Debug (16) Design (5) Dev Tips (63) Eclipse (32) Git (5) Google (33) Guava (7) How to (9) Http Client (8) IDE (7) Interview (88) J2EE (13) J2SE (49) Java (186) JavaScript (27) JSON (7) Learning code (9) Lesson Learned (6) Linux (26) Lucene-Solr (112) Mac (10) Maven (8) Network (9) Nutch2 (18) Performance (9) PowerShell (11) Problem Solving (11) Programmer Skills (6) regex (5) Scala (6) Security (9) Soft Skills (38) Spring (22) System Design (11) Testing (7) Text Mining (14) Tips (17) Tools (24) Troubleshooting (29) UIMA (9) Web Development (19) Windows (21) xml (5)