Skip to content

Commit

Permalink
Use debounceTime + PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
joshdover committed Aug 26, 2020
1 parent fa70af6 commit 2aaaf8c
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 159 deletions.
2 changes: 1 addition & 1 deletion src/core/server/legacy/legacy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ export class LegacyService implements CoreService {
core$: setupDeps.core.status.core$,
overall$: setupDeps.core.status.overall$,
set: setupDeps.core.status.plugins.set.bind(null, 'legacy'),
plugins$: setupDeps.core.status.plugins.getPlugins$('legacy'),
plugins$: setupDeps.core.status.plugins.getDepsStatus$('legacy'),
derivedStatus$: setupDeps.core.status.plugins.getDerivedStatus$('legacy'),
},
uiSettings: {
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/plugins/plugin_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export function createPluginSetupContext<TPlugin, TPluginDependencies>(
core$: deps.status.core$,
overall$: deps.status.overall$,
set: deps.status.plugins.set.bind(null, plugin.name),
plugins$: deps.status.plugins.getPlugins$(plugin.name),
plugins$: deps.status.plugins.getDepsStatus$(plugin.name),
derivedStatus$: deps.status.plugins.getDerivedStatus$(plugin.name),
},
uiSettings: {
Expand Down
77 changes: 18 additions & 59 deletions src/core/server/status/plugins_status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { PluginName } from '../plugins';
import { PluginsStatusService } from './plugins_status';
import { of, Observable, BehaviorSubject } from 'rxjs';
import { of, Observable } from 'rxjs';
import { ServiceStatusLevels, CoreStatus, ServiceStatus } from './types';
import { first } from 'rxjs/operators';
import { ServiceStatusLevelSnapshotSerializer } from './test_utils';
Expand Down Expand Up @@ -79,6 +79,17 @@ describe('PluginStatusService', () => {
});
});

it(`provides a summary status when core and dependencies are at same severity level`, async () => {
const service = new PluginsStatusService({ core$: coreOneDegraded$, pluginDependencies });
service.set('a', of({ level: ServiceStatusLevels.degraded, summary: 'a is degraded' }));
expect(await service.getDerivedStatus$('b').pipe(first()).toPromise()).toEqual({
level: ServiceStatusLevels.degraded,
summary: '[2] services are degraded',
detail: 'See the status page for more information',
meta: expect.any(Object),
});
});

it(`allows dependencies status to take precedence over lower severity core statuses`, async () => {
const service = new PluginsStatusService({ core$: coreOneDegraded$, pluginDependencies });
service.set('a', of({ level: ServiceStatusLevels.unavailable, summary: 'a is not working' }));
Expand Down Expand Up @@ -209,15 +220,6 @@ describe('PluginStatusService', () => {
});
});

const available: ServiceStatus<any> = {
level: ServiceStatusLevels.available,
summary: 'Available',
};
const degraded: ServiceStatus<any> = {
level: ServiceStatusLevels.degraded,
summary: 'This is degraded!',
};

it('updates when a new plugin status observable is set', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
Expand All @@ -240,62 +242,19 @@ describe('PluginStatusService', () => {
{ a: { level: ServiceStatusLevels.available, summary: 'a available' } },
]);
});

it('debounces updates in a dependency tree between ticks', async () => {
const service = new PluginsStatusService({ core$: coreAllAvailable$, pluginDependencies });
const pluginA$ = new BehaviorSubject(available);
service.set('a', pluginA$);

const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));
const nextTick = () => new Promise((resolve) => process.nextTick(resolve));

await nextTick();
pluginA$.next(degraded);
await nextTick();
subscription.unsubscribe();

// Results should only include the final computed state of the depenency tree, once per tick.
// As updates propagate between dependencies, they will not emit any updates until the microtasks queue has
// been exhausted before the next tick.
expect(statusUpdates).toEqual([
{
a: { level: ServiceStatusLevels.available, summary: 'Available' },
b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
c: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
},
{
a: { level: ServiceStatusLevels.degraded, summary: 'This is degraded!' },
b: {
level: ServiceStatusLevels.degraded,
summary: '[a]: This is degraded!',
detail: 'See the status page for more information',
meta: expect.any(Object),
},
c: {
level: ServiceStatusLevels.degraded,
summary: '[2] services are degraded',
detail: 'See the status page for more information',
meta: expect.any(Object),
},
},
]);
});
});

