Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k$ -> observable.pipe #4

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions example_plugins/baz/src/BazService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { k$, Observable, $combineLatest, map, first, toPromise } from 'kbn-observable';
import { Observable, $combineLatest, map, first, toPromise } from 'kbn-observable';

import { ElasticsearchService, KibanaConfig, KibanaRequest } from 'kbn-types';

Expand All @@ -13,7 +13,7 @@ export class BazService {
const { page = 1, perPage = 20, type } = options;

const [kibanaIndex, adminCluster] = await latestValues(
k$(this.kibanaConfig$)(map(config => config.index)),
this.kibanaConfig$.pipe(map(config => config.index)),
this.elasticsearchService.getClusterOfType$('admin')
);

Expand Down Expand Up @@ -64,7 +64,5 @@ function latestValues<A, B, C, D>(
d: Observable<D>
): Promise<[A, B, C, D]>;
function latestValues(...values: Observable<any>[]) {
return k$($combineLatest(values))(
first(),
toPromise());
return $combineLatest(values).pipe(first(), toPromise());
}
34 changes: 34 additions & 0 deletions packages/kbn-internal-native-observable/__tests__/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Observable } from '../';

const first = () => source =>
new Observable(observer =>
source.subscribe({
next(value) {
observer.next(value);
observer.complete();
}
})
);

const plus = x => source =>
new Observable(observer =>
source.subscribe({
next(value) {
observer.next(value + x);
},
complete() {
observer.complete();
}
})
);

test('can pipe values', () => {
const observable = Observable.of(1, 2, 3).pipe(plus(10), first());

let value;
observable.subscribe(x => {
value = x;
});

expect(value).toEqual(11);
});
66 changes: 66 additions & 0 deletions packages/kbn-internal-native-observable/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ declare global {
}
}

type UnaryFunction<T, R> = (source: T) => R;

// These types are based on the Observable proposal readme, see
// https://github.com/tc39/proposal-observable#api, with the addition of using
// generics to define the type of the `value`.
Expand Down Expand Up @@ -109,6 +111,70 @@ declare namespace Observable {
onComplete?: () => void
): Subscription;

// pipe
pipe(): Observable<T>;
pipe<A>(op1: UnaryFunction<Observable<T>, A>): A;
pipe<A, B>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, B>
): B;
pipe<A, B, C>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, C>
): C;
pipe<A, B, C, D>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, D>
): D;
pipe<A, B, C, D, E>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, Observable<D>>,
op5: UnaryFunction<Observable<D>, E>
): E;
pipe<A, B, C, D, E, F>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, Observable<D>>,
op5: UnaryFunction<Observable<D>, Observable<E>>,
op6: UnaryFunction<Observable<E>, F>
): F;
pipe<A, B, C, D, E, F, G>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, Observable<D>>,
op5: UnaryFunction<Observable<D>, Observable<E>>,
op6: UnaryFunction<Observable<E>, Observable<F>>,
op7: UnaryFunction<Observable<F>, G>
): G;
pipe<A, B, C, D, E, F, G, H>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, Observable<D>>,
op5: UnaryFunction<Observable<D>, Observable<E>>,
op6: UnaryFunction<Observable<E>, Observable<F>>,
op7: UnaryFunction<Observable<F>, Observable<G>>,
op8: UnaryFunction<Observable<G>, H>
): H;
pipe<A, B, C, D, E, F, G, H, I>(
op1: UnaryFunction<Observable<T>, Observable<A>>,
op2: UnaryFunction<Observable<A>, Observable<B>>,
op3: UnaryFunction<Observable<B>, Observable<C>>,
op4: UnaryFunction<Observable<C>, Observable<D>>,
op5: UnaryFunction<Observable<D>, Observable<E>>,
op6: UnaryFunction<Observable<E>, Observable<F>>,
op7: UnaryFunction<Observable<F>, Observable<G>>,
op8: UnaryFunction<Observable<G>, Observable<H>>,
op9: UnaryFunction<Observable<H>, I>
): I;

// Returns itself
[Symbol.observable](): Observable<T>;

Expand Down
8 changes: 8 additions & 0 deletions packages/kbn-internal-native-observable/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ export class Observable {

return new Subscription(observer, this._subscriber);
}

pipe(...operations) {
if (operations.length === 0) {
return this;
}

return operations.reduce((prev, fn) => fn(prev), this);
Copy link
Owner Author

@kimjoar kimjoar Nov 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the implementation of pipe ^, which is the only needed change here. Otherwise we're just adding the types to the type declarations plus switching over everything from k$

}

[symbolObservable]() { return this }

Expand Down
110 changes: 73 additions & 37 deletions packages/kbn-observable/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
# `k$`
# `kbn-observable`

k$ is an observable library based on "native observables", aka the `Observable`
functionality proposed in https://github.com/tc39/proposal-observable.
kbn-observable is an observable library based on the [proposed `Observable`][proposal]
feature. In includes several factory functions and operators, that all return
"native" observable. There is only one addition to the `Observable` class in
`kbn-observable` compared to the spec: a `pipe` method that works like the newly
added `pipe` method in [RxJS][rxjs], and which enables a simple and clean way to
apply operators to the observable.

Where all other observable libraries put operators and other methods on their
own implementation of a base `Observable`, we want to use the proposed
`Observable` without adding anything to its `prototype`. By doing this, any
operator will always return an instance of the "native" observable.
Why build this? The main reason is that we don't want to tie our plugin apis
heavily to a large dependency, but rather expose something that's much closer
to "native" observables, and something we have control over ourselves. Also, all
other observable libraries have their own base `Observable` class, while we
wanted to rely on the proposed library.

