[CM] Event Stream service (elastic#153211)
[CM] Event Stream service (elastic#153211)
## Summary

This PR implements the Event Stream service for Content Management.

For a high-level overview, see:

- [Event Stream technical
summary](https://docs.google.com/document/d/1nyMhb0p4gNV43OVF6cLJkxhMf2V4V1BIjPsnACxe_t0/edit#heading=h.typ7x7sxmeye)
(a bit dated, but still a good general overview)

Implementation details in this PR:

- This PR introduces the `EventStreamService` high-level class, which is
the public interface to the Event Stream, holds any necessary state, and
follows the plugin life-cycle methods (a usage sketch follows this list).
- On a lower level the actual event storage is defined in the
`EventStreamClient` interface.
- There are two `EventStreamClient` implementations:
  - `EsEventStreamClient` is the production implementation, which stores
    events in Elasticsearch.
  - `MemoryEventStreamClient` is used for testing and could be used for
    demo purposes.
- The same test suite, `testEventStreamClient`, is reused for
`EsEventStreamClient` and `MemoryEventStreamClient`, which should help
verify that both implementations behave correctly and identically. For
`EsEventStreamClient` it runs as a Kibana integration test; for
`MemoryEventStreamClient` it runs as a Jest unit test.
- In `EventStreamService`, events are buffered for 250 ms or up to 100
events before they are flushed to storage.
- Events are stored in the `.kibana-event-stream` data stream.
- The data stream and index template are created during the plugin's
"start" life-cycle, similar to how it is done for the Event Log and the
Reporting index.
- The mappings define a `meta` field, which is currently unused but will
allow more fields to be added in the future without changing the schema
of the data stream.
- The mappings define a transaction ID (`txId`) field, which can be used
to correlate multiple related events.
- Events are written to Elasticsearch using the `_bulk` request API.
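
Below is a minimal usage sketch of the service, based on the calls made in
this PR (the content type and id values are illustrative; `tail()` is the
read-back helper exercised by the tests):

```ts
// Record that a piece of content was deleted. The event is validated,
// buffered (250 ms / up to 100 events), and then flushed to the
// `.kibana-event-stream` data stream with a single `_bulk` request.
eventStream.addEvent({
  predicate: ['delete'],
  object: ['dashboard', '1234'], // [contentTypeId, contentId]
});

// Read back the most recent events.
const recentEvents = await eventStream.tail();
```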



### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

### For maintainers

- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------
Co-authored-by: Aleh Zasypkin <[email protected]>
Co-authored-by: Anton Dosov <[email protected]>
vadimkibana authored and nreese committed Apr 6, 2023
1 parent f5ea8ad commit 0aeeca2
Showing 37 changed files with 2,460 additions and 27 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -720,6 +720,7 @@
"JSONStream": "1.3.5",
"abort-controller": "^3.0.0",
"adm-zip": "^0.5.9",
"ajv": "^8.11.0",
"antlr4ts": "^0.5.0-alpha.3",
"archiver": "^5.3.1",
"async": "^3.2.3",
@@ -1297,7 +1298,6 @@
"@yarnpkg/lockfile": "^1.1.0",
"abab": "^2.0.4",
"aggregate-error": "^3.1.0",
"ajv": "^8.11.0",
"antlr4ts-cli": "^0.5.0-alpha.3",
"apidoc-markdown": "^7.2.4",
"argsplit": "^1.0.5",
29 changes: 29 additions & 0 deletions packages/kbn-es-query/src/kuery/node_types/node_builder.test.ts
@@ -277,4 +277,33 @@ describe('nodeBuilder', () => {
`);
});
});

describe('range method', () => {
const date = new Date(1679741259769);
const dateString = date.toISOString();

test('formats all range operators', () => {
const operators: Array<'gt' | 'gte' | 'lt' | 'lte'> = ['gt', 'gte', 'lt', 'lte'];

for (const operator of operators) {
const nodes = nodeBuilder.range('foo', operator, dateString);
const query = toElasticsearchQuery(nodes);

expect(query).toMatchObject({
bool: {
minimum_should_match: 1,
should: [
{
range: {
foo: {
[operator]: dateString,
},
},
},
],
},
});
}
});
});
});
12 changes: 12 additions & 0 deletions packages/kbn-es-query/src/kuery/node_types/node_builder.ts
@@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import type { RangeFilterParams } from '../../filters';
import { KueryNode, nodeTypes } from '../types';

export const nodeBuilder = {
@@ -21,4 +22,15 @@ export const nodeBuilder = {
and: (nodes: KueryNode[]): KueryNode => {
return nodes.length > 1 ? nodeTypes.function.buildNode('and', nodes) : nodes[0];
},
range: (
fieldName: string,
operator: keyof Pick<RangeFilterParams, 'gt' | 'gte' | 'lt' | 'lte'>,
value: number | string
) => {
return nodeTypes.function.buildNodeWithArgumentNodes('range', [
nodeTypes.literal.buildNode(fieldName),
operator,
typeof value === 'string' ? nodeTypes.literal.buildNode(value) : value,
]);
},
};
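
As a usage sketch, the new `range` builder composes with the existing `and`
helper (this assumes the package-level `nodeBuilder` and `toElasticsearchQuery`
exports; the field name and dates are illustrative):

```ts
import { nodeBuilder, toElasticsearchQuery } from '@kbn/es-query';

// Build a KQL node tree matching documents updated within a given window.
const node = nodeBuilder.and([
  nodeBuilder.range('updatedAt', 'gte', '2023-03-25T10:00:00.000Z'),
  nodeBuilder.range('updatedAt', 'lt', '2023-03-25T11:00:00.000Z'),
]);

// Converts to a `bool` query with two `range` clauses, as in the test above.
const esQuery = toElasticsearchQuery(node);
```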
14 changes: 11 additions & 3 deletions src/plugins/bfetch/common/buffer/item_buffer.ts
@@ -19,7 +19,7 @@ export interface ItemBufferParams<Item> {
* argument which is a list of all buffered items. If `.flush()` is called
* when buffer is empty, `.onflush` is called with empty array.
*/
onFlush: (items: Item[]) => void;
onFlush: (items: Item[]) => void | Promise<void>;
}

/**
@@ -60,11 +60,19 @@ export class ItemBuffer<Item> {
}

/**
* Call `.onflush` method and clear buffer.
* Call `.onFlush` method and clear buffer.
*/
public flush() {
this.flushAsync().catch(() => {});
}

/**
* Same as `.flush()` but asynchronous, and returns a promise, which
* rejects if `.onFlush` throws.
*/
public async flushAsync(): Promise<void> {
let list;
[list, this.list] = [this.list, []];
this.params.onFlush(list);
await this.params.onFlush(list);
}
}
5 changes: 5 additions & 0 deletions src/plugins/bfetch/common/buffer/timed_item_buffer.ts
@@ -41,6 +41,11 @@ export class TimedItemBuffer<Item> extends ItemBuffer<Item> {
super.flush();
}

public async flushAsync() {
clearTimeout(this.timer);
await super.flushAsync();
}

private onTimeout = () => {
this.flush();
};
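
A sketch of how the asynchronous flush is meant to be used. The `write()`
method and the `flushOnMaxItems`/`maxItemAge` options are assumed from the
existing bfetch buffer API and are not part of this diff; `sendToStorage` is a
hypothetical consumer:

```ts
const buffer = new TimedItemBuffer<string>({
  flushOnMaxItems: 100, // assumed option: flush once 100 items are buffered
  maxItemAge: 250, // assumed option: flush after at most 250 ms
  onFlush: async (items) => {
    await sendToStorage(items); // `onFlush` may now return a promise
  },
});

buffer.write('event-1');

// New in this change: wait for the flush and observe `onFlush` rejections,
// instead of the fire-and-forget `flush()`.
await buffer.flushAsync();
```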
19 changes: 19 additions & 0 deletions src/plugins/content_management/README.md
@@ -1,3 +1,22 @@
# Content management

The content management plugin provides functionality to manage content in Kibana.


## Testing

Many parts of the Content Management service are implemented *in-memory*, so
large parts of the Content Management plugin can be tested with plain Jest
tests.


### Elasticsearch Integration tests

Some functionality of the Content Management plugin can be tested using *Kibana
Integration Tests*, which execute tests against a real Elasticsearch instance.

Run the integration tests with:

```
yarn test:jest_integration src/plugins/content_management
```
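
The in-memory parts mentioned above run under the regular Jest unit-test
runner (standard Kibana command, not specific to this plugin):

```
yarn test:jest src/plugins/content_management
```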
13 changes: 13 additions & 0 deletions src/plugins/content_management/jest.integration.config.js
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

module.exports = {
preset: '@kbn/test/jest_integration',
rootDir: '../../..',
roots: ['<rootDir>/src/plugins/content_management'],
};
46 changes: 45 additions & 1 deletion src/plugins/content_management/server/core/core.test.ts
@@ -5,6 +5,7 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { Core } from './core';
import { createMemoryStorage } from './mocks';
@@ -31,6 +32,8 @@ import type {
SearchItemError,
} from './event_types';
import { ContentTypeDefinition, StorageContext } from './types';
import { until } from '../event_stream/tests/util';
import { setupEventStreamService } from '../event_stream/tests/setup_event_stream_service';

const logger = loggingSystemMock.createLogger();

@@ -48,8 +51,13 @@ const setup = ({ registerFooType = false }: { registerFooType?: boolean } = {})
},
};

const core = new Core({ logger });
const eventStream = setupEventStreamService().service;
const core = new Core({
logger,
eventStream,
});
const coreSetup = core.setup();

const contentDefinition: ContentTypeDefinition = {
id: FOO_CONTENT_ID,
storage: createMemoryStorage(),
@@ -76,6 +84,7 @@ const setup = ({ registerFooType = false }: { registerFooType?: boolean } = {})
fooContentCrud,
cleanUp,
eventBus: coreSetup.api.eventBus,
eventStream,
};
};

@@ -839,6 +848,41 @@ describe('Content Core', () => {
});
});
});

describe('eventStream', () => {
test('stores "delete" events', async () => {
const { fooContentCrud, ctx, eventStream } = setup({ registerFooType: true });

await fooContentCrud!.create(ctx, { title: 'Hello' }, { id: '1234' });
await fooContentCrud!.delete(ctx, '1234');

const findEvent = async () => {
const tail = await eventStream.tail();

for (const event of tail) {
if (
event.predicate[0] === 'delete' &&
event.object &&
event.object[0] === 'foo' &&
event.object[1] === '1234'
) {
return event;
}
}

return null;
};

await until(async () => !!(await findEvent()), 100);

const event = await findEvent();

expect(event).toMatchObject({
predicate: ['delete'],
object: ['foo', '1234'],
});
});
});
});
});
});
24 changes: 22 additions & 2 deletions src/plugins/content_management/server/core/core.ts
@@ -5,8 +5,9 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Logger } from '@kbn/core/server';

import { Logger } from '@kbn/core/server';
import { EventStreamService } from '../event_stream';
import { ContentCrud } from './crud';
import { EventBus } from './event_bus';
import { ContentRegistry } from './registry';
@@ -25,6 +26,11 @@ export interface CoreApi {
eventBus: EventBus;
}

export interface CoreInitializerContext {
logger: Logger;
eventStream: EventStreamService;
}

export interface CoreSetup {
/** Content registry instance */
contentRegistry: ContentRegistry;
@@ -36,14 +42,16 @@ export class Core {
private contentRegistry: ContentRegistry;
private eventBus: EventBus;

constructor({ logger }: { logger: Logger }) {
constructor(private readonly ctx: CoreInitializerContext) {
const contentTypeValidator = (contentType: string) =>
this.contentRegistry?.isContentRegistered(contentType) ?? false;
this.eventBus = new EventBus(contentTypeValidator);
this.contentRegistry = new ContentRegistry(this.eventBus);
}

setup(): CoreSetup {
this.setupEventStream();

return {
contentRegistry: this.contentRegistry,
api: {
@@ -53,4 +61,16 @@
},
};
}

private setupEventStream() {
// TODO: This should be cleaned up and support added for all CRUD events.
this.eventBus.on('deleteItemSuccess', (event) => {
this.ctx.eventStream.addEvent({
// TODO: add "subject" field to event
predicate: ['delete'],
// TODO: the `.contentId` should be easily available on most events.
object: [event.contentTypeId, (event as any).contentId],
});
});
}
}
50 changes: 50 additions & 0 deletions src/plugins/content_management/server/event_stream/README.md
@@ -0,0 +1,50 @@
# Event Stream


## The service

At a high level, the Event Stream is exposed through the `EventStreamService`
class, which is the public interface to the Event Stream; it holds any
necessary state and follows the plugin life-cycle methods.

The service validates events before they are stored. It also buffers events on
write: events are buffered for 250 ms or up to 100 events before they are
flushed to storage.
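
A minimal sketch of the write path (the event shape follows the `addEvent()`
call used elsewhere in this PR; the predicate and object values are
illustrative):

```ts
// Both events pass validation and land in the same buffered batch, because
// they arrive within the 250 ms / 100-event window; the flush then writes
// the whole batch to storage in one go.
eventStream.addEvent({ predicate: ['delete'], object: ['map', 'id-1'] });
eventStream.addEvent({ predicate: ['delete'], object: ['map', 'id-2'] });
```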


## The client

On a lower level the actual event storage is defined in the `EventStreamClient`
interface. There are two `EventStreamClient` implementations:

- `EsEventStreamClient` is the production implementation, which stores events
  in Elasticsearch.
- `MemoryEventStreamClient` is used for testing and could be used for demo
purposes.
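
Roughly, the client is responsible for initializing storage, writing batches
of events, and reading recent events back. The sketch below is indicative
only; apart from `tail()`, which the tests exercise, the method names are
assumptions rather than the actual interface:

```ts
// Indicative shape only (see the plugin sources for the real interface).
interface EventStreamClientLike {
  initialize(): Promise<void>; // e.g. create the index template / data stream
  writeEvents(events: unknown[]): Promise<void>; // persist a flushed batch
  tail(limit?: number): Promise<unknown[]>; // read the most recent events
}
```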


### The `EsEventStreamClient` client

`EsEventStreamClient` is used in production. It stores events in the
`.kibana-event-stream` data stream. The data stream and index template are
created during plugin initialization "start" life-cycle.

The mappings define `meta` and `indexed` fields, which are reserved for future
schema extensions (so that new fields can be added without mapping changes).

The mappings also define a transaction ID (`txID`) field, which can be used to
correlate multiple related events.

Events are written to Elasticsearch using the `_bulk` request API.
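
Roughly, a flush translates into a single bulk request in which each event
uses the `create` action (required when writing to a data stream). This is an
illustrative sketch with the Elasticsearch JS client; the document fields
follow the data stream mappings and are not spelled out here:

```ts
await esClient.bulk({
  operations: events.flatMap((doc) => [
    { create: { _index: '.kibana-event-stream' } }, // data streams only accept "create"
    doc, // event document shaped according to the mappings
  ]),
});
```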


## Testing

The `MemoryEventStreamClient` can be used to simulate the Event Stream in the
Jest unit-test environment. Use `setupEventStreamService()` to spin up the
service in tests.

The clients themselves can be tested using the `testEventStreamClient` test
suite, which should help verify that both implementations work correctly.
`EsEventStreamClient` is tested using Kibana integration tests, while
`MemoryEventStreamClient` is tested with Jest unit tests.
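
For example, a Jest test can stand up the whole service on top of the
in-memory client, which is what the Content Core tests in this PR do (import
paths assume this directory layout):

```ts
import { setupEventStreamService } from './tests/setup_event_stream_service';
import { until } from './tests/util';

test('records events through the in-memory client', async () => {
  const eventStream = setupEventStreamService().service;

  eventStream.addEvent({ predicate: ['delete'], object: ['foo', '1234'] });

  // The write is buffered, so poll until the event shows up in the stream.
  await until(async () => (await eventStream.tail()).length > 0, 100);
});
```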
