diff --git a/example_plugins/baz/src/BazService.ts b/example_plugins/baz/src/BazService.ts index c177f4a2f86352..342a302c5d001e 100644 --- a/example_plugins/baz/src/BazService.ts +++ b/example_plugins/baz/src/BazService.ts @@ -1,4 +1,4 @@ -import { k$, Observable, $combineLatest, map, first, toPromise } from 'kbn-observable'; +import { Observable, $combineLatest, map, first, toPromise } from 'kbn-observable'; import { ElasticsearchService, KibanaConfig, KibanaRequest } from 'kbn-types'; @@ -13,7 +13,7 @@ export class BazService { const { page = 1, perPage = 20, type } = options; const [kibanaIndex, adminCluster] = await latestValues( - k$(this.kibanaConfig$)(map(config => config.index)), + this.kibanaConfig$.pipe(map(config => config.index)), this.elasticsearchService.getClusterOfType$('admin') ); @@ -64,7 +64,5 @@ function latestValues( d: Observable ): Promise<[A, B, C, D]>; function latestValues(...values: Observable[]) { - return k$($combineLatest(values))( - first(), - toPromise()); + return $combineLatest(values).pipe(first(), toPromise()); } diff --git a/packages/kbn-internal-native-observable/__tests__/index.test.js b/packages/kbn-internal-native-observable/__tests__/index.test.js new file mode 100644 index 00000000000000..b6f8ee88143fc2 --- /dev/null +++ b/packages/kbn-internal-native-observable/__tests__/index.test.js @@ -0,0 +1,34 @@ +import { Observable } from '../'; + +const first = () => source => + new Observable(observer => + source.subscribe({ + next(value) { + observer.next(value); + observer.complete(); + } + }) + ); + +const plus = x => source => + new Observable(observer => + source.subscribe({ + next(value) { + observer.next(value + x); + }, + complete() { + observer.complete(); + } + }) + ); + +test('can pipe values', () => { + const observable = Observable.of(1, 2, 3).pipe(plus(10), first()); + + let value; + observable.subscribe(x => { + value = x; + }); + + expect(value).toEqual(11); +}); diff --git a/packages/kbn-internal-native-observable/index.d.ts b/packages/kbn-internal-native-observable/index.d.ts index 9d618e14b249e9..da7e08612e01ea 100644 --- a/packages/kbn-internal-native-observable/index.d.ts +++ b/packages/kbn-internal-native-observable/index.d.ts @@ -6,6 +6,8 @@ declare global { } } +type UnaryFunction = (source: T) => R; + // These types are based on the Observable proposal readme, see // https://github.com/tc39/proposal-observable#api, with the addition of using // generics to define the type of the `value`. @@ -109,6 +111,70 @@ declare namespace Observable { onComplete?: () => void ): Subscription; + // pipe + pipe(): Observable; + pipe(op1: UnaryFunction, A>): A; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, B> + ): B; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, C> + ): C; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, D> + ): D; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, E> + ): E; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, F> + ): F; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, G> + ): G; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, Observable>, + op8: UnaryFunction, H> + ): H; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, Observable>, + op8: UnaryFunction, Observable>, + op9: UnaryFunction, I> + ): I; + // Returns itself [Symbol.observable](): Observable; diff --git a/packages/kbn-internal-native-observable/index.js b/packages/kbn-internal-native-observable/index.js index d03dce80ac01f1..3a999d1b23a174 100644 --- a/packages/kbn-internal-native-observable/index.js +++ b/packages/kbn-internal-native-observable/index.js @@ -255,6 +255,14 @@ export class Observable { return new Subscription(observer, this._subscriber); } + + pipe(...operations) { + if (operations.length === 0) { + return this; + } + + return operations.reduce((prev, fn) => fn(prev), this); + } [symbolObservable]() { return this } diff --git a/packages/kbn-observable/README.md b/packages/kbn-observable/README.md index bb9f94ef50d696..bbf8618a31875e 100644 --- a/packages/kbn-observable/README.md +++ b/packages/kbn-observable/README.md @@ -1,28 +1,30 @@ -# `k$` +# `kbn-observable` -k$ is an observable library based on "native observables", aka the `Observable` -functionality proposed in https://github.com/tc39/proposal-observable. +kbn-observable is an observable library based on the [proposed `Observable`][proposal] +feature. In includes several factory functions and operators, that all return +"native" observable. There is only one addition to the `Observable` class in +`kbn-observable` compared to the spec: a `pipe` method that works like the newly +added `pipe` method in [RxJS][rxjs], and which enables a simple and clean way to +apply operators to the observable. -Where all other observable libraries put operators and other methods on their -own implementation of a base `Observable`, we want to use the proposed -`Observable` without adding anything to its `prototype`. By doing this, any -operator will always return an instance of the "native" observable. +Why build this? The main reason is that we don't want to tie our plugin apis +heavily to a large dependency, but rather expose something that's much closer +to "native" observables, and something we have control over ourselves. Also, all +other observable libraries have their own base `Observable` class, while we +wanted to rely on the proposed library. -The reason we want this is that we don't want to expose "heavy" observables -with lots of functionality in our plugin apis, but rather expose "native" -observables. +In addition, `System.observable` enables interop between observable libraries, +which means plugins can use whatever observable library they want, if they don't +want to use `kbn-observable`. ## Example ```js -import { Observable, k$, map, last } from 'kbn-observable'; +import { Observable, map, last } from 'kbn-observable'; const source = Observable.from(1, 2, 3); -// When `k$` is called with the source observable it returns a function that -// can be called with "operators" that modify the input value and return an -// observable that reflects all of the modifications. -k$(source)(map(i => 2017 + i), last()) +source.pipe(map(i => 2017 + i), last()) .subscribe(console.log) // logs 2020 ``` @@ -32,14 +34,14 @@ TODO: Docs, videos, other intros. This needs to be good enough for people to easily jump in and understand the basics of observables. If you are just getting started with observables, a great place to start is with -Andre Staltz' [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754), +Andre Staltz' [The introduction to Reactive Programming you've been missing][staltz-intro], which is a great introduction to the ideas and concepts. ## Factories -Just like the `k$` function, factories take arguments and produce an observable. -Different factories are useful for different things, and many behave just like -the static functions attached to the `Rx.Observable` class in RxJS. +Factories take arguments and produce an observable. Different factories are +useful for different things, and many behave just like the static functions +attached to the `Rx.Observable` class in RxJS. See [./src/factories](./src/factories) for more info about each factory. @@ -48,9 +50,9 @@ See [./src/factories](./src/factories) for more info about each factory. Operators are functions that take some arguments and produce an operator function. Operators aren't anything fancy, just a function that takes an observable and returns a new observable with the requested modifications -applied. When using `k$` you don't even have to think much about it being an -observable in many cases, as it's just a pure function that receives a value as -an argument and returns a value, e.g. +applied. + +Some examples: ```js map(i => 2017 + i); @@ -62,32 +64,66 @@ reduce((acc, val) => { }, 0); ``` -Multiple operator functions can be passed to `k$` and will be applied to the +Multiple operator functions can be passed to `pipe` and will be applied to the input observable before returning the final observable with all modifications applied, e.g. like the example above with `map` and `last`. See [./src/operators](./src/operators) for more info about each operator. -## Why `k$`? +## More advanced topics -TODO +TODO: Hot/cold. Multicasting. -- We want to expose something minimal, and preferably something close to the - [proposed native observables](https://github.com/tc39/proposal-observable). -- RxJS is great, but a heavy dep to push on all plugins, especially with regards - to updates etc. +## Inspiration -## Caveats +This code is heavily inspired by and based on [RxJS][rxjs], which is licensed +under the Apache License, Version 2.0. -TODO +## Technical decisions -Why `k$(source)(...operators)` instead of `k$(source, [...operators])`? +### Why add the `pipe` method? -## More advanced topics +While exploring how to handle observables in Kibana we went through multiple +PoCs. We initially used RxJS directly, but we didn't find a simple way to +consistently transform RxJS observables into "native" observables in the plugin +apis. This was something we wanted because of our earlier experiences with +exposing large libraries in our apis, which causes problems e.g. when we need to +perform major upgrades of a lib that has breaking changes, but we can't ship a +new major version of Kibana yet, even though this will cause breaking changes +in our plugin apis. -TODO: Hot/cold. Multicasting. +Then we built the initial version of `kbn-observable` based on the Observable +spec, and we included a `k$` helper and several operators that worked like this: -## Inspiration +```js +import { k$, Observable, map, first } from 'kbn-observable'; + +// general structure: +const resultObservable = k$(sourceObservable, [...operators]); + +// e.g. +const source = Observable.from(1,2,3); +const observable = k$(source, [map(x => x + 1), first()]); +``` -This code is heavily inspired by and based on RxJS, which is licensed under the -Apache License, Version 2.0, see https://github.com/ReactiveX/rxjs. +Here `Observable` would be a copy of the Observable class from the spec. This +would enable us to always work with these spec-ed observables. This worked +nicely in pure JavaScript, but caused a problem with TypeScript, as TypeScript +wasn't able to correctly type the operators array when more than one operator +was specified. + +Because of that problem we tried `k$(source)(...operators)`. With this change +TypeScript is able to corretly type the operator arguments. However, this made +for a not so nice looking api. Also, it gives a feeling that you can do +`const obs = k$(source)`, then later do `obs(...operators)`, which was not an +intended use of the api. + +In the end we decided to include a `pipe` helper on the observable instead, so +it becomes `source.pipe(...operators)`. It's a fairly small addition to the +`Observable` class, so it's easy to keep up-to-date. However, it's not the +simplest thing to codemod later on because Node.js streams also uses the `pipe` +keyword, so it's not an _ideal_ solution. + +[proposal]: https://github.com/tc39/proposal-observable +[rxjs]: http://reactivex.io/rxjs/ +[staltz-intro]: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 diff --git a/packages/kbn-observable/package.json b/packages/kbn-observable/package.json index 54dea3b0f73284..756edd34220031 100644 --- a/packages/kbn-observable/package.json +++ b/packages/kbn-observable/package.json @@ -11,6 +11,6 @@ "rxjs": "5.4.3" }, "devDependencies": { - "typescript": "^2.5.3" + "typescript": "2.5.3" } } diff --git a/packages/kbn-observable/src/__tests__/Subject.test.ts b/packages/kbn-observable/src/__tests__/Subject.test.ts index 4bd67425278f06..488f962a705353 100644 --- a/packages/kbn-observable/src/__tests__/Subject.test.ts +++ b/packages/kbn-observable/src/__tests__/Subject.test.ts @@ -1,6 +1,5 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; -import { k$ } from '../k$'; import { first } from '../operators'; const noop = () => {}; @@ -340,7 +339,7 @@ test('can use subject in $k', async () => { const complete = jest.fn(); const error = jest.fn(); - k$(values$)(first()).subscribe({ + values$.pipe(first()).subscribe({ next, error, complete diff --git a/packages/kbn-observable/src/__tests__/k$.test.ts b/packages/kbn-observable/src/__tests__/k$.test.ts deleted file mode 100644 index d581aa499f2d9d..00000000000000 --- a/packages/kbn-observable/src/__tests__/k$.test.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { Observable } from '../Observable'; -import { k$ } from '../k$'; -import { - OperatorFunction, - UnaryFunction, - MonoTypeOperatorFunction -} from '../interfaces'; - -const plus1: MonoTypeOperatorFunction = source => - new Observable(observer => { - source.subscribe({ - next(val) { - observer.next(val + 1); - }, - error(err) { - observer.error(err); - }, - complete() { - observer.complete(); - } - }); - }); - -const toString: OperatorFunction = source => - new Observable(observer => { - source.subscribe({ - next(val) { - observer.next(val.toString()); - }, - error(err) { - observer.error(err); - }, - complete() { - observer.complete(); - } - }); - }); - -const toPromise: UnaryFunction, Promise> = source => - new Promise((resolve, reject) => { - let lastValue: number; - - source.subscribe({ - next(value) { - lastValue = value; - }, - error(error) { - reject(error); - }, - complete() { - resolve(lastValue); - } - }); - }); - -test('observable to observable', () => { - const numbers$ = Observable.of(1, 2, 3); - const actual: any[] = []; - - k$(numbers$)(plus1, toString).subscribe({ - next(x) { - actual.push(x); - } - }); - - expect(actual).toEqual(['2', '3', '4']); -}); - -test('observable to promise', async () => { - const numbers$ = Observable.of(1, 2, 3); - - const value = await k$(numbers$)(plus1, toPromise); - - expect(value).toEqual(4); -}); diff --git a/packages/kbn-observable/src/index.ts b/packages/kbn-observable/src/index.ts index 9bb85a159fc97c..b2c4437b601d43 100644 --- a/packages/kbn-observable/src/index.ts +++ b/packages/kbn-observable/src/index.ts @@ -1,5 +1,3 @@ -export { k$ } from './k$'; - export * from './Observable'; export { Subject } from './Subject'; export { BehaviorSubject } from './BehaviorSubject'; diff --git a/packages/kbn-observable/src/k$.ts b/packages/kbn-observable/src/k$.ts deleted file mode 100644 index 24e63997a5774b..00000000000000 --- a/packages/kbn-observable/src/k$.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { Observable, ObservableInput } from './Observable'; -import { pipeFromArray } from './lib'; -import { UnaryFunction } from './interfaces'; -import { $from } from './factories'; - -export function k$(source: ObservableInput) { - function kOperations(op1: UnaryFunction, A>): A; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction - ): B; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction - ): C; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction - ): D; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction - ): E; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction - ): F; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction - ): G; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction, - op8: UnaryFunction - ): H; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction, - op8: UnaryFunction, - op9: UnaryFunction - ): I; - - function kOperations(...operations: UnaryFunction, R>[]) { - return pipeFromArray(operations)($from(source)); - } - - return kOperations; -} diff --git a/packages/kbn-observable/src/operators/__tests__/filter.test.ts b/packages/kbn-observable/src/operators/__tests__/filter.test.ts index 707ddbd438f18c..79d4476c93afce 100644 --- a/packages/kbn-observable/src/operators/__tests__/filter.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/filter.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { $from } from '../../factories'; import { filter } from '../'; import { collect } from '../../lib/collect'; @@ -6,14 +5,14 @@ import { collect } from '../../lib/collect'; const number$ = $from([1, 2, 3]); test('returns the filtered values', async () => { - const filter$ = k$(number$)(filter(n => n > 1)); + const filter$ = number$.pipe(filter(n => n > 1)); const res = collect(filter$); expect(await res).toEqual([2, 3, 'C']); }); test('sends the index as arg 2', async () => { - const filter$ = k$(number$)(filter((n, i) => i > 1)); + const filter$ = number$.pipe(filter((n, i) => i > 1)); const res = collect(filter$); expect(await res).toEqual([3, 'C']); diff --git a/packages/kbn-observable/src/operators/__tests__/first.test.ts b/packages/kbn-observable/src/operators/__tests__/first.test.ts index 997bb9403edf8a..9e74e5e314daf7 100644 --- a/packages/kbn-observable/src/operators/__tests__/first.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/first.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { first } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('returns the first value, then completes', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.next('foo'); @@ -18,7 +17,7 @@ test('returns the first value, then completes', async () => { test('handles source completing after receiving value', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.next('foo'); @@ -31,7 +30,7 @@ test('handles source completing after receiving value', async () => { test('returns error if completing without receiving any value', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.complete(); diff --git a/packages/kbn-observable/src/operators/__tests__/last.test.ts b/packages/kbn-observable/src/operators/__tests__/last.test.ts index 01ba3752727be6..cb8c8a4001d8c6 100644 --- a/packages/kbn-observable/src/operators/__tests__/last.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/last.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { last } from '../'; @@ -9,7 +8,7 @@ test('returns the last value', async () => { const error = jest.fn(); const complete = jest.fn(); - k$(values$)(last()).subscribe(next, error, complete); + values$.pipe(last()).subscribe(next, error, complete); values$.next('foo'); expect(next).not.toHaveBeenCalled(); @@ -30,7 +29,7 @@ test('returns error if completing without receiving any value', async () => { const error = jest.fn(); - k$(values$)(last()).subscribe({ + values$.pipe(last()).subscribe({ error }); diff --git a/packages/kbn-observable/src/operators/__tests__/map.test.ts b/packages/kbn-observable/src/operators/__tests__/map.test.ts index 7f9cefe4f77c85..c66e6919ed2105 100644 --- a/packages/kbn-observable/src/operators/__tests__/map.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/map.test.ts @@ -1,20 +1,19 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { $from } from '../../factories'; import { map, toArray, toPromise } from '../'; const number$ = $from([1, 2, 3]); const collect = (source: Observable) => - k$(source)(toArray(), toPromise()); + source.pipe(toArray(), toPromise()); test('returns the modified value', async () => { - const numbers = await collect(k$(number$)(map(n => n * 1000))); + const numbers = await collect(number$.pipe(map(n => n * 1000))); expect(numbers).toEqual([1000, 2000, 3000]); }); test('sends the index as arg 2', async () => { - const numbers = await collect(k$(number$)(map((n, i) => i))); + const numbers = await collect(number$.pipe(map((n, i) => i))); expect(numbers).toEqual([0, 1, 2]); }); diff --git a/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts b/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts index c5ac1b32caa9a1..8006417397161b 100644 --- a/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts @@ -1,5 +1,4 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { mergeMap, map } from '../'; import { $of, $error } from '../../factories'; @@ -11,9 +10,9 @@ test('should mergeMap many outer values to many inner values', async () => { const inner$ = new Subject(); const outer$ = Observable.from([1, 2, 3, 4]); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); await tickMs(10); @@ -48,9 +47,9 @@ test('should mergeMap many outer values to many inner values, early complete', a const outer$ = new Subject(); const inner$ = new Subject(); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); outer$.next(1); @@ -81,7 +80,7 @@ test('should mergeMap many outer to many inner, and inner throws', async () => { const project = (value: number, index: number) => index > 1 ? $error(error) : $of(value); - const observable = k$(source)(mergeMap(project)); + const observable = source.pipe(mergeMap(project)); const res = collect(observable); expect(await res).toEqual([1, 2, error]); @@ -91,9 +90,9 @@ test('should mergeMap many outer to many inner, and outer throws', async () => { const outer$ = new Subject(); const inner$ = new Subject(); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); outer$.next(1); @@ -120,7 +119,7 @@ test('should mergeMap many outer to many inner, and outer throws', async () => { test('should mergeMap many outer to an array for each value', async () => { const source = Observable.from([1, 2, 3]); - const observable = k$(source)(mergeMap(() => $of('a', 'b', 'c'))); + const observable = source.pipe(mergeMap(() => $of('a', 'b', 'c'))); const res = collect(observable); expect(await res).toEqual(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'C']); @@ -132,7 +131,7 @@ test('should mergeMap many outer to inner arrays, using resultSelector', async ( const source = Observable.from([1, 2, 3]); const project = (num: number, str: string) => `${num}/${str}`; - const observable = k$(source)(mergeMap(() => $of('a', 'b', 'c'), project)); + const observable = source.pipe(mergeMap(() => $of('a', 'b', 'c'), project)); const res = collect(observable); expect(await res).toEqual([ diff --git a/packages/kbn-observable/src/operators/__tests__/reduce.test.ts b/packages/kbn-observable/src/operators/__tests__/reduce.test.ts index 751e6382d6e10a..77db91093c4bb9 100644 --- a/packages/kbn-observable/src/operators/__tests__/reduce.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/reduce.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { reduce } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('completes when source completes', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val) => { return acc + val; }, 'foo') @@ -23,7 +22,7 @@ test('completes when source completes', async () => { test('injects index', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val, index) => { return acc + index; }, 'foo') @@ -40,7 +39,7 @@ test('injects index', async () => { test('completes with initial value if no values received', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val, index) => { return acc + val; }, 'foo') diff --git a/packages/kbn-observable/src/operators/__tests__/scan.test.ts b/packages/kbn-observable/src/operators/__tests__/scan.test.ts index 0044f751369815..89c5e0b7d626c6 100644 --- a/packages/kbn-observable/src/operators/__tests__/scan.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/scan.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { scan } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('completes when source completes', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val) => { return acc + val; }, 'foo') @@ -23,7 +22,7 @@ test('completes when source completes', async () => { test('injects index', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val, index) => { return acc + index; }, 'foo') @@ -40,7 +39,7 @@ test('injects index', async () => { test('completes if no values received', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val, index) => { return acc + val; }, 'foo') diff --git a/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts b/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts index 358be87c952a47..8e3eb612459348 100644 --- a/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts @@ -1,13 +1,12 @@ import { Observable, SubscriptionObserver } from '../../Observable'; import { BehaviorSubject } from '../../BehaviorSubject'; -import { k$ } from '../../k$'; import { shareLast } from '../'; import { collect } from '../../lib/collect'; test('should mirror a simple source Observable', async () => { const source = Observable.from([4, 3, 2, 1]); - const observable = k$(source)(shareLast()); + const observable = source.pipe(shareLast()); const res = collect(observable); expect(await res).toEqual([4, 3, 2, 1, 'C']); @@ -19,7 +18,7 @@ test('should do nothing if result is not subscribed', () => { subscribed = true; }); - k$(source)(shareLast()); + source.pipe(shareLast()); expect(subscribed).toBe(false); }); @@ -34,7 +33,7 @@ test('should multicast the same values to multiple observers', () => { }); const results: any[] = []; - const source = k$(subject)(shareLast()); + const source = subject.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -78,7 +77,7 @@ test('should multicast an error from the source to multiple observers', () => { const subject = new BehaviorSubject('a'); const results: any[] = []; - const source = k$(subject)(shareLast()); + const source = subject.pipe(shareLast()); source.subscribe({ error(err) { @@ -116,7 +115,7 @@ test('should replay results to subsequent subscriptions if source completes', () const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -150,7 +149,7 @@ test('should completely restart for subsequent subscriptions if source errors', const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -184,7 +183,7 @@ test('restarts if refCount hits 0 due to unsubscriptions', () => { const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); const sub1 = source.subscribe(x => { results.push(`1/${x}`); diff --git a/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts b/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts index 732f735c629606..10740f510908ba 100644 --- a/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts @@ -1,6 +1,5 @@ import { Observable } from '../../Observable'; import { Subject } from '../../Subject'; -import { k$ } from '../../k$'; import { $of } from '../../factories'; import { skipRepeats } from '../'; import { collect } from '../../lib/collect'; @@ -8,7 +7,7 @@ import { collect } from '../../lib/collect'; test('should distinguish between values', async () => { const values$ = new Subject(); - const observable = k$(values$)(skipRepeats()); + const observable = values$.pipe(skipRepeats()); const res = collect(observable); values$.next('a'); @@ -27,7 +26,7 @@ test('should distinguish between values and does not complete', () => { const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -47,7 +46,7 @@ test('should distinguish between values and does not complete', () => { test('should complete if source is empty', done => { const values$ = $of(); - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ complete: done }); }); @@ -56,7 +55,7 @@ test('should emit if source emits single element only', () => { const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(x) { actual.push(x); } @@ -71,7 +70,7 @@ test('should emit if source is scalar', () => { const values$ = $of('a'); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -83,7 +82,7 @@ test('should emit if source is scalar', () => { test('should raise error if source raises error', async () => { const values$ = new Subject(); - const observable = k$(values$)(skipRepeats()); + const observable = values$.pipe(skipRepeats()); const res = collect(observable); values$.next('a'); @@ -103,7 +102,7 @@ test('should raise error if source throws', () => { }); const error = jest.fn(); - k$(obs)(skipRepeats()).subscribe({ + obs.pipe(skipRepeats()).subscribe({ error }); @@ -114,7 +113,7 @@ test('should allow unsubscribing early and explicitly', () => { const values$ = new Subject(); const actual: any[] = []; - const sub = k$(values$)(skipRepeats()).subscribe({ + const sub = values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -136,7 +135,7 @@ test('should emit once if comparator returns true always regardless of source em const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats(() => true)).subscribe({ + values$.pipe(skipRepeats(() => true)).subscribe({ next(v) { actual.push(v); } @@ -154,7 +153,7 @@ test('should emit all if comparator returns false always regardless of source em const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats(() => false)).subscribe({ + values$.pipe(skipRepeats(() => false)).subscribe({ next(v) { actual.push(v); } @@ -174,7 +173,7 @@ test('should distinguish values by comparator', () => { const comparator = (x: number, y: number) => y % 2 === 0; const actual: any[] = []; - k$(values$)(skipRepeats(comparator)).subscribe({ + values$.pipe(skipRepeats(comparator)).subscribe({ next(v) { actual.push(v); } diff --git a/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts b/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts index 9062069543f121..3c9c3c7d31d5a8 100644 --- a/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts @@ -1,5 +1,4 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { switchMap } from '../'; import { collect } from '../../lib/collect'; import { $of } from '../../factories'; @@ -10,7 +9,7 @@ const number$ = $of(1, 2, 3); test('returns the modified value', async () => { const expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3', 'C']; - const observable = k$(number$)( + const observable = number$.pipe( switchMap(x => $of('a' + x, 'b' + x, 'c' + x)) ); const res = collect(observable); @@ -19,7 +18,7 @@ test('returns the modified value', async () => { }); test('injects index to map', async () => { - const observable = k$(number$)(switchMap((x, i) => $of(i))); + const observable = number$.pipe(switchMap((x, i) => $of(i))); const res = collect(observable); expect(await res).toEqual([0, 1, 2, 'C']); @@ -29,16 +28,18 @@ test('should unsubscribe inner observable when source observable emits new value const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); expect(unsubbed).toEqual([]); @@ -57,16 +58,18 @@ test('should unsubscribe inner observable when source observable errors', async const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); subject.error(new Error('fail')); @@ -78,17 +81,19 @@ test('should unsubscribe inner observables if inner observer completes', async ( const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - observer.complete(); - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + observer.complete(); + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); expect(unsubbed).toEqual(['a']); @@ -107,19 +112,21 @@ test('should unsubscribe inner observables if inner observer errors', async () = const error = jest.fn(); const thrownError = new Error('fail'); - k$(subject)( - switchMap( - x => - new Observable(observer => { - observer.error(thrownError); - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + observer.error(thrownError); + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe({ - error - }); + .subscribe({ + error + }); subject.next('a'); expect(unsubbed).toEqual(['a']); @@ -137,7 +144,7 @@ test('should switch inner observables', () => { const actual: any[] = []; - k$(outer$)(switchMap(x => inner$[x])).subscribe({ + outer$.pipe(switchMap(x => inner$[x])).subscribe({ next(val) { actual.push(val); } @@ -165,7 +172,7 @@ test('should switch inner empty and empty', () => { const next = jest.fn(); - k$(outer$)(switchMap(x => inner$[x])).subscribe(next); + outer$.pipe(switchMap(x => inner$[x])).subscribe(next); outer$.next('x'); inner$.x.complete(); @@ -189,7 +196,7 @@ test('should switch inner never and throw', async () => { inner$.y.error(error); - const observable = k$(outer$)(switchMap(x => inner$[x])); + const observable = outer$.pipe(switchMap(x => inner$[x])); const res = collect(observable); outer$.next('x'); @@ -205,7 +212,7 @@ test('should handle outer throw', async () => { throw error; }); - const observable = k$(outer$)(switchMap(x => $of(x))); + const observable = outer$.pipe(switchMap(x => $of(x))); const res = collect(observable); expect(await res).toEqual([error]); @@ -217,7 +224,7 @@ test('should handle outer error', async () => { x: new Subject() }; - const observable = k$(outer$)(switchMap(x => inner$[x])); + const observable = outer$.pipe(switchMap(x => inner$[x])); const res = collect(observable); outer$.next('x'); @@ -239,7 +246,7 @@ test('should raise error when projection throws', async () => { const outer$ = new Subject(); const error = new Error('foo'); - const observable = k$(outer$)( + const observable = outer$.pipe( switchMap(x => { throw error; }) @@ -259,7 +266,7 @@ test('should switch inner cold observables, outer is unsubscribed early', () => }; const actual: any[] = []; - const sub = k$(outer$)(switchMap(x => inner$[x])).subscribe({ + const sub = outer$.pipe(switchMap(x => inner$[x])).subscribe({ next(val) { actual.push(val); } diff --git a/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts b/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts index d83b4d5736099e..4508732d479477 100644 --- a/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { toPromise } from '../'; @@ -12,7 +11,7 @@ test('returns the last value', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.next('foo'); await tick(); @@ -40,7 +39,7 @@ test('resolves even if no values received', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.complete(); await tick(); @@ -55,7 +54,7 @@ test('rejects if error received', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.error(new Error('fail')); await tick(); diff --git a/packages/kbn-observable/src/operators/mergeMap.ts b/packages/kbn-observable/src/operators/mergeMap.ts index 3c725fd4c4dc15..687ee7144672df 100644 --- a/packages/kbn-observable/src/operators/mergeMap.ts +++ b/packages/kbn-observable/src/operators/mergeMap.ts @@ -23,7 +23,7 @@ export function mergeMap( * * ```js * const source = Observable.from([1, 2, 3]); - * const observable = k$(source)( + * const observable = source.pipe( * mergeMap(x => Observable.of('a', x + 1)) * ); * ``` diff --git a/platform/cli/cli.ts b/platform/cli/cli.ts index 552853bfa7210b..27b0fa7a4d0d22 100644 --- a/platform/cli/cli.ts +++ b/platform/cli/cli.ts @@ -1,6 +1,6 @@ // TODO Fix build system so we can switch these to `import`s const yargs = require('yargs'); -import { k$, map } from 'kbn-observable'; +import { map } from 'kbn-observable'; import * as args from './args'; import { version } from './version'; @@ -34,9 +34,9 @@ const run = (argv: { [key: string]: any }) => { process.exit(reason === undefined ? 0 : 1); }; - const rawConfig$ = k$(rawConfigService.getConfig$())( - map(rawConfig => overrideConfigWithArgv(rawConfig, argv)) - ); + const rawConfig$ = rawConfigService + .getConfig$() + .pipe(map(rawConfig => overrideConfigWithArgv(rawConfig, argv))); const root = new Root(rawConfig$, env, onShutdown); root.start(); diff --git a/platform/config/ConfigService.ts b/platform/config/ConfigService.ts index 24de44bacb1b9f..5a3d36465f431a 100644 --- a/platform/config/ConfigService.ts +++ b/platform/config/ConfigService.ts @@ -1,11 +1,4 @@ -import { - Observable, - k$, - map, - first, - skipRepeats, - toPromise -} from 'kbn-observable'; +import { Observable, map, first, skipRepeats, toPromise } from 'kbn-observable'; import { isEqual } from 'lodash'; import { Env } from './Env'; @@ -56,7 +49,7 @@ export class ConfigService { path: ConfigPath, ConfigClass: ConfigWithSchema ) { - return k$(this.getDistinctRawConfig(path))( + return this.getDistinctRawConfig(path).pipe( map(rawConfig => this.createConfig(path, rawConfig, ConfigClass)) ); } @@ -71,7 +64,7 @@ export class ConfigService { path: ConfigPath, ConfigClass: ConfigWithSchema ) { - return k$(this.getDistinctRawConfig(path))( + return this.getDistinctRawConfig(path).pipe( map( rawConfig => rawConfig === undefined @@ -84,7 +77,7 @@ export class ConfigService { async isEnabledAtPath(path: ConfigPath) { const enabledPath = createPluginEnabledPath(path); - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); if (!config.has(enabledPath)) { return true; @@ -123,7 +116,7 @@ export class ConfigService { private getDistinctRawConfig(path: ConfigPath) { this.markAsHandled(path); - return k$(this.config$)( + return this.config$.pipe( map(config => config.get(path)), skipRepeats(isEqual) ); @@ -134,7 +127,7 @@ export class ConfigService { } async getUnusedPaths(): Promise { - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); const handledPaths = this.handledPaths.map(pathToString); return config diff --git a/platform/config/RawConfigService.ts b/platform/config/RawConfigService.ts index ba2ab79025f55a..bffe8fdbfe4f7a 100644 --- a/platform/config/RawConfigService.ts +++ b/platform/config/RawConfigService.ts @@ -1,5 +1,4 @@ import { - k$, BehaviorSubject, Observable, map, @@ -34,7 +33,7 @@ export class RawConfigService { private readonly config$: Observable; constructor(readonly configFile: string) { - this.config$ = k$(this.rawConfigFromFile$)( + this.config$ = this.rawConfigFromFile$.pipe( filter(rawConfig => rawConfig !== notRead), map(rawConfig => { // If the raw config is null, e.g. if empty config file, we default to diff --git a/platform/config/__tests__/ConfigService.test.ts b/platform/config/__tests__/ConfigService.test.ts index 08cba543b6cd18..a90020254d025a 100644 --- a/platform/config/__tests__/ConfigService.test.ts +++ b/platform/config/__tests__/ConfigService.test.ts @@ -1,4 +1,4 @@ -import { BehaviorSubject, k$, first, toPromise } from 'kbn-observable'; +import { BehaviorSubject, first, toPromise } from 'kbn-observable'; import { ConfigService, ObjectToRawConfigAdapter } from '..'; import { Env } from '../Env'; @@ -16,7 +16,7 @@ test('returns config at path as observable', async () => { const configService = new ConfigService(config$, defaultEnv, logger); const configs = configService.atPath('key', ExampleClassWithStringSchema); - const exampleConfig = await k$(configs)(first(), toPromise()); + const exampleConfig = await configs.pipe(first(), toPromise()); expect(exampleConfig.value).toBe('foo'); }); @@ -32,7 +32,7 @@ test('throws if config at path does not match schema', async () => { const configs = configService.atPath('key', ExampleClassWithStringSchema); try { - await k$(configs)(first(), toPromise()); + await configs.pipe(first(), toPromise()); } catch (e) { expect(e.message).toMatchSnapshot(); } @@ -48,7 +48,7 @@ test("returns undefined if fetching optional config at a path that doesn't exist 'unique-name', ExampleClassWithStringSchema ); - const exampleConfig = await k$(configs)(first(), toPromise()); + const exampleConfig = await configs.pipe(first(), toPromise()); expect(exampleConfig).toBeUndefined(); }); @@ -63,7 +63,7 @@ test('returns observable config at optional path if it exists', async () => { 'value', ExampleClassWithStringSchema ); - const exampleConfig: any = await k$(configs)(first(), toPromise()); + const exampleConfig: any = await configs.pipe(first(), toPromise()); expect(exampleConfig).toBeDefined(); expect(exampleConfig.value).toBe('bar'); @@ -118,7 +118,7 @@ test("throws error if config class does not implement 'createSchema'", async () const configs = configService.atPath('key', ExampleClass as any); try { - await k$(configs)(first(), toPromise()); + await configs.pipe(first(), toPromise()); } catch (e) { expect(e).toMatchSnapshot(); } diff --git a/platform/config/__tests__/RawConfigService.test.ts b/platform/config/__tests__/RawConfigService.test.ts index 65a1340bea3699..a611a4d7044e3e 100644 --- a/platform/config/__tests__/RawConfigService.test.ts +++ b/platform/config/__tests__/RawConfigService.test.ts @@ -4,7 +4,7 @@ jest.mock('../readConfig', () => ({ getConfigFromFile: mockGetConfigFromFile })); -import { k$, first, toPromise } from 'kbn-observable'; +import { first, toPromise } from 'kbn-observable'; import { RawConfigService } from '../RawConfigService'; const configFile = '/config/kibana.yml'; @@ -44,10 +44,9 @@ test('returns config at path as observable', async () => { configService.loadConfig(); - const exampleConfig = await k$(configService.getConfig$())( - first(), - toPromise() - ); + const exampleConfig = await configService + .getConfig$() + .pipe(first(), toPromise()); expect(exampleConfig.get('key')).toEqual('value'); expect(exampleConfig.getFlattenedPaths()).toEqual(['key']); diff --git a/platform/legacy/index.ts b/platform/legacy/index.ts index 0c6ac880017fd7..f77dbdadc7b003 100644 --- a/platform/legacy/index.ts +++ b/platform/legacy/index.ts @@ -8,7 +8,7 @@ export { /**@internal**/ export { LegacyKbnServer } from './LegacyKbnServer'; -import { k$, map, BehaviorSubject } from 'kbn-observable'; +import { map, BehaviorSubject } from 'kbn-observable'; import { Root } from '../root'; import { Env } from '../config'; import { @@ -23,7 +23,7 @@ import { */ export const injectIntoKbnServer = (rawKbnServer: any) => { const legacyConfig$ = new BehaviorSubject(rawKbnServer.config); - const config$ = k$(legacyConfig$)( + const config$ = legacyConfig$.pipe( map(legacyConfig => new LegacyConfigToRawConfigAdapter(legacyConfig)) ); diff --git a/platform/server/elasticsearch/ElasticsearchFacade.ts b/platform/server/elasticsearch/ElasticsearchFacade.ts index 5ef117ba09caee..1c03b97b691096 100644 --- a/platform/server/elasticsearch/ElasticsearchFacade.ts +++ b/platform/server/elasticsearch/ElasticsearchFacade.ts @@ -1,4 +1,4 @@ -import { k$, first, toPromise } from 'kbn-observable'; +import { first, toPromise } from 'kbn-observable'; import { ElasticsearchService } from './ElasticsearchService'; import { ElasticsearchClusterType } from './ElasticsearchConfig'; @@ -8,9 +8,8 @@ export class ElasticsearchRequestHelpers { constructor(private readonly elasticsearchService: ElasticsearchService) {} getClusterOfType(type: ElasticsearchClusterType): Promise { - return k$(this.elasticsearchService.getClusterOfType$(type))( - first(), - toPromise() - ); + return this.elasticsearchService + .getClusterOfType$(type) + .pipe(first(), toPromise()); } } diff --git a/platform/server/elasticsearch/ElasticsearchService.ts b/platform/server/elasticsearch/ElasticsearchService.ts index ebeb2ef2b499c7..a835306f4669c6 100644 --- a/platform/server/elasticsearch/ElasticsearchService.ts +++ b/platform/server/elasticsearch/ElasticsearchService.ts @@ -1,7 +1,6 @@ import { Observable, Subscription, - k$, map, filter, switchMap, @@ -26,7 +25,7 @@ export class ElasticsearchService implements CoreService { ) { const log = logger.get('elasticsearch'); - this.clusters$ = k$(config$)( + this.clusters$ = config$.pipe( filter(() => { if (this.subscription !== undefined) { log.error('clusters cannot be changed after they are created'); @@ -75,6 +74,6 @@ export class ElasticsearchService implements CoreService { } getClusterOfType$(type: ElasticsearchClusterType) { - return k$(this.clusters$)(map(clusters => clusters[type])); + return this.clusters$.pipe(map(clusters => clusters[type])); } } diff --git a/platform/server/http/HttpService.ts b/platform/server/http/HttpService.ts index 21bd93cb72b8bc..a9a9560691fbb3 100644 --- a/platform/server/http/HttpService.ts +++ b/platform/server/http/HttpService.ts @@ -1,4 +1,4 @@ -import { Observable, Subscription, k$, first, toPromise } from 'kbn-observable'; +import { Observable, Subscription, first, toPromise } from 'kbn-observable'; import { Env } from '../../config'; import { HttpServer } from './HttpServer'; @@ -34,7 +34,7 @@ export class HttpService implements CoreService { } }); - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); await this.httpServer.start(config); } diff --git a/platform/server/plugins/PluginsService.ts b/platform/server/plugins/PluginsService.ts index f93cf1a75f3ebd..d9936f4c873ad6 100644 --- a/platform/server/plugins/PluginsService.ts +++ b/platform/server/plugins/PluginsService.ts @@ -2,7 +2,6 @@ import { readdir, stat } from 'fs'; import { resolve } from 'path'; import { Observable, - k$, first, map, mergeMap, @@ -37,7 +36,7 @@ export class PluginsService implements CoreService { } async start() { - const plugins = await k$(this.getAllPlugins())( + const plugins = await this.getAllPlugins().pipe( mergeMap( plugin => $fromPromise(this.isPluginEnabled(plugin)), (plugin, isEnabled) => ({ plugin, isEnabled }) @@ -69,7 +68,7 @@ export class PluginsService implements CoreService { } private getAllPlugins() { - return k$(this.pluginsConfig$)( + return this.pluginsConfig$.pipe( first(), mergeMap(config => config.scanDirs), mergeMap(