Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Winlogbeat] Switch to ingest node processing #29435

Merged
merged 10 commits into from
Jan 4, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ECS field mappings in Sysmon module. `file.name`, `file.directory`, and `file.extension` are now populated. {issue}18364[18364]
- Improve ECS field mappings in Sysmon module. `rule.name` is populated for all events when present. {issue}18364[18364]
- Remove top level `hash` property from sysmon events {pull}20653[20653]
- Move module processing from the local JavaScript processor to ingest node {issue}29184[29184] {pull}29435[29435]

*Functionbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (p *Processor) String() string {
// adaptPipelineForCompatibility iterates over all processors in the pipeline
// and adapts them for version of Elasticsearch used. Adapt can mean modifying
// processor options or removing the processor.
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) {
func AdaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) {
log = log.With("pipeline_id", pipelineID)
// Adapt the main processors in the pipeline.
if err = adaptProcessorsForCompatibility(esVersion, content, "processors", false, log); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions filebeat/fileset/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestAdaptPipelineForCompatibility(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestReplaceSetIgnoreEmptyValue(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestReplaceAppendAllowDuplicates(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -632,7 +632,7 @@ func TestRemoveURIPartsProcessor(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -767,7 +767,7 @@ func TestRemoveNetworkDirectionProcessor(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestReplaceConvertIPWithGrok(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func TestRemoveRegisteredDomainProcessor(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func TestReplaceAlternativeFlowProcessors(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func TestRemoveDescription(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
err := AdaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
Expand Down
41 changes: 22 additions & 19 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
func resolveVariable(vars map[string]interface{}, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
return applyTemplate(vars, v, false)
return ApplyTemplate(vars, v, false)
case []interface{}:
transformed := []interface{}{}
for _, val := range v {
s, ok := val.(string)
if ok {
transf, err := applyTemplate(vars, s, false)
transf, err := ApplyTemplate(vars, s, false)
if err != nil {
return nil, fmt.Errorf("array: %v", err)
}
Expand All @@ -257,10 +257,10 @@ func resolveVariable(vars map[string]interface{}, value interface{}) (interface{
return value, nil
}

// applyTemplate applies a Golang text/template. If specialDelims is set to true,
// ApplyTemplate applies a Golang text/template. If specialDelims is set to true,
// the delimiters are set to `{<` and `>}` instead of `{{` and `}}`. These are easier to use
// in pipeline definitions.
func applyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
func ApplyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
tpl := template.New("text").Option("missingkey=error")
if specialDelims {
tpl = tpl.Delims("{<", ">}")
Expand Down Expand Up @@ -307,7 +307,7 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error)
return buf.String(), err
},
"IngestPipeline": func(shortID string) string {
return formatPipelineID(
return FormatPipelineID(
builtinVars["prefix"].(string),
builtinVars["module"].(string),
builtinVars["fileset"].(string),
Expand Down Expand Up @@ -343,7 +343,7 @@ func (fs *Fileset) getBuiltinVars(info beat.Info) (map[string]interface{}, error
}

func (fs *Fileset) getInputConfig() (*common.Config, error) {
path, err := applyTemplate(fs.vars, fs.manifest.Input, false)
path, err := ApplyTemplate(fs.vars, fs.manifest.Input, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the input path: %v", err)
}
Expand All @@ -352,7 +352,7 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error reading input file %s: %v", path, err)
}

yaml, err := applyTemplate(fs.vars, string(contents), false)
yaml, err := ApplyTemplate(fs.vars, string(contents), false)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the input: %v", err)
}
Expand Down Expand Up @@ -409,12 +409,12 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
func (fs *Fileset) getPipelineIDs(info beat.Info) ([]string, error) {
var pipelineIDs []string
for _, ingestPipeline := range fs.manifest.IngestPipeline {
path, err := applyTemplate(fs.vars, ingestPipeline, false)
path, err := ApplyTemplate(fs.vars, ingestPipeline, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

pipelineIDs = append(pipelineIDs, formatPipelineID(info.IndexPrefix, fs.mcfg.Module, fs.name, path, info.Version))
pipelineIDs = append(pipelineIDs, FormatPipelineID(info.IndexPrefix, fs.mcfg.Module, fs.name, path, info.Version))
}

return pipelineIDs, nil
Expand All @@ -428,7 +428,7 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
}

for idx, ingestPipeline := range fs.manifest.IngestPipeline {
path, err := applyTemplate(fs.vars, ingestPipeline, false)
path, err := ApplyTemplate(fs.vars, ingestPipeline, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}
Expand All @@ -438,7 +438,7 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

encodedString, err := applyTemplate(vars, string(strContents), true)
encodedString, err := ApplyTemplate(vars, string(strContents), true)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}
Expand All @@ -453,7 +453,7 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
return nil, fmt.Errorf("Error YAML decoding the pipeline file: %s: %v", path, err)
}
newContent, err := fixYAMLMaps(content)
newContent, err := FixYAMLMaps(content)
if err != nil {
return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %v", path, err)
}
Expand All @@ -474,11 +474,11 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return pipelines, nil
}

// This function recursively converts maps with interface{} keys, as returned by
// FixYAMLMaps recursively converts maps with interface{} keys, as returned by
// yaml.Unmarshal, to maps of string keys, as expected by the json encoder
// that will be used when delivering the pipeline to Elasticsearch.
// Will return an error when something other than a string is used as a key.
func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
func FixYAMLMaps(elem interface{}) (_ interface{}, err error) {
switch v := elem.(type) {
case map[interface{}]interface{}:
result := make(map[string]interface{}, len(v))
Expand All @@ -487,29 +487,32 @@ func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
if !ok {
return nil, fmt.Errorf("key '%v' is not string but %T", key, key)
}
if result[keyS], err = fixYAMLMaps(value); err != nil {
if result[keyS], err = FixYAMLMaps(value); err != nil {
return nil, err
}
}
return result, nil
case map[string]interface{}:
for key, value := range v {
if v[key], err = fixYAMLMaps(value); err != nil {
if v[key], err = FixYAMLMaps(value); err != nil {
return nil, err
}
}
case []interface{}:
for idx, value := range v {
if v[idx], err = fixYAMLMaps(value); err != nil {
if v[idx], err = FixYAMLMaps(value); err != nil {
return nil, err
}
}
}
return elem, nil
}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(prefix, module, fileset, path, version string) string {
// FormatPipelineID builds the Elasticsearch ingest pipeline ID for a module
// pipeline file. The ID always carries the beat index prefix, the beat
// version, and the pipeline file's base name (extension stripped); module and
// fileset segments are included only when both are known.
func FormatPipelineID(prefix, module, fileset, path, version string) string {
	basename := removeExt(filepath.Base(path))
	if module == "" && fileset == "" {
		return fmt.Sprintf("%s-%s-%s", prefix, version, basename)
	}
	return fmt.Sprintf("%s-%s-%s-%s-%s", prefix, version, module, fileset, basename)
}

Expand Down
6 changes: 3 additions & 3 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestLoadPipeline(t *testing.T) {
}

log := logp.NewLogger(logName)
err := loadPipeline(client, "my-pipeline-id", content, false, log)
err := LoadPipeline(client, "my-pipeline-id", content, false, log)
require.NoError(t, err)

status, _, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
Expand All @@ -71,12 +71,12 @@ func TestLoadPipeline(t *testing.T) {

// loading again shouldn't actually update the pipeline
content["description"] = "describe pipeline 2"
err = loadPipeline(client, "my-pipeline-id", content, false, log)
err = LoadPipeline(client, "my-pipeline-id", content, false, log)
require.NoError(t, err)
checkUploadedPipeline(t, client, "describe pipeline")

// loading again updates the pipeline
err = loadPipeline(client, "my-pipeline-id", content, true, log)
err = LoadPipeline(client, "my-pipeline-id", content, true, log)
require.NoError(t, err)
checkUploadedPipeline(t, client, "describe pipeline 2")
}
Expand Down
10 changes: 5 additions & 5 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool

var pipelineIDsLoaded []string
for _, pipeline := range pipelines {
err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id))
err = LoadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id))
if err != nil {
err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err)
break
Expand All @@ -100,7 +100,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool
// error, validate all pipelines before loading any of them. This requires https://github.com/elastic/elasticsearch/issues/35495.
errs := multierror.Errors{err}
for _, pipelineID := range pipelineIDsLoaded {
err = deletePipeline(esClient, pipelineID)
err = DeletePipeline(esClient, pipelineID)
if err != nil {
errs = append(errs, err)
}
Expand All @@ -112,7 +112,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool
return nil
}

func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool, log *logp.Logger) error {
func LoadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool, log *logp.Logger) error {
path := makeIngestPipelinePath(pipelineID)
if !overwrite {
status, _, _ := esClient.Request("GET", path, "", nil, nil)
Expand All @@ -122,7 +122,7 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
}
}

if err := adaptPipelineForCompatibility(esClient.GetVersion(), pipelineID, content, log); err != nil {
if err := AdaptPipelineForCompatibility(esClient.GetVersion(), pipelineID, content, log); err != nil {
return fmt.Errorf("failed to adapt pipeline with backwards compatibility changes: %w", err)
}

Expand All @@ -134,7 +134,7 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return nil
}

func deletePipeline(esClient PipelineLoader, pipelineID string) error {
// DeletePipeline removes the ingest pipeline with the given ID from
// Elasticsearch by issuing a DELETE request against the pipeline's path.
// Any error from the request is returned to the caller unchanged.
func DeletePipeline(esClient PipelineLoader, pipelineID string) error {
path := makeIngestPipelinePath(pipelineID)
// NOTE(review): the response status/body are discarded; presumably a 404 for
// an already-absent pipeline surfaces through err — confirm client behavior.
_, _, err := esClient.Request("DELETE", path, "", nil, nil)
return err
}
Expand Down
23 changes: 23 additions & 0 deletions libbeat/docs/command-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

:apikey-command-short-desc: Manage API Keys for communication between APM agents and server.

ifndef::export_pipeline[]
ifndef::serverless[]
ifndef::no_dashboards[]
:export-command-short-desc: Exports the configuration, index template, ILM policy, or a dashboard to stdout
Expand All @@ -32,6 +33,11 @@ endif::serverless[]
ifdef::serverless[]
:export-command-short-desc: Exports the configuration, index template, or {cloudformation-ref} template to stdout
endif::serverless[]
endif::export_pipeline[]

ifdef::export_pipeline[]
:export-command-short-desc: Exports the configuration, index template, pipeline, or ILM policy to stdout
endif::export_pipeline[]

:help-command-short-desc: Shows help for any command
:keystore-command-short-desc: Manages the <<keystore,secrets keystore>>
Expand Down Expand Up @@ -262,6 +268,7 @@ endif::[]
[[export-command]]
==== `export` command

ifndef::export_pipeline[]
ifndef::serverless[]
ifndef::no_dashboards[]
{export-command-short-desc}. You can use this
Expand All @@ -281,6 +288,14 @@ ifdef::serverless[]
command to quickly view your configuration, see the contents of the index
template and the ILM policy, or export an CloudFormation template.
endif::serverless[]
endif::export_pipeline[]

ifdef::export_pipeline[]
{export-command-short-desc}. You can use this
command to quickly view your configuration, see the contents of the index
template and the ILM policy, export a dashboard from {kib}, or export ingest
pipelines.
endif::export_pipeline[]

*SYNOPSIS*

Expand Down Expand Up @@ -336,6 +351,14 @@ ifdef::serverless[]
Exports an {cloudformation-ref} template to stdout.
endif::serverless[]

ifdef::export_pipeline[]
[[pipeline-subcommand]]
*`pipeline`*::
Exports the ingest pipelines. You must specify the `--es.version` to which
the pipelines should be exported. You can optionally specify `--dir` to control
where the pipelines are written.
endif::export_pipeline[]

*FLAGS*

*`--es.version VERSION`*::
Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/_meta/config/header.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
# batch of events has been published successfully. The default value is 5s.
#winlogbeat.registry_flush: 5s

# By default, ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled, Winlogbeat overwrites pipelines
# every time a new Elasticsearch connection is established.
#winlogbeat.overwrite_pipelines: false

{{end -}}
# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
Expand Down
Loading