describe('getPlugins$', () => {
describe('getDepsStatus$', () => {
it('only includes dependencies of specified plugin', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies,
});
expect(await service.getPlugins$('a').pipe(first()).toPromise()).toEqual({});
expect(await service.getPlugins$('b').pipe(first()).toPromise()).toEqual({
expect(await service.getDepsStatus$('a').pipe(first()).toPromise()).toEqual({});
expect(await service.getDepsStatus$('b').pipe(first()).toPromise()).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
});
expect(await service.getPlugins$('c').pipe(first()).toPromise()).toEqual({
expect(await service.getDepsStatus$('c').pipe(first()).toPromise()).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
});
Expand All @@ -305,7 +264,7 @@ describe('PluginStatusService', () => {
const service = new PluginsStatusService({ core$: coreOneDegraded$, pluginDependencies });
service.set('a', of({ level: ServiceStatusLevels.available, summary: 'a status' }));

expect(await service.getPlugins$('c').pipe(first()).toPromise()).toEqual({
expect(await service.getDepsStatus$('c').pipe(first()).toPromise()).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available depsite savedObjects being degraded
b: {
level: ServiceStatusLevels.degraded,
Expand All @@ -319,7 +278,7 @@ describe('PluginStatusService', () => {
it('throws error if unknown plugin passed', () => {
const service = new PluginsStatusService({ core$: coreAllAvailable$, pluginDependencies });
expect(() => {
service.getPlugins$('dont-exist');
service.getDepsStatus$('dont-exist');
}).toThrowError();
});
});
Expand Down
22 changes: 12 additions & 10 deletions src/core/server/status/plugins_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs';
import { map, distinctUntilChanged, switchMap, debounce } from 'rxjs/operators';
import { map, distinctUntilChanged, switchMap, debounceTime } from 'rxjs/operators';
import { isDeepStrictEqual } from 'util';

import { PluginName } from '../plugins';
Expand Down Expand Up @@ -52,17 +52,20 @@ export class PluginsStatusService {
return this.getPluginStatuses$([...this.deps.pluginDependencies.keys()]);
}

public getPlugins$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
public getDepsStatus$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
const dependencies = this.deps.pluginDependencies.get(plugin);
if (!dependencies) {
throw new Error(`Unknown plugin: ${plugin}`);
}

return this.getPluginStatuses$(dependencies);
return this.getPluginStatuses$(dependencies).pipe(
// Prevent many emissions at once from dependency status resolution from making this too noisy
debounceTime(100)
);
}

public getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus> {
return combineLatest(this.deps.core$, this.getPlugins$(plugin)).pipe(
return combineLatest([this.deps.core$, this.getDepsStatus$(plugin)]).pipe(
map(([coreStatus, pluginStatuses]) => {
return getSummaryStatus(
[...Object.entries(coreStatus), ...Object.entries(pluginStatuses)],
Expand All @@ -89,13 +92,12 @@ export class PluginsStatusService {
Observable<ServiceStatus>
]
)
.map(([pName, status$]) => status$.pipe(map((status) => ({ [pName]: status }))));
.map(([pName, status$]) =>
status$.pipe(map((status) => [pName, status] as [PluginName, ServiceStatus]))
);

return combineLatest(...pluginStatuses).pipe(
// We schedule the number of microtasks expected to resolve all of the dependencies of this update.
// Waiting for this ensures that we do not emit 'partial updates' to reduce noise.
debounce((statuses) => waitForMicrotasks(Object.keys(statuses).length)),
map((statuses) => Object.assign({}, ...statuses)),
return combineLatest(pluginStatuses).pipe(
map((statuses) => Object.fromEntries(statuses)),
distinctUntilChanged(isDeepStrictEqual)
);
})
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/status/status_service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const createInternalSetupContractMock = () => {
isStatusPageAnonymous: jest.fn().mockReturnValue(false),
plugins: {
set: jest.fn(),
getPlugins$: jest.fn(),
getDepsStatus$: jest.fn(),
getDerivedStatus$: jest.fn(),
},
};
Expand Down
88 changes: 9 additions & 79 deletions src/core/server/status/status_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ describe('StatusService', () => {
level: ServiceStatusLevels.degraded,
summary: 'This is degraded!',
};
const critical: ServiceStatus<any> = {
level: ServiceStatusLevels.critical,
summary: 'This is critical!',
};

const nextTick = () => new Promise((resolve) => process.nextTick(resolve));

describe('setup', () => {
describe('core$', () => {
Expand Down Expand Up @@ -203,6 +197,7 @@ describe('StatusService', () => {
});

it('does not emit duplicate events', async () => {
jest.useFakeTimers();
const elasticsearch$ = new BehaviorSubject(available);
const savedObjects$ = new BehaviorSubject(degraded);
const setup = await service.setup({
Expand All @@ -218,24 +213,22 @@ describe('StatusService', () => {
const statusUpdates: ServiceStatus[] = [];
const subscription = setup.overall$.subscribe((status) => statusUpdates.push(status));

// Wait for ticks to ensure that duplicate events are still filtered out
// regardless of debouncing.
await nextTick();
// Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing.
elasticsearch$.next(available);
await nextTick();
jest.runAllTimers();
elasticsearch$.next(available);
await nextTick();
jest.runAllTimers();
elasticsearch$.next({
level: ServiceStatusLevels.available,
summary: `Wow another summary`,
});
await nextTick();
jest.runAllTimers();
savedObjects$.next(degraded);
await nextTick();
jest.runAllTimers();
savedObjects$.next(available);
await nextTick();
jest.runAllTimers();
savedObjects$.next(available);
await nextTick();
jest.runAllTimers();
subscription.unsubscribe();

expect(statusUpdates).toMatchInlineSnapshot(`
Expand All @@ -259,71 +252,8 @@ describe('StatusService', () => {
},
]
`);
});

it('debounces updates in dependency tree between ticks', async () => {
const pluginDependencies = new Map([
['a', []],
['b', ['a']],
]);
const elasticsearch$ = new BehaviorSubject(available);
const savedObjects$ = new BehaviorSubject(available);
const setup = await service.setup({
elasticsearch: {
status$: elasticsearch$,
},
savedObjects: {
status$: savedObjects$,
},
pluginDependencies,
});

const pluginA$ = new BehaviorSubject(available);
setup.plugins.set('a', pluginA$);

const statusUpdates: ServiceStatus[] = [];
const subscription = setup.overall$.subscribe((status) => statusUpdates.push(status));

// Wait for ticks to exhaust the microtask queue for the next debounced value
await nextTick();
pluginA$.next(degraded);
await nextTick();
elasticsearch$.next(critical);
await nextTick();
elasticsearch$.next(available);
await nextTick();
pluginA$.next(available);
await nextTick();
subscription.unsubscribe();

// Results should only include the final computed state of the depenency tree, once per tick.
// As updates propagate between dependencies, they will not emit any updates until the microtasks queue has
// been exhausted before the next tick.
expect(statusUpdates.map(({ level, summary }) => ({ level, summary })))
.toMatchInlineSnapshot(`
Array [
Object {
"level": available,
"summary": "All services are available",
},
Object {
"level": degraded,
"summary": "[2] services are degraded",
},
Object {
"level": critical,
"summary": "[2] services are critical",
},
Object {
"level": degraded,
"summary": "[2] services are degraded",
},
Object {
"level": available,
"summary": "All services are available",
},
]
`);
jest.useRealTimers();
});
});
});
Expand Down
11 changes: 4 additions & 7 deletions src/core/server/status/status_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { Observable, combineLatest } from 'rxjs';
import { map, distinctUntilChanged, shareReplay, take, debounce } from 'rxjs/operators';
import { map, distinctUntilChanged, shareReplay, take, debounceTime } from 'rxjs/operators';
import { isDeepStrictEqual } from 'util';

import { CoreService } from '../../types';
Expand Down Expand Up @@ -67,11 +67,8 @@ export class StatusService implements CoreService<InternalStatusServiceSetup> {
core$,
this.pluginsStatus.getAll$()
).pipe(
// We schedule the number of microtasks expected to resolve all of the dependencies of this update.
// Waiting for this ensures that we do not emit 'partial updates' to reduce noise.
debounce(([coreStatus, pluginsStatus]) =>
waitForMicrotasks(1 + Object.keys(pluginsStatus).length)
),
// Prevent many emissions at once from dependency status resolution from making this too noisy
debounceTime(100),
map(([coreStatus, pluginsStatus]) => {
const summary = getSummaryStatus([
...Object.entries(coreStatus),
Expand All @@ -88,7 +85,7 @@ export class StatusService implements CoreService<InternalStatusServiceSetup> {
overall$,
plugins: {
set: this.pluginsStatus.set.bind(this.pluginsStatus),
getPlugins$: this.pluginsStatus.getPlugins$.bind(this.pluginsStatus),
getDepsStatus$: this.pluginsStatus.getDepsStatus$.bind(this.pluginsStatus),
getDerivedStatus$: this.pluginsStatus.getDerivedStatus$.bind(this.pluginsStatus),
},
isStatusPageAnonymous: () => statusConfig.allowAnonymous,
Expand Down
5 changes: 4 additions & 1 deletion src/core/server/status/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ export interface StatusServiceSetup {
* @remarks
* By default, plugins inherit this derived status from their dependencies.
* Calling {@link StatusSetup.set} overrides this default status.
*
* This may emit multliple times for a single status change event as propagates
* through the dependency tree
*/
derivedStatus$: Observable<ServiceStatus>;
}
Expand All @@ -168,7 +171,7 @@ export interface InternalStatusServiceSetup extends Pick<StatusServiceSetup, 'co
// Namespaced under `plugins` key to improve clarity that these are APIs for plugins specifically.
plugins: {
set(plugin: PluginName, status$: Observable<ServiceStatus>): void;
getPlugins$(plugin: PluginName): Observable<Record<string, ServiceStatus>>;
getDepsStatus$(plugin: PluginName): Observable<Record<string, ServiceStatus>>;
getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus>;
};
}

0 comments on commit 2aaaf8c

Please sign in to comment.