Skip to content

Commit

Permalink
Add convert processor
Browse files Browse the repository at this point in the history
The `convert` processor converts a field in the event to a different type, such
as converting a string to an integer. For a full description of the processor's
capabilities see the add documentation.

Closes elastic#8124
  • Loading branch information
andrewkroh committed May 7, 2019
1 parent 9653105 commit 532aa04
Show file tree
Hide file tree
Showing 7 changed files with 752 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN]
- Add `add_observer_metadata` processor. {pull}11394[11394]
- Add `decode_csv_fields` processor. {pull}11753[11753]
- Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_observer_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/libbeat/processors/communityid"
_ "github.com/elastic/beats/libbeat/processors/convert"
_ "github.com/elastic/beats/libbeat/processors/dissect"
_ "github.com/elastic/beats/libbeat/processors/dns"
_ "github.com/elastic/beats/libbeat/publisher/includes" // Register publisher pipeline modules
Expand Down
47 changes: 47 additions & 0 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ The supported processors are:
* <<add-process-metadata,`add_process_metadata`>>
* <<add-tags, `add_tags`>>
* <<community-id,`community_id`>>
* <<convert,`convert`>>
ifdef::has_decode_csv_fields_processor[]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
Expand Down Expand Up @@ -907,6 +908,52 @@ silently continue without adding the target field.
The processor also accepts an optional `seed` parameter that must be a 16-bit
unsigned integer. This value gets incorporated into all generated hashes.

[[convert]]
=== Convert

The `convert` processor converts a field in the event to a different type, such
as converting a string to an integer.

The supported types include: `integer`, `long`, `float`, `double`, `string`,
`boolean`, and `ip`.

The `ip` type is effectively an alias for `string`, but with an added validation
that the value is an IPv4 or IPv6 address.

[source,yaml]
----
processors:
- convert:
fields:
- {from: "src_ip", to: "source.ip", type: "ip"}
- {from: "src_port", to: "source.port", type: "integer"}
ignore_missing: true
ignore_failure: true
----

The `convert` processor has the following configuration settings:

`fields`:: (Required) This is the list of fields to convert. At least one item
must be contained in the list. Each item in the list must have a `from` key that
specifies the source field. The `to` key is optional and specifies where to
assign the converted value. If `to` is omitted then the `from` field is updated
in-place. The `type` key specifies the data type to convert the value to. If
`type` is omitted then the processor copies or renames the field without any
type conversion.

`ignore_missing`:: (Optional) If `true` the processor continues to the next
field when the `from` key is not found in the event. If false then the processor
returns an error and does not process the remaining fields. Default is `false`.

`ignore_failure`:: (Optional) If true type conversion failures are ignored and
the processor continues to the next field.

`tag`:: (Optional) An identifier for this processor. Useful for debugging.

`mode`:: (Optional) When both `from` and `to` are defined for a field then
`mode` controls whether to `copy` or `rename` the field when the type conversion
is successful. Default is `copy`.

[[drop-event]]
=== Drop events

Expand Down
133 changes: 133 additions & 0 deletions libbeat/processors/convert/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package convert

import (
"fmt"
"strings"

"github.com/pkg/errors"
)

func defaultConfig() config {
return config{
IgnoreMissing: false,
Mode: copyMode,
}
}

type config struct {
Fields []field `config:"fields" validate:"required"` // List of fields to convert.
Tag string `config:"tag"` // Processor ID for debug and metrics.
IgnoreMissing bool `config:"ignore_missing"` // Skip field when From field is missing.
IgnoreFailure bool `config:"ignore_failure"` // Ignore conversion errors.
Mode mode `config:"mode"` // Mode (copy vs rename).
}

type field struct {
From string `config:"from" validate:"required"`
To string `config:"to"`
Type dataType `config:"type"`
}

func (f field) Validate() error {
if f.To == "" && f.Type == unset {
return errors.New("each field must have a 'to' or a 'type'")
}
return nil
}

func (f field) String() string {
return fmt.Sprintf("{from=%v, to=%v, type=%v}", f.From, f.To, f.Type)
}

type dataType uint8

// List of dataTypes.
const (
unset dataType = iota
Integer
Long
Float
Double
String
Boolean
IP
)

var dataTypeNames = map[dataType]string{
unset: "[unset]",
Integer: "integer",
Long: "long",
Float: "float",
Double: "double",
String: "string",
Boolean: "boolean",
IP: "ip",
}

func (dt dataType) String() string {
return dataTypeNames[dt]
}

func (dt dataType) MarshalText() ([]byte, error) {
return []byte(dt.String()), nil
}

func (dt *dataType) Unpack(s string) error {
s = strings.ToLower(s)
for typ, name := range dataTypeNames {
if s == name {
*dt = typ
return nil
}
}
return errors.Errorf("invalid data type: %v", s)
}

type mode uint8

// List of modes.
const (
copyMode mode = iota
renameMode
)

var modeNames = map[mode]string{
copyMode: "copy",
renameMode: "rename",
}

func (m mode) String() string {
return modeNames[m]
}

func (m mode) MarshalText() ([]byte, error) {
return []byte(m.String()), nil
}

func (m *mode) Unpack(s string) error {
s = strings.ToLower(s)
for md, name := range modeNames {
if s == name {
*m = md
return nil
}
}
return errors.Errorf("invalid mode: %v", s)
}
Loading

0 comments on commit 532aa04

Please sign in to comment.