Create prototype shipper-beat #35318

Merged: 21 commits, May 11, 2023
3 changes: 3 additions & 0 deletions libbeat/beat/pipeline.go
@@ -30,11 +30,14 @@ type Pipeline interface {
Connect() (Client, error)
}

// PipelineConnector is an alias for the Pipeline interface
type PipelineConnector = Pipeline

// Client holds a connection to the beats publisher pipeline
type Client interface {
// Publish the event
Publish(Event)
// PublishAll events specified in the Event array
PublishAll([]Event)
Close() error
}
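The PipelineConnector alias is the surface the new shipper input publishes through. A minimal sketch of the Connect/Publish/Close cycle against the interface above (the helper name and field values are illustrative, not part of this PR):

package example

import (
	"time"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

// publishOne obtains a Client from a PipelineConnector and publishes one event.
func publishOne(pipeline beat.PipelineConnector, fields mapstr.M) error {
	client, err := pipeline.Connect()
	if err != nil {
		return err
	}
	defer client.Close()

	client.Publish(beat.Event{
		Timestamp: time.Now(),
		Fields:    fields,
	})
	return nil
}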
27 changes: 18 additions & 9 deletions x-pack/filebeat/cmd/agent.go
@@ -14,16 +14,25 @@ import (
)

func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}
var modules []map[string]interface{}
var err error
if rawIn.Type == "shipper" { // place filebeat in "shipper mode", with one filebeat input per agent config input
modules, err = management.CreateShipperInput(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating shipper config from raw expected config: %w", err)
}
} else {
modules, err = management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}

// Extract the module name from the stream-level type
// these types are defined in the elastic-agent's specfiles
for iter := range modules {
if _, ok := modules[iter]["type"]; !ok {
modules[iter]["type"] = rawIn.Type
// Extract the module name from the stream-level type
// these types are defined in the elastic-agent's specfiles
for iter := range modules {
if _, ok := modules[iter]["type"]; !ok {
modules[iter]["type"] = rawIn.Type
}
}
}

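Condensed into a standalone sketch, the non-shipper fallback above simply inherits the unit-level type for any input map that does not set one (sample values below are illustrative):

package main

import "fmt"

// fillType mirrors the fallback loop: inputs without an explicit "type"
// inherit it from the unit-level rawIn.Type.
func fillType(modules []map[string]interface{}, unitType string) {
	for i := range modules {
		if _, ok := modules[i]["type"]; !ok {
			modules[i]["type"] = unitType
		}
	}
}

func main() {
	modules := []map[string]interface{}{
		{"id": "unit-1"},                       // no type: inherits "log"
		{"id": "unit-2", "type": "filestream"}, // explicit type: left untouched
	}
	fillType(modules, "log")
	fmt.Println(modules)
}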
9 changes: 9 additions & 0 deletions x-pack/filebeat/cmd/root.go
@@ -6,6 +6,7 @@ package cmd

import (
"fmt"
"os"

fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
@@ -44,6 +45,13 @@ func defaultProcessors() []mapstr.M {
// - add_cloud_metadata: ~
// - add_docker_metadata: ~
// - add_kubernetes_metadata: ~

// This gets called early enough that the CLI handling isn't properly initialized yet,
// so use an environment variable.
shipperEnv := os.Getenv("SHIPPER_MODE")
if shipperEnv == "True" {
return []mapstr.M{}
}
return []mapstr.M{
{
"add_host_metadata": mapstr.M{
@@ -54,4 +62,5 @@ func defaultProcessors() []mapstr.M {
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
}

}
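Since the SHIPPER_MODE check is just an environment-variable switch, its behaviour is easy to pin down with a small test in the same package; a sketch, assuming the function stays unexported:

package cmd

import "testing"

// TestDefaultProcessorsShipperMode is a hypothetical test: with SHIPPER_MODE
// set to "True", defaultProcessors should return an empty processor list.
func TestDefaultProcessorsShipperMode(t *testing.T) {
	t.Setenv("SHIPPER_MODE", "True")
	if got := defaultProcessors(); len(got) != 0 {
		t.Fatalf("expected no default processors in shipper mode, got %d", len(got))
	}
}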
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/beats/v7/x-pack/filebeat/input/shipper"
"github.com/elastic/elastic-agent-libs/logp"
)

@@ -38,5 +39,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
awss3.Plugin(store),
awscloudwatch.Plugin(),
lumberjack.Plugin(),
shipper.Plugin(log, store),
}
}
30 changes: 30 additions & 0 deletions x-pack/filebeat/input/shipper/acker.go
@@ -0,0 +1,30 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package shipper

import (
"sync/atomic"
)

type shipperAcker struct {
persistedIndex uint64
}

func newShipperAcker() *shipperAcker {
return &shipperAcker{persistedIndex: 0}
}

// Track updates the input's persistedIndex by adding total to it.
// Despite the name, total here is an incremental count: the number of events being
// acknowledged by this callback invocation, not the overall number sent so far.
// The acked parameter counts only events that were successfully sent upstream rather than
// dropped by processors, etc.; since persistedIndex doesn't make that distinction, we can ignore it.
func (acker *shipperAcker) Track(_ int, total int) {
atomic.AddUint64(&acker.persistedIndex, uint64(total))
}

// PersistedIndex returns the current value of the acker's persisted index.
func (acker *shipperAcker) PersistedIndex() uint64 {
return atomic.LoadUint64(&acker.persistedIndex)
}
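For illustration only (not part of the PR), the acker can be driven from concurrent ACK callbacks while another goroutine polls the index, which is why the counter is updated atomically:

package shipper

import "sync"

// demoAckerConcurrency sketches Track being called from several goroutines,
// as pipeline ACK callbacks may be, while PersistedIndex is read elsewhere.
func demoAckerConcurrency() uint64 {
	acker := newShipperAcker()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The first argument (acked) is ignored; only total advances the index.
			acker.Track(0, 25)
		}()
	}
	wg.Wait()

	return acker.PersistedIndex() // 100 once all callbacks have fired
}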
63 changes: 63 additions & 0 deletions x-pack/filebeat/input/shipper/config.go
@@ -0,0 +1,63 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package shipper

import (
"time"

"github.com/elastic/elastic-agent-libs/config"
)

// Instance represents all the config needed to start a single shipper input.
// Because a beat works fundamentally differently from the old shipper, we don't have to deal with async config that is pieced together;
// the single config object received on create carries both the input and output config.
type Instance struct {
// config for the shipper's gRPC input
Conn ConnectionConfig `config:",inline"`
Input InputConfig `config:",inline"`
}

// ConnectionConfig is the shipper-relevant portion of the config received from input units
type ConnectionConfig struct {
Server string `config:"server"`
InitialTimeout time.Duration `config:"grpc_setup_timeout"`
TLS TLS `config:"ssl"`
}

// TLS holds the TLS-specific shipper client settings
type TLS struct {
CAs []string `config:"certificate_authorities"`
Cert string `config:"certificate"`
Key string `config:"key"`
}

// InputConfig represents the config for a shipper input. This is the complete config for that input, mirrored and sent to us.
// This is more or less the same as the proto.UnitExpectedConfig type, but that doesn't have `config` struct tags,
// so for the sake of quick prototyping we're just (roughly) duplicating the structure here, minus any fields the shipper doesn't need (for now)
type InputConfig struct {
ID string `config:"id"`
Type string `config:"type"`
Name string `config:"name"`
DataStream DataStream `config:"data_stream"`
// For now we don't try to fully parse the streams;
// once we have a better idea of how per-stream processors work, we can find a better way to unpack this.
Streams []Stream `config:"streams"`
}

// DataStream represents the datastream metadata from an input
type DataStream struct {
Dataset string `config:"dataset"`
Type string `config:"type"`
Namespace string `config:"namespace"`
}

// Stream represents a single stream present inside an input.
// Its contents are largely unpredictable and vary by input type;
// we only grab the fields the shipper needs.
type Stream struct {
ID string `config:"id"`
Processors []*config.C `config:"processors"`
Index string `config:"index"`
}
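As a rough sketch of how this structure could be filled, keys such as server, grpc_setup_timeout, ssl, id, type, data_stream and streams map onto the struct tags above. The NewConfigFrom/Unpack usage below follows the usual go-ucfg pattern in beats and is an assumption, not code from this PR:

package shipper

import "github.com/elastic/elastic-agent-libs/config"

// parseInstance is an illustrative helper: it unpacks a raw unit config map
// into the Instance struct defined above.
func parseInstance(raw map[string]interface{}) (Instance, error) {
	var inst Instance
	cfg, err := config.NewConfigFrom(raw)
	if err != nil {
		return inst, err
	}
	if err := cfg.Unpack(&inst); err != nil {
		return inst, err
	}
	return inst, nil
}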