Asynchronous DB data retrieval and asynchronously posting it to Queue using RxJava

I am testing an operation in which I read data from a DB and emit it asynchronously, while also posting each received record to a queue asynchronously, using RxJava. I have more or less accomplished this with mocked operations (I don't know whether it will really work), and I want to know whether it will hold up in a real-life scenario. Another issue with this code is that I have to make the main thread sleep for some time in order to see the results. I want to know how this should be handled in a real application.

This is my code:

    package rxJava;

    import java.util.ArrayList;
    import java.util.List;

    import rx.Observable;
    import rx.Observer;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    public class ParallelTest {

        static int extractCount = 0;
        static int pushCount = 0;
        static int errorCount = 0;

        public static void main(String[] args) throws InterruptedException {

            Observable<String> resultsFromDB = getFromDB();

            resultsFromDB.subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                    System.out.println("DB retrieval completed.");
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(String t) {
                    publishToQueue(t).subscribe(new Observer<Boolean>() {

                        @Override
                        public void onCompleted() {
                            System.out.println("Publish " + t + " complete.");
                        }

                        @Override
                        public void onError(Throwable e) {
                        }

                        @Override
                        public void onNext(Boolean b) {
                            if (b) {
                                System.out.println("Successfully pushed:" + t + " using " + Thread.currentThread().getName());
                                pushCount++;
                            } else {
                                System.out.println("Failed to push:" + t + " using " + Thread.currentThread().getName());
                                errorCount++;
                            }
                        }
                    });
                }
            });

            Thread.sleep(20000);
        }

        protected static Observable<Boolean> publishToQueue(String t) {
            return Observable.defer(() -> {
                try {
                    if (t.contains("7")) {
                        Thread.sleep(1000);
                    }
                    System.out.println("Publishing:: " + t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                return Observable.just(true).subscribeOn(Schedulers.io());
            });
        }

        private static Observable<String> getFromDB() {

            return Observable.create((Subscriber<? super String> s) -> {
                // simulate latency
                List<String> list = new ArrayList<String>();
                try {
                    for (int i = 0; i < 100; i++) {
                        list.add(Integer.toString(i));
                    }
                    Thread.sleep(1000);
                    System.out.println("DB op complete.");
                } catch (Exception e) {
                    s.onError(e);
                }
                for (String st : list) {
                    extractCount++;
                    s.onNext(st);
                }
                s.onCompleted();
            }).subscribeOn(Schedulers.io());
        }

    }

Please tell me whether this approach will work for a large-scale data flow, such as 40000 records in an hour (about 11 records per second on average).
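
To make the "sleep the main thread" concern concrete, here is a minimal sketch of the general idea of waiting on a completion signal instead of sleeping for a fixed time. It deliberately does not use RxJava: it swaps in the JDK's `CompletableFuture` so that it runs without extra dependencies. The names `WaitForCompletionSketch`, `readFromDb`, `publish`, and `runPipeline` are hypothetical stand-ins, not part of the code above, and the pool size of 8 is an arbitrary assumption. In RxJava 1.x a comparable shape might be a `flatMap` over the publish step followed by a blocking terminal call such as `toBlocking()`, though that is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitForCompletionSketch {

    // Thread-safe counter; a plain static int is not safe when several threads update it.
    static final AtomicInteger pushCount = new AtomicInteger();

    // Hypothetical stand-in for the DB read: returns the records "0".."99".
    static List<String> readFromDb() {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            rows.add(Integer.toString(i));
        }
        return rows;
    }

    // Hypothetical stand-in for the queue publish: always reports success.
    static boolean publish(String record) {
        return true;
    }

    // Reads asynchronously, publishes every record asynchronously,
    // and blocks only until all publishes have finished.
    static int runPipeline() {
        pushCount.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(8); // assumed pool size
        try {
            CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(WaitForCompletionSketch::readFromDb, pool)
                .thenCompose(rows -> {
                    List<CompletableFuture<Void>> publishes = new ArrayList<>();
                    for (String row : rows) {
                        publishes.add(CompletableFuture
                            .supplyAsync(() -> publish(row), pool)
                            .thenAccept(ok -> {
                                if (ok) {
                                    pushCount.incrementAndGet();
                                }
                            }));
                    }
                    // Completes once every individual publish has completed.
                    return CompletableFuture.allOf(publishes.toArray(new CompletableFuture[0]));
                });
            pipeline.join(); // wait for the completion signal; no fixed sleep
            return pushCount.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Pushed " + runPipeline() + " records");
    }
}
```

The point of the sketch is that the main thread waits exactly as long as the work takes, and that counters shared across worker threads use an atomic type rather than a plain `int`.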

Category: java Time: 2016-07-30 Views: 3
