Skip to content

Commit

Permalink
Merge branch 'Bertrand-expose_reactive_streams_consume_each'
Browse files Browse the repository at this point in the history
  • Loading branch information
zigzago committed Feb 18, 2019
2 parents d176beb + 0b54eed commit 6ee4ee3
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ val <T> AggregatePublisher<T>.coroutine: CoroutineAggregatePublisher<T>
/**
* Coroutine wrapper around [AggregatePublisher].
*/
class CoroutineAggregatePublisher<T>(val publisher: AggregatePublisher<T>) :
class CoroutineAggregatePublisher<T>(override val publisher: AggregatePublisher<T>) :
CoroutinePublisher<T>(publisher) {
/**
* Enables writing to temporary files. A null value indicates that it's unspecified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ val <T> ChangeStreamPublisher<T>.coroutine: CoroutineChangeStreamPublisher<T>
/**
* Coroutine wrapper around [ChangeStreamPublisher].
*/
class CoroutineChangeStreamPublisher<TResult>(val publisher: ChangeStreamPublisher<TResult>) :
class CoroutineChangeStreamPublisher<TResult>(override val publisher: ChangeStreamPublisher<TResult>) :
CoroutinePublisher<ChangeStreamDocument<TResult>>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ val <T> DistinctPublisher<T>.coroutine: CoroutineDistinctPublisher<T>
/**
* Coroutine wrapper around [CoroutineDistinctPublisher].
*/
class CoroutineDistinctPublisher<T>(val publisher: DistinctPublisher<T>) :
class CoroutineDistinctPublisher<T>(override val publisher: DistinctPublisher<T>) :
CoroutinePublisher<T>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ val <T> FindPublisher<T>.coroutine: CoroutineFindPublisher<T>
/**
* Coroutine wrapper around [CoroutineFindPublisher].
*/
class CoroutineFindPublisher<T>(val publisher: FindPublisher<T>) :
class CoroutineFindPublisher<T>(override val publisher: FindPublisher<T>) :
CoroutinePublisher<T>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ val <T> ListCollectionsPublisher<T>.coroutine: CoroutineListCollectionsPublisher
/**
* Coroutine wrapper around [ListCollectionsPublisher].
*/
class CoroutineListCollectionsPublisher<TResult>(val publisher: ListCollectionsPublisher<TResult>) :
class CoroutineListCollectionsPublisher<TResult>(override val publisher: ListCollectionsPublisher<TResult>) :
CoroutinePublisher<TResult>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ val <T> ListIndexesPublisher<T>.coroutine: CoroutineListIndexesPublisher<T>
/**
* Coroutine wrapper around [CoroutineListIndexesPublisher].
*/
class CoroutineListIndexesPublisher<T>(val publisher: ListIndexesPublisher<T>) :
class CoroutineListIndexesPublisher<T>(override val publisher: ListIndexesPublisher<T>) :
CoroutinePublisher<T>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ val <T> MapReducePublisher<T>.coroutine: CoroutineMapReducePublisher<T>
/**
* Coroutine wrapper around [MapReducePublisher].
*/
class CoroutineMapReducePublisher<T>(val publisher: MapReducePublisher<T>) :
class CoroutineMapReducePublisher<T>(override val publisher: MapReducePublisher<T>) :
CoroutinePublisher<T>(publisher) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ suspend fun <T> Publisher<T>.toList(): List<T> {
/**
* Coroutine wrapper around [Publisher].
*/
open class CoroutinePublisher<T>(private val publisher: Publisher<T>) {
open class CoroutinePublisher<T>(open val publisher: Publisher<T>) {

/**
* Provides a list of not null elements from the publisher.
*/
suspend fun toList(): List<T> = publisher.toList()

/**
* iterates over all elements from the publisher
*/
suspend fun consumeEach(action: suspend (T) -> kotlin.Unit) {
publisher.consumeEach { action(it) }
}
}

0 comments on commit 6ee4ee3

Please sign in to comment.