The reason we want this is that we don't want to expose "heavy" observables
with lots of functionality in our plugin apis, but rather expose "native"
observables.
In addition, `System.observable` enables interop between observable libraries,
which means plugins can use whatever observable library they want, if they don't
want to use `kbn-observable`.

## Example

```js
import { Observable, k$, map, last } from 'kbn-observable';
import { Observable, map, last } from 'kbn-observable';

const source = Observable.from(1, 2, 3);

// When `k$` is called with the source observable it returns a function that
// can be called with "operators" that modify the input value and return an
// observable that reflects all of the modifications.
k$(source)(map(i => 2017 + i), last())
source.pipe(map(i => 2017 + i), last())
.subscribe(console.log) // logs 2020
```

Expand All @@ -32,14 +34,14 @@ TODO: Docs, videos, other intros. This needs to be good enough for people to
easily jump in and understand the basics of observables.

If you are just getting started with observables, a great place to start is with
Andre Staltz' [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754),
Andre Staltz' [The introduction to Reactive Programming you've been missing][staltz-intro],
which is a great introduction to the ideas and concepts.

## Factories

Just like the `k$` function, factories take arguments and produce an observable.
Different factories are useful for different things, and many behave just like
the static functions attached to the `Rx.Observable` class in RxJS.
Factories take arguments and produce an observable. Different factories are
useful for different things, and many behave just like the static functions
attached to the `Rx.Observable` class in RxJS.

See [./src/factories](./src/factories) for more info about each factory.

Expand All @@ -48,9 +50,9 @@ See [./src/factories](./src/factories) for more info about each factory.
Operators are functions that take some arguments and produce an operator
function. Operators aren't anything fancy, just a function that takes an
observable and returns a new observable with the requested modifications
applied. When using `k$` you don't even have to think much about it being an
observable in many cases, as it's just a pure function that receives a value as
an argument and returns a value, e.g.
applied.

Some examples:

```js
map(i => 2017 + i);
Expand All @@ -62,32 +64,66 @@ reduce((acc, val) => {
}, 0);
```

Multiple operator functions can be passed to `k$` and will be applied to the
Multiple operator functions can be passed to `pipe` and will be applied to the
input observable before returning the final observable with all modifications
applied, e.g. like the example above with `map` and `last`.

See [./src/operators](./src/operators) for more info about each operator.

## Why `k$`?
## More advanced topics

TODO
TODO: Hot/cold. Multicasting.

- We want to expose something minimal, and preferably something close to the
[proposed native observables](https://github.com/tc39/proposal-observable).
- RxJS is great, but a heavy dep to push on all plugins, especially with regards
to updates etc.
## Inspiration

## Caveats
This code is heavily inspired by and based on [RxJS][rxjs], which is licensed
under the Apache License, Version 2.0.

TODO
## Technical decisions

Why `k$(source)(...operators)` instead of `k$(source, [...operators])`?
### Why add the `pipe` method?

## More advanced topics
While exploring how to handle observables in Kibana we went through multiple
PoCs. We initially used RxJS directly, but we didn't find a simple way to
consistently transform RxJS observables into "native" observables in the plugin
apis. This was something we wanted because of our earlier experiences with
exposing large libraries in our apis, which causes problems e.g. when we need to
perform major upgrades of a lib that has breaking changes, but we can't ship a
new major version of Kibana yet, even though this will cause breaking changes
in our plugin apis.

TODO: Hot/cold. Multicasting.
Then we built the initial version of `kbn-observable` based on the Observable
spec, and we included a `k$` helper and several operators that worked like this:

## Inspiration
```js
import { k$, Observable, map, first } from 'kbn-observable';

// general structure:
const resultObservable = k$(sourceObservable, [...operators]);

// e.g.
const source = Observable.from(1,2,3);
const observable = k$(source, [map(x => x + 1), first()]);
```

This code is heavily inspired by and based on RxJS, which is licensed under the
Apache License, Version 2.0, see https://github.com/ReactiveX/rxjs.
Here `Observable` would be a copy of the Observable class from the spec. This
would enable us to always work with these spec-ed observables. This worked
nicely in pure JavaScript, but caused a problem with TypeScript, as TypeScript
wasn't able to correctly type the operators array when more than one operator
was specified.

Because of that problem we tried `k$(source)(...operators)`. With this change
TypeScript is able to corretly type the operator arguments. However, this made
for a not so nice looking api. Also, it gives a feeling that you can do
`const obs = k$(source)`, then later do `obs(...operators)`, which was not an
intended use of the api.

In the end we decided to include a `pipe` helper on the observable instead, so
it becomes `source.pipe(...operators)`. It's a fairly small addition to the
`Observable` class, so it's easy to keep up-to-date. However, it's not the
simplest thing to codemod later on because Node.js streams also uses the `pipe`
keyword, so it's not an _ideal_ solution.

[proposal]: https://github.com/tc39/proposal-observable
[rxjs]: http://reactivex.io/rxjs/
[staltz-intro]: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
2 changes: 1 addition & 1 deletion packages/kbn-observable/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
"rxjs": "5.4.3"
},
"devDependencies": {
"typescript": "^2.5.3"
"typescript": "2.5.3"
}
}
3 changes: 1 addition & 2 deletions packages/kbn-observable/src/__tests__/Subject.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { k$ } from '../k$';
import { first } from '../operators';

const noop = () => {};
Expand Down Expand Up @@ -340,7 +339,7 @@ test('can use subject in $k', async () => {
const complete = jest.fn();
const error = jest.fn();

k$(values$)(first()).subscribe({
values$.pipe(first()).subscribe({
next,
error,
complete
Expand Down
Loading