RxJava is an extremely useful streaming framework (see RT_UBER_NYC_TAXI for an example application that uses it to make parallel RESTful calls to both Uber and Lyft). In this post, I will cover how you can reactively stream and process a CSV file.
First, you can create a Flowable of CSVRecord (from Apache Commons CSV) by turning the parser's iterator into a Flowable with Flowable.fromIterable(). Next, we want resource usage to be safe, i.e. we don't want to leave file handles open, so we use Flowable.using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceDisposer). The first argument opens the resource, the second builds the Flowable from it, and the last disposes of the resource once the stream completes, fails or is cancelled.
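To see the acquire/build/dispose cycle of Flowable.using() in isolation, here is a minimal sketch. It assumes RxJava 2.x (package io.reactivex) is on the classpath, reads from an in-memory StringReader instead of a file, and uses a hypothetical class name UsingDemo with a static flag that records whether the disposer ran.

```java
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: in-memory text stands in for a CSV file.
public class UsingDemo {

    /** Set by the disposer so we can observe that the reader was closed. */
    static final AtomicBoolean closed = new AtomicBoolean(false);

    static Flowable<String> lines(String text) {
        return Flowable.using(
                // resourceSupplier: a new reader is opened for each subscription
                () -> new BufferedReader(new StringReader(text)),
                // sourceSupplier: expose the reader's lines as a Flowable
                reader -> {
                    Iterable<String> iterable = () -> reader.lines().iterator();
                    return Flowable.fromIterable(iterable);
                },
                // resourceDisposer: runs on completion, error or cancellation
                reader -> {
                    reader.close();
                    closed.set(true);
                });
    }

    public static void main(String[] args) {
        List<String> all = lines("id,name\n1,a\n2,b").toList().blockingGet();
        System.out.println(all + " closed=" + closed.get());
    }
}
```

The three-argument using() is eager: it disposes of the resource before forwarding onComplete downstream, so the flag is already set by the time blockingGet() returns.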
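```java
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public static Flowable<CSVRecord> readRecordsFromFile(Path inputFile, CSVFormat csvFormat) {
    return Flowable.using(
            () -> Files.newBufferedReader(inputFile),              // opened once per subscription
            // withHeader() with no arguments reads the header from the first row
            bufferedReader -> csvRecordFlowable(bufferedReader, csvFormat.withHeader()),
            BufferedReader::close);                                // closed on complete, error or cancel
}

private static Flowable<CSVRecord> csvRecordFlowable(BufferedReader br, CSVFormat csvFormat) {
    try {
        final CSVParser csvParser = new CSVParser(br, csvFormat);
        // CSVParser implements Iterable<CSVRecord>, so it can be passed directly
        return Flowable.fromIterable(csvParser);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
```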
This sets up a Flowable<CSVRecord> that can be filtered, mapped or parallelised like any other Flowable, and you get the usual Flowable features such as backpressure: records are only pulled from the file as downstream subscribers request them. An example usage follows.
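```java
import io.reactivex.schedulers.Schedulers;
import java.nio.file.Paths;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Schedulers.io() runs on daemon threads: in a plain main() method, wait for
// onComplete/onError (e.g. with a CountDownLatch) or the JVM may exit early.
readRecordsFromFile(Paths.get("sample.csv"), CSVFormat.DEFAULT)
        .parallel()
        .runOn(Schedulers.io())
        .filter(CSVRecord::isConsistent) // example filter: drop rows whose size differs from the header
        // .map(...) goes here; the Subscriber's type parameter must match the mapped type
        .sequential()
        .subscribe(new Subscriber<CSVRecord>() {
            Subscription sub = null;

            @Override
            public void onSubscribe(Subscription s) {
                sub = s;
                sub.request(1); // without an initial request no record is ever emitted
            }

            @Override
            public void onNext(CSVRecord record) {
                // do something here
                // request next item
                sub.request(1);
            }

            @Override
            public void onError(Throwable t) {
                // handle error
            }

            @Override
            public void onComplete() {
            }
        });
```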
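Calling request(1) after each onNext is the Reactive Streams way of pulling one item at a time, and RxJava's Flowable implements that protocol. As a dependency-free sketch of the same pattern, the example below swaps RxJava for the JDK's own java.util.concurrent.Flow interfaces and SubmissionPublisher (Java 9+). The class and method names (RequestOneDemo, OneAtATimeSubscriber, run) and the sample rows are hypothetical; this illustrates the protocol, not the RxJava pipeline itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: JDK Flow API standing in for RxJava to show request(1) backpressure.
public class RequestOneDemo {

    /** Pulls exactly one item at a time, like a request(1) subscriber. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: nothing is delivered until this call
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> run(List<String> rows) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            rows.forEach(publisher::submit); // submit blocks if the subscriber's buffer is full
        } // close() signals onComplete after the already-submitted rows
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("id,name", "1,a", "2,b")));
    }
}
```

Because the subscriber never requests more than one item, a slow onNext naturally throttles the publisher, which is the same property that keeps the CSV pipeline from reading the whole file into memory.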