Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] allow ingest pipelines in YAML format #11209

Merged
merged 4 commits into from
Mar 13, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"text/template"

errw "github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand Down Expand Up @@ -421,15 +422,28 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

jsonString, err := applyTemplate(vars, string(strContents), true)
encodedString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

var content map[string]interface{}
err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
switch extension := strings.ToLower(filepath.Ext(path)); extension {
case ".json":
if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
case ".yaml", ".yml":
if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
return nil, fmt.Errorf("Error YAML decoding the pipeline file: %s: %v", path, err)
}
newContent, err := fixYAMLMaps(content)
if err != nil {
return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %v", path, err)
}
content = newContent.(map[string]interface{})
default:
return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
}

pipelineID := fs.pipelineIDs[idx]
Expand All @@ -444,6 +458,40 @@ func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline,
return pipelines, nil
}

// This function recursively converts maps with interface{} keys, as returned by
// yaml.Unmarshal, to maps of string keys, as expected by the json encoder
// that will be used when delivering the pipeline to Elasticsearch.
// Will return an error when something other than a string is used as a key.
func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
switch v := elem.(type) {
case map[interface{}]interface{}:
result := make(map[string]interface{}, len(v))
for key, value := range v {
keyS, ok := key.(string)
if !ok {
return nil, fmt.Errorf("key '%v' is not string but %T", key, key)
}
if result[keyS], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
return result, nil
case map[string]interface{}:
for key, value := range v {
if v[key], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
case []interface{}:
for idx, value := range v {
if v[idx], err = fixYAMLMaps(value); err != nil {
return nil, err
}
}
}
return elem, nil
}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path, beatVersion string) string {
return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
Expand Down