Skip to content

Commit

Permalink
[Automatic Import] Add support for handling unstructured syslog sampl…
Browse files Browse the repository at this point in the history
…es (elastic#192817)

## Summary

This PR handles the `unstructured` syslog samples in Automatic Import.

Examples of unstructured samples would be:

```
<34>Oct 11 00:14:05 mymachine su: 'su root' failed for user on /dev/pts/8
<34>Dec 11 00:14:43 yourmachine su: 'su root' failed for someone on /dev/pts/5
<34>Apr 11 00:14:05 mymachine su: 'su root' failed for otheruser on /dev/pts/3
```


https://github.com/user-attachments/assets/d1381ac9-4889-42cf-b3c1-d1b7a88def02


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
  • Loading branch information
bhapas authored Sep 17, 2024
1 parent 193935c commit 77fe423
Show file tree
Hide file tree
Showing 22 changed files with 633 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export const unstructuredLogState = {
lastExecutedChain: 'testchain',
packageName: 'testPackage',
dataStreamName: 'testDatastream',
grokPatterns: ['%{GREEDYDATA:message}'],
logSamples: ['dummy data'],
jsonSamples: ['{"message":"dummy data"}'],
finalized: false,
ecsVersion: 'testVersion',
errors: { test: 'testerror' },
additionalProcessors: [],
};

export const unstructuredLogResponse = {
grok_patterns: [
'####<%{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} (?:AM|PM) %{WORD:timezone}> <%{WORD:log_level}> <%{WORD:component}> <%{DATA:hostname}> <%{DATA:server_name}> <%{DATA:thread_info}> <%{DATA:user}> <%{DATA:empty_field}> <%{DATA:empty_field2}> <%{NUMBER:timestamp}> <%{DATA:message_id}> <%{GREEDYDATA:message}>',
],
};
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export async function handleKVError({

return {
kvProcessor,
lastExecutedChain: 'kv_error',
lastExecutedChain: 'kvError',
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ describe('Testing kv header', () => {
expect(response.grokPattern).toStrictEqual(
'<%{NUMBER:priority}>%{NUMBER:version} %{GREEDYDATA:message}'
);
expect(response.lastExecutedChain).toBe('kv_header');
expect(response.lastExecutedChain).toBe('kvHeader');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ export async function handleHeader({

return {
grokPattern: pattern.grok_pattern,
lastExecutedChain: 'kv_header',
lastExecutedChain: 'kvHeader',
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export async function handleKVValidate({
)) as { pipelineResults: KVResult[]; errors: object[] };

if (errors.length > 0) {
return { errors, lastExecutedChain: 'kv_validate' };
return { errors, lastExecutedChain: 'kvValidate' };
}

// Converts JSON Object into a string and parses it as a array of JSON strings
Expand All @@ -56,7 +56,7 @@ export async function handleKVValidate({
jsonSamples,
additionalProcessors,
errors: [],
lastExecutedChain: 'kv_validate',
lastExecutedChain: 'kvValidate',
};
}

Expand All @@ -65,7 +65,7 @@ export async function handleHeaderValidate({
client,
}: HandleKVNodeParams): Promise<Partial<KVState>> {
const grokPattern = state.grokPattern;
const grokProcessor = createGrokProcessor(grokPattern);
const grokProcessor = createGrokProcessor([grokPattern]);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };

const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ESProcessorItem, SamplesFormat } from '../../../common';
import { getKVGraph } from '../kv/graph';
import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types';
import { LogFormat } from '../../constants';
import { getUnstructuredGraph } from '../unstructured/graph';

const graphState: StateGraphArgs<LogFormatDetectionState>['channels'] = {
lastExecutedChain: {
Expand Down Expand Up @@ -90,9 +91,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string {
if (state.samplesFormat.name === LogFormat.STRUCTURED) {
return 'structured';
}
// if (state.samplesFormat === LogFormat.UNSTRUCTURED) {
// return 'unstructured';
// }
if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) {
return 'unstructured';
}
// if (state.samplesFormat === LogFormat.CSV) {
// return 'csv';
// }
Expand All @@ -109,18 +110,19 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection
handleLogFormatDetection({ state, model })
)
.addNode('handleKVGraph', await getKVGraph({ model, client }))
// .addNode('handleUnstructuredGraph', (state: LogFormatDetectionState) => getCompiledUnstructuredGraph({state, model}))
.addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client }))
// .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model}))
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleLogFormatDetection')
.addEdge('handleKVGraph', 'modelOutput')
.addEdge('handleUnstructuredGraph', 'modelOutput')
.addEdge('modelOutput', END)
.addConditionalEdges(
'handleLogFormatDetection',
(state: LogFormatDetectionState) => logFormatRouter({ state }),
{
structured: 'handleKVGraph',
// unstructured: 'handleUnstructuredGraph',
unstructured: 'handleUnstructuredGraph',
// csv: 'handleCsvGraph',
unsupported: 'modelOutput',
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export const GROK_EXAMPLE_ANSWER = {
rfc: 'RFC2454',
regex:
'/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
grok_patterns: ['%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}'],
};

export const GROK_ERROR_EXAMPLE_ANSWER = {
grok_patterns: [
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
],
};

export const onFailure = {
append: {
field: 'error.message',
value:
'{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}',
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import type { UnstructuredLogState } from '../../types';
import type { HandleUnstructuredNodeParams } from './types';
import { GROK_ERROR_PROMPT } from './prompts';
import { GROK_ERROR_EXAMPLE_ANSWER } from './constants';

export async function handleUnstructuredError({
state,
model,
}: HandleUnstructuredNodeParams): Promise<Partial<UnstructuredLogState>> {
const outputParser = new JsonOutputParser();
const grokErrorGraph = GROK_ERROR_PROMPT.pipe(model).pipe(outputParser);
const currentPatterns = state.grokPatterns;

const pattern = await grokErrorGraph.invoke({
current_pattern: JSON.stringify(currentPatterns, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
});

return {
grokPatterns: pattern.grok_patterns,
lastExecutedChain: 'unstructuredError',
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FakeLLM } from '@langchain/core/utils/testing';
import { handleUnstructuredError } from './error';
import type { UnstructuredLogState } from '../../types';
import {
unstructuredLogState,
unstructuredLogResponse,
} from '../../../__jest__/fixtures/unstructured';
import {
ActionsClientChatOpenAI,
ActionsClientSimpleChatModel,
} from '@kbn/langchain/server/language_models';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';

const model = new FakeLLM({
response: JSON.stringify(unstructuredLogResponse, null, 2),
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;

const state: UnstructuredLogState = unstructuredLogState;

describe('Testing unstructured error handling node', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest.fn(),
},
},
} as unknown as IScopedClusterClient;
it('handleUnstructuredError()', async () => {
const response = await handleUnstructuredError({ state, model, client });
expect(response.grokPatterns).toStrictEqual(unstructuredLogResponse.grok_patterns);
expect(response.lastExecutedChain).toBe('unstructuredError');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
ActionsClientChatOpenAI,
ActionsClientSimpleChatModel,
} from '@kbn/langchain/server/language_models';
import { FakeLLM } from '@langchain/core/utils/testing';
import { getUnstructuredGraph } from './graph';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';

const model = new FakeLLM({
response: '{"log_type": "structured"}',
}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel;

describe('UnstructuredGraph', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest.fn(),
},
},
} as unknown as IScopedClusterClient;
describe('Compiling and Running', () => {
it('Ensures that the graph compiles', async () => {
// When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues.
// Common issues for example detecting a node has no next step, or there is a infinite loop between them.
try {
await getUnstructuredGraph({ model, client });
} catch (error) {
fail(`getUnstructuredGraph threw an error: ${error}`);
}
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { StateGraphArgs } from '@langchain/langgraph';
import { StateGraph, END, START } from '@langchain/langgraph';
import type { UnstructuredLogState } from '../../types';
import { handleUnstructured } from './unstructured';
import type { UnstructuredGraphParams, UnstructuredBaseNodeParams } from './types';
import { handleUnstructuredError } from './error';
import { handleUnstructuredValidate } from './validate';

const graphState: StateGraphArgs<UnstructuredLogState>['channels'] = {
lastExecutedChain: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
packageName: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
dataStreamName: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
logSamples: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
grokPatterns: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
jsonSamples: {
value: (x: string[], y?: string[]) => y ?? x,
default: () => [],
},
finalized: {
value: (x: boolean, y?: boolean) => y ?? x,
default: () => false,
},
errors: {
value: (x: object, y?: object) => y ?? x,
default: () => [],
},
additionalProcessors: {
value: (x: object[], y?: object[]) => y ?? x,
default: () => [],
},
ecsVersion: {
value: (x: string, y?: string) => y ?? x,
default: () => '',
},
};

function modelInput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
return {
finalized: false,
lastExecutedChain: 'modelInput',
};
}

function modelOutput({ state }: UnstructuredBaseNodeParams): Partial<UnstructuredLogState> {
return {
finalized: true,
additionalProcessors: state.additionalProcessors,
lastExecutedChain: 'modelOutput',
};
}

function validationRouter({ state }: UnstructuredBaseNodeParams): string {
if (Object.keys(state.errors).length === 0) {
return 'modelOutput';
}
return 'handleUnstructuredError';
}

export async function getUnstructuredGraph({ model, client }: UnstructuredGraphParams) {
const workflow = new StateGraph({
channels: graphState,
})
.addNode('modelInput', (state: UnstructuredLogState) => modelInput({ state }))
.addNode('modelOutput', (state: UnstructuredLogState) => modelOutput({ state }))
.addNode('handleUnstructuredError', (state: UnstructuredLogState) =>
handleUnstructuredError({ state, model, client })
)
.addNode('handleUnstructured', (state: UnstructuredLogState) =>
handleUnstructured({ state, model, client })
)
.addNode('handleUnstructuredValidate', (state: UnstructuredLogState) =>
handleUnstructuredValidate({ state, model, client })
)
.addEdge(START, 'modelInput')
.addEdge('modelInput', 'handleUnstructured')
.addEdge('handleUnstructured', 'handleUnstructuredValidate')
.addConditionalEdges(
'handleUnstructuredValidate',
(state: UnstructuredLogState) => validationRouter({ state }),
{
handleUnstructuredError: 'handleUnstructuredError',
modelOutput: 'modelOutput',
}
)
.addEdge('handleUnstructuredError', 'handleUnstructuredValidate')
.addEdge('modelOutput', END);

const compiledUnstructuredGraph = workflow.compile();
return compiledUnstructuredGraph;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { getUnstructuredGraph } from './graph';
Loading

0 comments on commit 77fe423

Please sign in to comment.