diff --git a/CHANGELOG.md b/CHANGELOG.md index 515f0ed..e26b1f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.0.15 + +- Add `whereType`. + ## 0.0.14+1 - Allow using non-dev Dart 2 SDK. diff --git a/README.md b/README.md index 5e54457..89ce15d 100644 --- a/README.md +++ b/README.md @@ -63,3 +63,7 @@ being a real subscriber. # throttle Blocks events for a duration after an event is successfully emitted. + +# whereType + +Like `Iterable.whereType` for a stream. diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart new file mode 100644 index 0000000..96eb394 --- /dev/null +++ b/lib/src/where_type.dart @@ -0,0 +1,54 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// Emits only the events which have type [R]. +/// +/// If the source stream is a broadcast stream the result will be as well. +/// +/// Errors from the source stream are forwarded directly to the result stream. +/// +/// The static type of the returned transformer takes `Null` so that it can +/// satisfy the subtype requirements for the `stream.transform()` argument on +/// any source stream. The argument to `bind` has been broadened to take +/// `Stream` since it will never be passed a `Stream` at runtime. +/// This is safe to use on any source stream. +/// +/// [R] should be a subtype of the stream's generic type, otherwise nothing of +/// type [R] could possibly be emitted, however there is no static or runtime +/// checking that this is the case. +StreamTransformer whereType() => _WhereType(); + +class _WhereType extends StreamTransformerBase { + @override + Stream bind(Stream source) { + var controller = source.isBroadcast + ? StreamController.broadcast(sync: true) + : StreamController(sync: true); + + StreamSubscription subscription; + controller.onListen = () { + if (subscription != null) return; + subscription = source.listen( + (value) { + if (value is R) controller.add(value); + }, + onError: controller.addError, + onDone: () { + subscription = null; + controller.close(); + }); + if (!source.isBroadcast) { + controller.onPause = subscription.pause; + controller.onResume = subscription.resume; + } + controller.onCancel = () { + subscription?.cancel(); + subscription = null; + }; + }; + return controller.stream; + } +} diff --git a/lib/stream_transform.dart b/lib/stream_transform.dart index 13b80a4..bddb744 100644 --- a/lib/stream_transform.dart +++ b/lib/stream_transform.dart @@ -19,3 +19,4 @@ export 'src/switch.dart'; export 'src/take_until.dart'; export 'src/tap.dart'; export 'src/throttle.dart'; +export 'src/where_type.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index b32a050..a29a467 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -2,7 +2,7 @@ name: stream_transform description: A collection of utilities to transform and manipulate streams. author: Dart Team homepage: https://www.github.com/dart-lang/stream_transform -version: 0.0.15-dev +version: 0.0.15 environment: sdk: ">=2.1.0 <3.0.0" diff --git a/test/where_type_test.dart b/test/where_type_test.dart new file mode 100644 index 0000000..2c6d7ed --- /dev/null +++ b/test/where_type_test.dart @@ -0,0 +1,50 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:test/test.dart'; + +import 'package:stream_transform/stream_transform.dart'; + +void main() { + test('forwards only events that match the type', () async { + var values = Stream.fromIterable([1, 'a', 2, 'b']); + var filtered = values.transform(whereType()); + expect(await filtered.toList(), ['a', 'b']); + }); + + test('can result in empty stream', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(whereType()); + expect(await filtered.isEmpty, true); + }); + + test('forwards values to multiple listeners', () async { + var values = StreamController.broadcast(); + var filtered = values.stream.transform(whereType()); + var firstValues = []; + var secondValues = []; + filtered..listen(firstValues.add)..listen(secondValues.add); + values..add(1)..add('a')..add(2)..add('b'); + await Future(() {}); + expect(firstValues, ['a', 'b']); + expect(secondValues, ['a', 'b']); + }); + + test('closes streams with multiple listeners', () async { + var values = StreamController.broadcast(); + var filtered = values.stream.transform(whereType()); + var firstDone = false; + var secondDone = false; + filtered + ..listen(null, onDone: () => firstDone = true) + ..listen(null, onDone: () => secondDone = true); + values.add(1); + values.add('a'); + await values.close(); + expect(firstDone, true); + expect(secondDone, true); + }); +}