diff --git a/packages/kbn-test/src/functional_test_runner/lib/docker_servers/define_docker_servers_config.ts b/packages/kbn-test/src/functional_test_runner/lib/docker_servers/define_docker_servers_config.ts index 674fddf75af4c7..aeae36d340db9a 100644 --- a/packages/kbn-test/src/functional_test_runner/lib/docker_servers/define_docker_servers_config.ts +++ b/packages/kbn-test/src/functional_test_runner/lib/docker_servers/define_docker_servers_config.ts @@ -24,7 +24,8 @@ export interface DockerServerSpec { port: number; image: string; waitForLogLine?: RegExp | string; - waitFor?: (server: DockerServer, logLine$: Rx.Observable) => Promise; + /** a function that should return an obeservable that will allow the tests to execute as soon as it emits anything */ + waitFor?: (server: DockerServer, logLine$: Rx.Observable) => Rx.Observable; } export interface DockerServer extends DockerServerSpec { diff --git a/packages/kbn-test/src/functional_test_runner/lib/docker_servers/docker_servers_service.ts b/packages/kbn-test/src/functional_test_runner/lib/docker_servers/docker_servers_service.ts index 9997aac86574c6..4551327c654427 100644 --- a/packages/kbn-test/src/functional_test_runner/lib/docker_servers/docker_servers_service.ts +++ b/packages/kbn-test/src/functional_test_runner/lib/docker_servers/docker_servers_service.ts @@ -19,7 +19,8 @@ import Url from 'url'; import execa from 'execa'; -import { filter, take } from 'rxjs/operators'; +import * as Rx from 'rxjs'; +import { filter, take, map } from 'rxjs/operators'; import { ToolingLog } from '@kbn/dev-utils'; import { Lifecycle } from '../lifecycle'; @@ -27,6 +28,8 @@ import { observeContainerRunning } from './container_running'; import { observeContainerLogs } from './container_logs'; import { DockerServer, DockerServerSpec } from './define_docker_servers_config'; +const SECOND = 1000; + export class DockerServersService { private servers: DockerServer[]; @@ -162,20 +165,39 @@ export class DockerServersService { `); } - await Promise.all([ - waitFor !== undefined && waitFor(server, logLine$), - waitForLogLine !== undefined && - logLine$ - .pipe( - filter(line => - waitForLogLine instanceof RegExp - ? waitForLogLine.test(line) - : line.includes(waitForLogLine) + function takeFirstWithTimeout( + source$: Rx.Observable, + errorMsg: string, + ms = 30 * SECOND + ) { + return Rx.race( + source$.pipe(take(1)), + Rx.timer(ms).pipe( + map(() => { + throw new Error(`[docker:${name}] ${errorMsg} within ${ms / SECOND} seconds`); + }) + ) + ); + } + + await Rx.merge( + takeFirstWithTimeout( + waitFor === undefined ? Rx.EMPTY : waitFor(server, logLine$), + `didn't find a line containing "${waitForLogLine}"` + ), + takeFirstWithTimeout( + waitForLogLine === undefined + ? Rx.EMPTY + : logLine$.pipe( + filter(line => + waitForLogLine instanceof RegExp + ? waitForLogLine.test(line) + : line.includes(waitForLogLine) + ) ), - take(1) - ) - .toPromise(), - ]); + `waitForLogLine didn't emit anything` + ) + ).toPromise(); } private async startServers() {