Skip to content

Reactive Tasks : reactive streams based operators

johnmcclean-aol edited this page Nov 21, 2016 · 2 revisions

ForEachWithError and ForEachEvent

forEachWithError

The forEachWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.

forEachWithError
List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
LazyFutureStream.of(1,2,3,4)
         .map(this::load)
         .forEachWithError(  i->list.add(i), e->error=e);
		

//list =List[1,3,4]
//error = RuntimeException
```java

### forEachEvent



The forEachEvent operator is similar to forEachWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.

#### forEachEvent with a LazyFutureStream

```java
Closeable resource;
List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
LazyFutureStream.of(1,2,3,4)
         .map(this::load)
         .forEachEvent(  i->list.add(i), 
                         logger::error,
                         ()->resource.close());
		

//list =List[1,3,4]
//runtime exception logged
//resource is closed

forEachX

forEachX allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed.

forEachX with a JDK Stream

List<Integer> list = new ArrayList<>();
Subscription s = StreamUtils.forEachX(Stream.of(1,2,3), 2,  i->list.add(i));
assertThat(list,hasItems(1,2));
assertThat(list.size(),equalTo(2));

s.request(1); //request an additional iterm from the Stream be processed.

assertThat(list,hasItems(1,2,3));
assertThat(list.size(),equalTo(3));

forEachXWithError

forEachXWithErrors allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.

forEachWithError with a LazyFutureStream

List<Integer> list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
Subscription s = LazyFutureStream.of(1,2,3,4)
                                 .map(this::load)
                                 .forEachXWithError( 2, i->list.add(i), e->error=e);
		

//list =List[1]
//error = RuntimeException

s.request(1);

//list =List[1,3]

s.request(1);

//list =List[1,3,4]

forEachXEvents

forEachXEvents allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXEvents operator is similar to forEachXWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.

ForEachXEvents with a SequenceM

List<Integer> list = new ArrayList<>();
Throwable error = null;
Closeable resource;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
Subscription s = LazyFutureStream.of(1,2,3,4)
                          .map(this::load)
                          .forEachXEvents( 2, i->list.add(i),   logger::error,
                           ()->resource.close());;
		

//list =List[1]
//error = RuntimeException
// resource open

s.request(1);

//list =List[1,3]

s.request(1);

//list =List[1,3,4]

s.request(1); //no new elements end of Stream

//list =List[1,3,4]
// resource closed

Reactive Future Operations & Reactive Tasks

The reactive-streams based terminal operations can also be launched asynchronously, first by using the futureOperations operator to provide an Executor that will process the Stream.

Using futureOperations

The futureOperations operator opens up a world of asynchronously executed terminal operations. A large range of terminal operations are provided and for each one a CompletbableFuture is returned.

using FutureOperatons

Executor exec = Executors.newFixedThreadPool(1);
FutureOperations<Integer> terminalOps  = LazyFutureStream.of(1,2,3).futureOperations(exec);


//execute the collection & Stream evaluation  on the provided executor
CompletableFuture<List<Integer>> futureList = terminalOps.collect(Collectors.toList());

List<Integer> result  = list.join();

ReactiveTask

Each of the async Future Operations for reactive-streams (forEachX, forEachEvent etc), return a ReactiveTask object. This allows users to check the status of Stream processing, to cancel it, to request more elements to be processed from the Stream either synchronously or asynchronously.

using FutureOperatons

List<Integer> list = new ArrayList<>();
ReactiveTask s = LazyFutureStream.of(1,2,3)
						  .futureOperations(exec)
						  .forEachX( 2,  i->list.add(i));
//wait until first 2 elements are processed
s.block();
		
//list = List[1,2]

//trigger the remainder of the Stream processing asynchronously
ReactiveTask nextElements = s.requestAllAsync();


//if we wait until it completes
//nextElements.block();
//list = List[1,2,3]
Clone this wiki locally