Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add whereType transformer #60

Merged
merged 7 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.15

- Add `whereType`.

## 0.0.14+1

- Allow using non-dev Dart 2 SDK.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ being a real subscriber.
# throttle

Blocks events for a duration after an event is successfully emitted.

# whereType

Like `Iterable.whereType` for a stream.
52 changes: 52 additions & 0 deletions lib/src/where_type.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

/// Emits only the events which have type [R].
///
/// If the source stream is a broadcast stream the result will be as well.
///
/// Errors from the source stream are forwarded directly to the result stream.
///
/// The static type of the returned transformer takes `Null` so that it can
/// satisfy the subtype requirements for `stream.transform()` argument on any
/// source Stream. The argument to `bind` has been broaded to take
/// `Stream<Object>` since it never be passed a `Stream<Null>` at runtime. This
/// is safe to use on any source stream and there is no static or runtime
/// checking that [R] is sensible - that is that is a subtype of the stream's
/// type such that some values of that type may be possible.
StreamTransformer<Null, R> whereType<R>() => _WhereType<R>();

class _WhereType<R> extends StreamTransformerBase<Null, R> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null is a code smell here, what does it represent? can it also be generic, or should it maybe be Object?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see you commented about this, I don't fully understand the issue here but I can check out the repo and try it out which would probably make it more obvious.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See some discussion here:
dart-lang/language#180 (comment)

concrete scenario:

  var numbers = Stream<num>.fromIterable([1, 1.5, 2]);
  var ints = numbers.transform(whereType<int>());

In this case the transform excepts an argument assignable to StreamTransformer<num, S>. Since the whereType call doesn't have a way to statically know the num there, it either needs us to be explicit about it (numbers.transform(whereType<num, int>()), or to use some default that can fill in for any T that may have been on the stream. Null is the only possibility there for now, I'm not sure if we've figured out what the user referenceable bottom type will be after NNBD...

Copy link

@jakemac53 jakemac53 Feb 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would at least leave a lengthy doc comment here, its going to be pretty bizarre for anybody reading this in the future so some context would certainly help :).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note to the doc comment, how does that look to you? I think for further details the git blame should point here which will have extra detail.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you added is fine

@override
Stream<R> bind(Stream<Object> values) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe inputs instead of values? values is fairly ambiguous

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches the naming pattern used elsewhere in the package.

Stream<T> bind(Stream<S> values) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think its actually a good name though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's about as good as anything else. A value or event is what is emitted by a stream when we don't know anything more specific about it, and I generally like to name a stream as the plural of whatever flows through it. I don't like the name inputs because I wouldn't call the thing emitted by the stream an input - I wouldn't name the argument to the listen callback that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the name inputs because I wouldn't call the thing emitted by the stream an input

In the case of a stream transformer I would disagree - it is transforming one stream into another stream, so it makes sense to me to consider one the "inputs" stream and one the "outputs" stream.

If I see the word "values" in the middle of the method its not clear what that represents. Is that the outputs or the inputs? But if I see "inputs" I can immediately assume its the input stream.

In any case I don't care enough to withhold an lgtm, but I do think "inputs" is better :).

I wouldn't name the argument to the listen callback that.

Right, because listen doesn't transform anything, it only listens for values and there is only one stream in scope. With a transformer there are two streams in the scope of the bind method.

var controller = values.isBroadcast
? StreamController<R>.broadcast(sync: true)
: StreamController<R>(sync: true);

StreamSubscription<Object> subscription;
controller.onListen = () {
if (subscription != null) return;
subscription = values.listen(
(value) {
if (value is R) controller.add(value);
},
onError: controller.addError,
onDone: () {
subscription = null;
controller.close();
});
if (!values.isBroadcast) {
controller.onPause = subscription.pause;
controller.onResume = subscription.resume;
}
controller.onCancel = () {
subscription?.cancel();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this used to return this value, not sure if it matters or not

Copy link
Member Author

@natebosch natebosch Feb 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure either. There was some weirdness with an analyzer hint around this at one point that could have been why I had it in the fromHandlers implementation, and I copied most of that here.

subscription = null;
};
};
return controller.stream;
}
}
1 change: 1 addition & 0 deletions lib/stream_transform.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ export 'src/switch.dart';
export 'src/take_until.dart';
export 'src/tap.dart';
export 'src/throttle.dart';
export 'src/where_type.dart';
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: stream_transform
description: A collection of utilities to transform and manipulate streams.
author: Dart Team <[email protected]>
homepage: https://www.github.com/dart-lang/stream_transform
version: 0.0.15-dev
version: 0.0.15

environment:
sdk: ">=2.1.0 <3.0.0"
Expand Down
50 changes: 50 additions & 0 deletions test/where_type_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:test/test.dart';

import 'package:stream_transform/stream_transform.dart';

void main() {
test('forwards only events that match the type', () async {
var values = Stream.fromIterable([1, 'a', 2, 'b']);
var filtered = values.transform(whereType<String>());
expect(await filtered.toList(), ['a', 'b']);
});

test('can result in empty stream', () async {
var values = Stream.fromIterable([1, 2, 3, 4]);
var filtered = values.transform(whereType<String>());
expect(await filtered.isEmpty, true);
});

test('forwards values to multiple listeners', () async {
var values = StreamController.broadcast();
var filtered = values.stream.transform(whereType<String>());
var firstValues = [];
var secondValues = [];
filtered..listen(firstValues.add)..listen(secondValues.add);
values..add(1)..add('a')..add(2)..add('b');
await Future(() {});
expect(firstValues, ['a', 'b']);
expect(secondValues, ['a', 'b']);
});

test('closes streams with multiple listeners', () async {
var values = StreamController.broadcast();
var filtered = values.stream.transform(whereType<String>());
var firstDone = false;
var secondDone = false;
filtered
..listen(null, onDone: () => firstDone = true)
..listen(null, onDone: () => secondDone = true);
values.add(1);
values.add('a');
await values.close();
expect(firstDone, true);
expect(secondDone, true);
});
}