diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ilm/default_policy.json b/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ilm/default_policy.json new file mode 100644 index 00000000000..fc6ad7a9992 --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ilm/default_policy.json @@ -0,0 +1,19 @@ +{ + "policy": { + "phases": { + "hot": { + "actions": { + "rollover": { + "max_age": "1h" + } + } + }, + "delete": { + "min_age": "1h", + "actions": { + "delete": {} + } + } + } + } +} diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ingest_pipeline/default.json new file mode 100644 index 00000000000..ac07d8ee66e --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/elasticsearch/ingest_pipeline/default.json @@ -0,0 +1,11 @@ +{ + "description": "Ingest pipeline for sampled trace documents", + "processors": [ + { + "set": { + "field": "event.ingested", + "value": "{{_ingest.timestamp}}" + } + } + ] +} diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/base-fields.yml b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/base-fields.yml new file mode 100644 index 00000000000..bef973826be --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/base-fields.yml @@ -0,0 +1,12 @@ +- name: '@timestamp' + type: date + description: Event timestamp. +- name: data_stream.type + type: constant_keyword + description: Data stream type. +- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. 
diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/ecs.yml b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/ecs.yml new file mode 100644 index 00000000000..690ea7daf4a --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/ecs.yml @@ -0,0 +1,8 @@ +- name: event.ingested + type: date + description: | + Timestamp when an event arrived in the central data store. +- name: trace.id + type: keyword + description: | + The ID of the sampled trace. diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/fields.yml b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/fields.yml new file mode 100644 index 00000000000..370b8752662 --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/fields/fields.yml @@ -0,0 +1,6 @@ +# When changing fields or ILM policy, make sure to update +# x-pack/apm-server/sampling/pubsub/datastream.go. +- name: observer.id + type: keyword + description: | + The ID of the APM Server that indexed the sampled trace ID. 
diff --git a/apmpackage/apm/0.1.0/data_stream/sampled_traces/manifest.yml b/apmpackage/apm/0.1.0/data_stream/sampled_traces/manifest.yml new file mode 100644 index 00000000000..be3deb14708 --- /dev/null +++ b/apmpackage/apm/0.1.0/data_stream/sampled_traces/manifest.yml @@ -0,0 +1,4 @@ +title: APM tail-sampled traces +type: traces +dataset: sampled +ilm_policy: traces-apm.sampled-default_policy diff --git a/apmpackage/cmd/gen-package/genfields.go b/apmpackage/cmd/gen-package/genfields.go index c093441093a..d5828923c87 100644 --- a/apmpackage/cmd/gen-package/genfields.go +++ b/apmpackage/cmd/gen-package/genfields.go @@ -19,6 +19,7 @@ package main import ( "io/ioutil" + "log" "net/http" "path/filepath" "sort" @@ -42,6 +43,7 @@ func generateFields(version string) map[string][]field { inputFieldsFiles["app_metrics"] = filterInternalMetrics(inputFieldsFiles["internal_metrics"]) for streamType, inputFields := range inputFieldsFiles { + log.Printf("%s", streamType) var ecsFields []field var nonECSFields []field for _, fields := range populateECSInfo(ecsFlatFields, inputFields) { diff --git a/apmpackage/cmd/gen-package/main.go b/apmpackage/cmd/gen-package/main.go index bbf816269ea..7acb2905704 100644 --- a/apmpackage/cmd/gen-package/main.go +++ b/apmpackage/cmd/gen-package/main.go @@ -34,6 +34,13 @@ var versionMapping = map[string]string{ "8.0": "0.1.0", } +// Some data streams may not have a counterpart template +// in standalone apm-server, and so it does not make sense +// to maintain a separate fields.yml. 
+var handwritten = map[string]bool{ + "sampled_traces": true, +} + func main() { stackVersion := common.MustNewVersion(cmd.DefaultSettings().Version) shortVersion := fmt.Sprintf("%d.%d", stackVersion.Major, stackVersion.Minor) @@ -58,11 +65,28 @@ func clear(version string) { panic(err) } for _, f := range fileInfo { - if f.IsDir() { - os.Remove(ecsFilePath(version, f.Name())) - os.Remove(fieldsFilePath(version, f.Name())) - os.RemoveAll(pipelinesPath(version, f.Name())) + if !f.IsDir() { + continue + } + name := f.Name() + if handwritten[name] { + continue } + removeFile(ecsFilePath(version, name)) + removeFile(fieldsFilePath(version, name)) + removeDir(pipelinesPath(version, name)) } ioutil.WriteFile(docsFilePath(version), nil, 0644) } + +func removeFile(path string) { + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + log.Fatal(err) + } +} + +func removeDir(path string) { + if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { + log.Fatal(err) + } +} diff --git a/beater/beater.go b/beater/beater.go index d4f7e19b8d2..a5e9e4bff43 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -366,11 +366,13 @@ func (s *serverRunner) run() error { } if err := runServer(s.runServerContext, ServerParams{ - Info: s.beat.Info, - Config: s.config, - Logger: s.logger, - Tracer: s.tracer, - Reporter: reporter, + Info: s.beat.Info, + Config: s.config, + Managed: s.beat.Manager != nil && s.beat.Manager.Enabled(), + Namespace: s.namespace, + Logger: s.logger, + Tracer: s.tracer, + Reporter: reporter, }); err != nil { return err } diff --git a/beater/server.go b/beater/server.go index 47164e7472c..e3cf1f6139d 100644 --- a/beater/server.go +++ b/beater/server.go @@ -52,6 +52,12 @@ type ServerParams struct { // Config is the configuration used for running the APM Server. Config *config.Config + // Managed indicates that the server is managed by Fleet. + Managed bool + + // Namespace holds the data stream namespace for the server. 
+ Namespace string + // Logger is the logger for the beater component. Logger *logp.Logger diff --git a/systemtest/elasticsearch.go b/systemtest/elasticsearch.go index 39b420ddfc7..c3aae25b081 100644 --- a/systemtest/elasticsearch.go +++ b/systemtest/elasticsearch.go @@ -91,19 +91,11 @@ func newElasticsearchConfig() elasticsearch.Config { // and deletes the default ILM policy "apm-rollover-30-days". func CleanupElasticsearch(t testing.TB) { const ( - legacyPrefix = "apm*" + legacyPrefix = "apm-*" apmTracesPrefix = "traces-apm*" apmMetricsPrefix = "metrics-apm*" apmLogsPrefix = "logs-apm*" ) - requests := []estest.Request{ - esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}, - esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix}, - esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix}, - esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix}, - esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix}, - esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix}, - } doReq := func(req estest.Request) error { _, err := Elasticsearch.Do(context.Background(), req, nil) @@ -113,12 +105,38 @@ func CleanupElasticsearch(t testing.TB) { return err } - var g errgroup.Group - for _, req := range requests { - req := req // copy for closure - g.Go(func() error { return doReq(req) }) + doParallel := func(requests ...estest.Request) { + t.Helper() + var g errgroup.Group + for _, req := range requests { + req := req // copy for closure + g.Go(func() error { return doReq(req) }) + } + if err := g.Wait(); err != nil { + t.Fatal(err) + } } - if err := g.Wait(); err != nil { + + // Delete indices, data streams, and ingest pipelines. 
+ doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}) + doParallel( + esapi.IndicesDeleteDataStreamRequest{Name: legacyPrefix}, + esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix}, + esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix}, + esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix}, + esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix}, + ) + + // Delete index templates after deleting data streams. + doParallel( + esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix}, + esapi.IndicesDeleteIndexTemplateRequest{Name: apmTracesPrefix}, + esapi.IndicesDeleteIndexTemplateRequest{Name: apmMetricsPrefix}, + esapi.IndicesDeleteIndexTemplateRequest{Name: apmLogsPrefix}, + ) + + // Refresh indices to ensure all recent changes are visible. + if err := doReq(esapi.IndicesRefreshRequest{}); err != nil { t.Fatal(err) } diff --git a/systemtest/estest/search.go b/systemtest/estest/search.go index 169aba00b96..5f40ec44ef3 100644 --- a/systemtest/estest/search.go +++ b/systemtest/estest/search.go @@ -71,6 +71,11 @@ func (r *SearchRequest) WithQuery(q interface{}) *SearchRequest { return r } +func (r *SearchRequest) WithSort(fieldDirection ...string) *SearchRequest { + r.Sort = fieldDirection + return r +} + func (r *SearchRequest) WithSize(size int) *SearchRequest { r.Size = &size return r diff --git a/systemtest/monitoring_test.go b/systemtest/monitoring_test.go index db1eecdb369..9b6dcc490d0 100644 --- a/systemtest/monitoring_test.go +++ b/systemtest/monitoring_test.go @@ -18,6 +18,7 @@ package systemtest_test import ( + "context" "encoding/json" "testing" "time" @@ -83,9 +84,13 @@ func getBeatsMonitoringStats(t testing.TB, srv *apmservertest.Server, out interf } func getBeatsMonitoring(t testing.TB, srv *apmservertest.Server, type_ string, out interface{}) *beatsMonitoringDoc { - result := systemtest.Elasticsearch.ExpectDocs(t, ".monitoring-beats-*", + var result estest.SearchResult + req := 
systemtest.Elasticsearch.Search(".monitoring-beats-*").WithQuery( estest.TermQuery{Field: type_ + ".beat.uuid", Value: srv.BeatUUID}, - ) + ).WithSort("timestamp:desc") + if _, err := req.Do(context.Background(), &result, estest.WithCondition(result.Hits.MinHitsCondition(1))); err != nil { + t.Fatal(err) + } var doc beatsMonitoringDoc doc.RawSource = []byte(result.Hits.Hits[0].RawSource) diff --git a/systemtest/sampling_test.go b/systemtest/sampling_test.go index 401c2f9047e..288423857c4 100644 --- a/systemtest/sampling_test.go +++ b/systemtest/sampling_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tidwall/gjson" "go.elastic.co/apm" + "golang.org/x/sync/errgroup" "github.com/elastic/apm-server/systemtest" "github.com/elastic/apm-server/systemtest/apmservertest" @@ -88,29 +89,6 @@ func TestKeepUnsampledWarning(t *testing.T) { func TestTailSampling(t *testing.T) { systemtest.CleanupElasticsearch(t) - // Create the apm-sampled-traces index for the two servers to coordinate. - _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.IndicesCreateRequest{ - Index: "apm-sampled-traces", - Body: strings.NewReader(`{ - "mappings": { - "properties": { - "event.ingested": {"type": "date"}, - "observer": { - "properties": { - "id": {"type": "keyword"} - } - }, - "trace": { - "properties": { - "id": {"type": "keyword"} - } - } - } - } -}`), - }, nil) - require.NoError(t, err) - srv1 := apmservertest.NewUnstartedServer(t) srv1.Config.Sampling = &apmservertest.SamplingConfig{ Tail: &apmservertest.TailSamplingConfig{ @@ -148,6 +126,10 @@ func TestTailSampling(t *testing.T) { tracer1.Flush(nil) tracer2.Flush(nil) + // Flush the data stream while the test is running, as we have no + // control over the settings for the sampled traces index template. 
+ refreshPeriodically(t, 250*time.Millisecond, "apm-sampled-traces") + for _, transactionType := range []string{"parent", "child"} { var result estest.SearchResult t.Logf("waiting for %d %q transactions", expected, transactionType) @@ -220,3 +202,33 @@ func TestTailSamplingUnlicensed(t *testing.T) { assert.NoError(t, err) assert.Empty(t, result.Hits.Hits) } + +func refreshPeriodically(t *testing.T, interval time.Duration, index ...string) { + g, ctx := errgroup.WithContext(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(func() { + cancel() + assert.NoError(t, g.Wait()) + }) + g.Go(func() error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + allowNoIndices := true + ignoreUnavailable := true + request := esapi.IndicesRefreshRequest{ + Index: index, + AllowNoIndices: &allowNoIndices, + IgnoreUnavailable: &ignoreUnavailable, + } + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + if _, err := systemtest.Elasticsearch.Do(ctx, &request, nil); err != nil { + return err + } + } + }) +} diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 9eabf9eed6a..91e02c81b6b 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics" "github.com/elastic/apm-server/x-pack/apm-server/cmd" "github.com/elastic/apm-server/x-pack/apm-server/sampling" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub" ) var ( @@ -108,6 +109,31 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er if err != nil { return nil, errors.Wrap(err, "failed to create Elasticsearch client for tail-sampling") } + + var sampledTracesDataStream sampling.DataStreamConfig + if args.Managed { + // Data stream and ILM policy are managed by Fleet. 
+ sampledTracesDataStream = sampling.DataStreamConfig{ + Type: "traces", + Dataset: "sampled", + Namespace: args.Namespace, + } + } else { + sampledTracesDataStream = sampling.DataStreamConfig{ + Type: "apm", + Dataset: "sampled", + Namespace: "traces", + } + if err := pubsub.SetupDataStream(context.Background(), es, + "apm-sampled-traces", // Index template + "apm-sampled-traces", // ILM policy + "apm-sampled-traces", // Index pattern + ); err != nil { + return nil, errors.Wrap(err, "failed to create data stream for tail-sampling") + } + args.Logger.Infof("Created tail-sampling data stream index template") + } + policies := make([]sampling.Policy, len(tailSamplingConfig.Policies)) for i, in := range tailSamplingConfig.Policies { policies[i] = sampling.Policy{ @@ -124,16 +150,14 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er BeatID: args.Info.ID.String(), Reporter: args.Reporter, LocalSamplingConfig: sampling.LocalSamplingConfig{ - FlushInterval: tailSamplingConfig.Interval, - // TODO(axw) make MaxDynamicServices configurable? + FlushInterval: tailSamplingConfig.Interval, MaxDynamicServices: 1000, Policies: policies, IngestRateDecayFactor: tailSamplingConfig.IngestRateDecayFactor, }, RemoteSamplingConfig: sampling.RemoteSamplingConfig{ - Elasticsearch: es, - // TODO(axw) make index name configurable? 
- SampledTracesIndex: "apm-sampled-traces", + Elasticsearch: es, + SampledTracesDataStream: sampledTracesDataStream, }, StorageConfig: sampling.StorageConfig{ StorageDir: paths.Resolve(paths.Data, tailSamplingConfig.StorageDir), diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go index f6d44b7568a..ce713d93ee0 100644 --- a/x-pack/apm-server/sampling/config.go +++ b/x-pack/apm-server/sampling/config.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub" ) // Config holds configuration for Processor. @@ -60,9 +61,21 @@ type RemoteSamplingConfig struct { // and subscribing to remote sampling decisions. Elasticsearch elasticsearch.Client - // SampledTracesIndex holds the name of the Elasticsearch index for - // storing and searching sampled trace IDs. - SampledTracesIndex string + // SampledTracesDataStream holds the identifiers for the Elasticsearch + // data stream for storing and searching sampled trace IDs. + SampledTracesDataStream DataStreamConfig +} + +// DataStreamConfig holds configuration to identify a data stream. +type DataStreamConfig struct { + // Type holds the data stream's type. + Type string + + // Dataset holds the data stream's dataset. + Dataset string + + // Namespace holds the data stream's namespace. + Namespace string } // StorageConfig holds Processor configuration related to event storage. 
@@ -175,12 +188,16 @@ func (config RemoteSamplingConfig) validate() error { if config.Elasticsearch == nil { return errors.New("Elasticsearch unspecified") } - if config.SampledTracesIndex == "" { - return errors.New("SampledTracesIndex unspecified") + if err := config.SampledTracesDataStream.validate(); err != nil { + return errors.New("SampledTracesDataStream unspecified or invalid") } return nil } +func (config DataStreamConfig) validate() error { + return pubsub.DataStreamConfig(config).Validate() +} + func (config StorageConfig) validate() error { if config.StorageDir == "" { return errors.New("StorageDir unspecified") diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go index 528267ddfc8..7519dbd06b2 100644 --- a/x-pack/apm-server/sampling/config_test.go +++ b/x-pack/apm-server/sampling/config_test.go @@ -57,8 +57,12 @@ func TestNewProcessorConfigInvalid(t *testing.T) { } config.Elasticsearch = elasticsearchClient - assertInvalidConfigError("invalid remote sampling config: SampledTracesIndex unspecified") - config.SampledTracesIndex = "sampled-traces" + assertInvalidConfigError("invalid remote sampling config: SampledTracesDataStream unspecified or invalid") + config.SampledTracesDataStream = sampling.DataStreamConfig{ + Type: "traces", + Dataset: "sampled", + Namespace: "testing", + } assertInvalidConfigError("invalid storage config: StorageDir unspecified") config.StorageDir = "tbs" diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 4f9fa1cd3e2..712ec58fdeb 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -329,10 +329,10 @@ func (p *Processor) Run() error { } pubsub, err := pubsub.New(pubsub.Config{ - BeatID: p.config.BeatID, - Client: p.config.Elasticsearch, - Index: p.config.SampledTracesIndex, - Logger: p.logger, + BeatID: p.config.BeatID, + Client: p.config.Elasticsearch, + DataStream: 
pubsub.DataStreamConfig(p.config.SampledTracesDataStream), + Logger: p.logger, // Issue pubsub subscriber search requests at twice the frequency // of publishing, so each server observes each other's sampled diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 64016eefbce..a7fb310df01 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -597,8 +597,12 @@ func newTempdirConfig(tb testing.TB) sampling.Config { }, }, RemoteSamplingConfig: sampling.RemoteSamplingConfig{ - Elasticsearch: pubsubtest.Client(nil, nil), - SampledTracesIndex: "apm-sampled-traces", + Elasticsearch: pubsubtest.Client(nil, nil), + SampledTracesDataStream: sampling.DataStreamConfig{ + Type: "traces", + Dataset: "sampled", + Namespace: "testing", + }, }, StorageConfig: sampling.StorageConfig{ StorageDir: tempdir, diff --git a/x-pack/apm-server/sampling/pubsub/config.go b/x-pack/apm-server/sampling/pubsub/config.go index b4cd34b61a4..b97e5abc8dd 100644 --- a/x-pack/apm-server/sampling/pubsub/config.go +++ b/x-pack/apm-server/sampling/pubsub/config.go @@ -5,6 +5,7 @@ package pubsub import ( + "fmt" "time" "github.com/pkg/errors" @@ -20,8 +21,8 @@ type Config struct { // trace ID observations. Client elasticsearch.Client - // Index holds the index name. - Index string + // DataStream holds the data stream. + DataStream DataStreamConfig // BeatID holds the APM Server's unique ID, used for filtering out // local observations in the subscriber. @@ -48,13 +49,25 @@ type Config struct { Logger *logp.Logger } +// DataStreamConfig holds data stream configuration for Pubsub. +type DataStreamConfig struct { + // Type holds the data stream's type. + Type string + + // Dataset holds the data stream's dataset. + Dataset string + + // Namespace holds the data stream's namespace. + Namespace string +} + // Validate validates the configuration. 
func (config Config) Validate() error { if config.Client == nil { return errors.New("Client unspecified") } - if config.Index == "" { - return errors.New("Index unspecified") + if err := config.DataStream.Validate(); err != nil { + return errors.Wrap(err, "DataStream unspecified or invalid") } if config.BeatID == "" { return errors.New("BeatID unspecified") @@ -67,3 +80,22 @@ func (config Config) Validate() error { } return nil } + +// Validate validates the configuration. +func (config DataStreamConfig) Validate() error { + if config.Type == "" { + return errors.New("Type unspecified") + } + if config.Dataset == "" { + return errors.New("Dataset unspecified") + } + if config.Namespace == "" { + return errors.New("Namespace unspecified") + } + return nil +} + +// String returns the data stream as a combined string. +func (config DataStreamConfig) String() string { + return fmt.Sprintf("%s-%s-%s", config.Type, config.Dataset, config.Namespace) +} diff --git a/x-pack/apm-server/sampling/pubsub/config_test.go b/x-pack/apm-server/sampling/pubsub/config_test.go index bbbb8cd8d77..36cca74911d 100644 --- a/x-pack/apm-server/sampling/pubsub/config_test.go +++ b/x-pack/apm-server/sampling/pubsub/config_test.go @@ -32,24 +32,53 @@ func TestConfigInvalid(t *testing.T) { config: pubsub.Config{ Client: elasticsearchClient, }, - err: "Index unspecified", + err: "DataStream unspecified or invalid: Type unspecified", }, { config: pubsub.Config{ Client: elasticsearchClient, - Index: "index", + DataStream: pubsub.DataStreamConfig{ + Type: "type", + }, + }, + err: "DataStream unspecified or invalid: Dataset unspecified", + }, { + config: pubsub.Config{ + Client: elasticsearchClient, + DataStream: pubsub.DataStreamConfig{ + Type: "type", + Dataset: "dataset", + }, + }, + err: "DataStream unspecified or invalid: Namespace unspecified", + }, { + config: pubsub.Config{ + Client: elasticsearchClient, + DataStream: pubsub.DataStreamConfig{ + Type: "type", + Dataset: "dataset", + Namespace: 
"namespace", + }, }, err: "BeatID unspecified", }, { config: pubsub.Config{ Client: elasticsearchClient, - Index: "index", + DataStream: pubsub.DataStreamConfig{ + Type: "type", + Dataset: "dataset", + Namespace: "namespace", + }, BeatID: "beat_id", }, err: "SearchInterval unspecified or negative", }, { config: pubsub.Config{ - Client: elasticsearchClient, - Index: "index", + Client: elasticsearchClient, + DataStream: pubsub.DataStreamConfig{ + Type: "type", + Dataset: "dataset", + Namespace: "namespace", + }, BeatID: "beat_id", SearchInterval: time.Second, }, diff --git a/x-pack/apm-server/sampling/pubsub/datastream.go b/x-pack/apm-server/sampling/pubsub/datastream.go new file mode 100644 index 00000000000..016848cb0a6 --- /dev/null +++ b/x-pack/apm-server/sampling/pubsub/datastream.go @@ -0,0 +1,113 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pubsub + +import ( + "context" + "fmt" + "io/ioutil" + "strings" + + "github.com/elastic/apm-server/elasticsearch" + + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +// SetupDataStream ensures that the sampled traces data stream index template +// exists with the given name, creating it and its associated ILM policy if it +// does not. +// +// This should only be called when not running under Fleet. +func SetupDataStream( + ctx context.Context, + client elasticsearch.Client, + indexTemplateName string, + ilmPolicyName string, + indexPattern string, +) error { + // Create/replace ILM policy. 
+ resp, err := esapi.ILMPutLifecycleRequest{ + Policy: ilmPolicyName, + Body: strings.NewReader(ilmPolicy), + }.Do(ctx, client) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.IsError() { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("failed to create ILM policy (%d): %s", resp.StatusCode, body) + } + + // Create/replace index template. + dataStreamIndexTemplate := fmt.Sprintf(dataStreamIndexTemplate, indexPattern, ilmPolicyName) + resp, err = esapi.IndicesPutIndexTemplateRequest{ + Name: indexTemplateName, + Body: strings.NewReader(dataStreamIndexTemplate), + }.Do(ctx, client) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.IsError() { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("failed to create index template (%d): %s", resp.StatusCode, body) + } + return nil +} + +// NOTE(axw) these replicate the index template and ILM policy created by Fleet, +// and should be kept in sync with apmpackage/apm/.../data_stream/sampled_traces. 
+ +const ilmPolicy = `{ + "policy": { + "phases": { + "hot": { + "actions": { + "rollover": { + "max_age": "1h" + } + } + }, + "delete": { + "min_age": "1h", + "actions": { + "delete": {} + } + } + } + } +}` + +const dataStreamIndexTemplate = `{ + "index_patterns": [%q], + "priority": 1, + "data_stream": {}, + "template": { + "settings": { + "index.lifecycle.name": %q + }, + "mappings": { + "properties": { + "@timestamp": {"type": "date"}, + "event": { + "properties": { + "ingested": {"type": "date"} + } + }, + "observer": { + "properties": { + "id": {"type": "keyword"} + } + }, + "trace": { + "properties": { + "id": {"type": "keyword"} + } + } + } + } + } +}` diff --git a/x-pack/apm-server/sampling/pubsub/pubsub.go b/x-pack/apm-server/sampling/pubsub/pubsub.go index dd5dedec306..5ba4352d1e6 100644 --- a/x-pack/apm-server/sampling/pubsub/pubsub.go +++ b/x-pack/apm-server/sampling/pubsub/pubsub.go @@ -47,7 +47,7 @@ func New(config Config) (*Pubsub, error) { config.Logger = logp.NewLogger(logs.Sampling) } indexer, err := config.Client.NewBulkIndexer(elasticsearch.BulkIndexerConfig{ - Index: config.Index, + Index: config.DataStream.String(), FlushInterval: config.FlushInterval, OnError: func(ctx context.Context, err error) { config.Logger.With(logp.Error(err)).Debug("publishing sampled trace IDs failed") @@ -61,17 +61,12 @@ func New(config Config) (*Pubsub, error) { // PublishSampledTraceIDs bulk indexes traceIDs into Elasticsearch. 
func (p *Pubsub) PublishSampledTraceIDs(ctx context.Context, traceID ...string) error { + now := time.Now() for _, id := range traceID { - var doc traceIDDocument - doc.Observer.ID = p.config.BeatID - doc.Trace.ID = id - var json fastjson.Writer - if err := doc.MarshalFastJSON(&json); err != nil { - return err - } + p.marshalTraceIDDocument(&json, id, now, p.config.DataStream) if err := p.indexer.Add(ctx, elasticsearch.BulkIndexerItem{ - Action: "index", + Action: "create", Body: bytes.NewReader(json.Bytes()), OnFailure: p.onBulkIndexerItemFailure, }); err != nil { @@ -82,7 +77,7 @@ func (p *Pubsub) PublishSampledTraceIDs(ctx context.Context, traceID ...string) } func (p *Pubsub) onBulkIndexerItemFailure(ctx context.Context, item elasticsearch.BulkIndexerItem, resp elasticsearch.BulkIndexerResponseItem, err error) { - p.config.Logger.With(logp.Error(err)).Debug("publishing sampled trace ID failed") + p.config.Logger.With(logp.Error(err)).Debug("publishing sampled trace ID failed", resp.Error) } // SubscribeSampledTraceIDs subscribes to new sampled trace IDs, sending them to the @@ -146,7 +141,7 @@ func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, lastSeqN } req := esapi.SearchRequest{ - Index: []string{p.config.Index}, + Index: []string{p.config.DataStream.String()}, Body: esutil.NewJSONReader(searchBody), } resp, err := req.Do(ctx, p.config.Client) @@ -201,6 +196,23 @@ func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, lastSeqN return n, nil } +func (p *Pubsub) marshalTraceIDDocument(w *fastjson.Writer, traceID string, timestamp time.Time, dataStream DataStreamConfig) { + w.RawString(`{"@timestamp":"`) + w.Time(timestamp.UTC(), time.RFC3339Nano) + w.RawString(`","data_stream.type":`) + w.String(dataStream.Type) + w.RawString(`,"data_stream.dataset":`) + w.String(dataStream.Dataset) + w.RawString(`,"data_stream.namespace":`) + w.String(dataStream.Namespace) + w.RawString(`,"observer":{"id":`) + w.String(p.config.BeatID) + 
w.RawString(`},`) + w.RawString(`"trace":{"id":`) + w.String(traceID) + w.RawString(`}}`) +} + type traceIDDocument struct { // Observer identifies the entity (typically an APM Server) that observed // and indexed the/ trace ID document. This can be used to filter out local @@ -217,8 +229,11 @@ type traceIDDocument struct { } `json:"trace"` } +/* func (d *traceIDDocument) MarshalFastJSON(w *fastjson.Writer) error { - w.RawString(`{"observer":{"id":`) + w.RawString(`{"@timestamp":"`) + w.Time(d.Timestamp, time.RFC3339Nano) + w.RawString(`","observer":{"id":`) w.String(d.Observer.ID) w.RawString(`},`) w.RawString(`"trace":{"id":`) @@ -226,3 +241,4 @@ func (d *traceIDDocument) MarshalFastJSON(w *fastjson.Writer) error { w.RawString(`}}`) return nil } +*/ diff --git a/x-pack/apm-server/sampling/pubsub/pubsub_integration_test.go b/x-pack/apm-server/sampling/pubsub/pubsub_integration_test.go index 6a7c0447af3..1bdcc44d533 100644 --- a/x-pack/apm-server/sampling/pubsub/pubsub_integration_test.go +++ b/x-pack/apm-server/sampling/pubsub/pubsub_integration_test.go @@ -35,11 +35,16 @@ const ( func TestElasticsearchIntegration_PublishSampledTraceIDs(t *testing.T) { const ( localBeatID = "local_beat_id" - indexName = "apm-testing-sampled-traces" ) + dataStream := pubsub.DataStreamConfig{ + Type: "apm", + Dataset: "sampled_traces", + Namespace: "testing", + } + client := newElasticsearchClient(t) - recreateIndex(t, client, indexName) + recreateDataStream(t, client, dataStream) var input []string for i := 0; i < 50; i++ { @@ -48,7 +53,7 @@ func TestElasticsearchIntegration_PublishSampledTraceIDs(t *testing.T) { es, err := pubsub.New(pubsub.Config{ Client: client, - Index: indexName, + DataStream: dataStream, BeatID: localBeatID, FlushInterval: 100 * time.Millisecond, SearchInterval: time.Minute, @@ -76,7 +81,7 @@ func TestElasticsearchIntegration_PublishSampledTraceIDs(t *testing.T) { for { size := len(input) + 1 resp, err := esapi.SearchRequest{ - Index: []string{indexName}, + 
Index: []string{dataStream.String()}, Size: &size, }.Do(context.Background(), client) require.NoError(t, err) @@ -105,15 +110,20 @@ func TestElasticsearchIntegration_SubscribeSampledTraceIDs(t *testing.T) { const ( localBeatID = "local_observer_id" remoteBeatID = "remote_observer_id" - indexName = "apm-testing-sampled-traces" ) + dataStream := pubsub.DataStreamConfig{ + Type: "apm", + Dataset: "sampled_traces", + Namespace: "testing", + } + client := newElasticsearchClient(t) - recreateIndex(t, client, indexName) + recreateDataStream(t, client, dataStream) es, err := pubsub.New(pubsub.Config{ Client: client, - Index: indexName, + DataStream: dataStream, BeatID: localBeatID, FlushInterval: time.Minute, SearchInterval: 100 * time.Millisecond, @@ -154,7 +164,7 @@ func TestElasticsearchIntegration_SubscribeSampledTraceIDs(t *testing.T) { assert.NoError(t, enc.Encode(&doc)) } resp, err := esapi.BulkRequest{ - Index: indexName, + Index: dataStream.String(), Body: &body, }.Do(context.Background(), client) require.NoError(t, err) @@ -188,7 +198,7 @@ func TestElasticsearchIntegration_SubscribeSampledTraceIDs(t *testing.T) { } } -func recreateIndex(tb testing.TB, client elasticsearch.Client, indexName string) { +func recreateDataStream(tb testing.TB, client elasticsearch.Client, dataStream pubsub.DataStreamConfig) { body := strings.NewReader(`{ "mappings": { "properties": { @@ -207,14 +217,19 @@ func recreateIndex(tb testing.TB, client elasticsearch.Client, indexName string) } }`) + // NOTE(axw) we cheat and create an index, rather than a + // data stream. System tests will test with data streams, + // and will pick up any resulting discrepancies. 
+ + name := dataStream.String() resp, err := esapi.IndicesDeleteRequest{ - Index: []string{indexName}, + Index: []string{dataStream.String()}, }.Do(context.Background(), client) require.NoError(tb, err) resp.Body.Close() resp, err = esapi.IndicesCreateRequest{ - Index: indexName, + Index: name, Body: body, }.Do(context.Background(), client) require.NoError(tb, err) diff --git a/x-pack/apm-server/sampling/pubsub/pubsub_test.go b/x-pack/apm-server/sampling/pubsub/pubsub_test.go index 78225a4eb7a..5f89787aa69 100644 --- a/x-pack/apm-server/sampling/pubsub/pubsub_test.go +++ b/x-pack/apm-server/sampling/pubsub/pubsub_test.go @@ -28,10 +28,15 @@ import ( func TestPublishSampledTraceIDs(t *testing.T) { const ( - indexName = "trace-ids" - beatID = "beat_id" + beatID = "beat_id" ) + dataStream := pubsub.DataStreamConfig{ + Type: "traces", + Dataset: "sampled", + Namespace: "testing", + } + requests := make(chan *http.Request, 1) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var buf bytes.Buffer @@ -54,7 +59,7 @@ func TestPublishSampledTraceIDs(t *testing.T) { pub, err := pubsub.New(pubsub.Config{ Client: client, - Index: indexName, + DataStream: dataStream, BeatID: beatID, FlushInterval: time.Millisecond, SearchInterval: time.Minute, @@ -83,7 +88,7 @@ func TestPublishSampledTraceIDs(t *testing.T) { case <-deadlineTimer.C: t.Fatal("timed out waiting for events to be received by server") case req := <-requests: - require.Equal(t, fmt.Sprintf("/%s/_bulk", indexName), req.URL.Path) + require.Equal(t, fmt.Sprintf("/%s/_bulk", dataStream.String()), req.URL.Path) d := json.NewDecoder(req.Body) for { @@ -93,11 +98,15 @@ func TestPublishSampledTraceIDs(t *testing.T) { break } assert.NoError(t, err) - assert.Equal(t, map[string]interface{}{"index": map[string]interface{}{}}, action) + assert.Equal(t, map[string]interface{}{"create": map[string]interface{}{}}, action) doc := make(map[string]interface{}) assert.NoError(t, d.Decode(&doc)) + 
assert.Contains(t, doc, "@timestamp") assert.Equal(t, map[string]interface{}{"id": beatID}, doc["observer"]) + assert.Equal(t, dataStream.Type, doc["data_stream.type"]) + assert.Equal(t, dataStream.Dataset, doc["data_stream.dataset"]) + assert.Equal(t, dataStream.Namespace, doc["data_stream.namespace"]) trace := doc["trace"].(map[string]interface{}) traceID := trace["id"].(string) @@ -105,8 +114,13 @@ func TestPublishSampledTraceIDs(t *testing.T) { delete(trace, "id") assert.Empty(t, trace) // no other fields in "trace" + delete(doc, "@timestamp") + delete(doc, "data_stream.type") + delete(doc, "data_stream.dataset") + delete(doc, "data_stream.namespace") delete(doc, "observer") delete(doc, "trace") + assert.Empty(t, doc) // no other fields in doc } } @@ -119,10 +133,15 @@ func TestPublishSampledTraceIDs(t *testing.T) { func TestSubscribeSampledTraceIDs(t *testing.T) { const ( - indexName = "trace-ids" - beatID = "beat_id" + beatID = "beat_id" ) + dataStream := pubsub.DataStreamConfig{ + Type: "traces", + Dataset: "sampled", + Namespace: "default", + } + var requests []*http.Request responses := make(chan string) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -148,7 +167,7 @@ func TestSubscribeSampledTraceIDs(t *testing.T) { sub, err := pubsub.New(pubsub.Config{ Client: client, - Index: indexName, + DataStream: dataStream, BeatID: beatID, FlushInterval: time.Minute, SearchInterval: time.Millisecond, @@ -221,7 +240,7 @@ func TestSubscribeSampledTraceIDs(t *testing.T) { var bodies []string for _, r := range requests { - assert.Equal(t, fmt.Sprintf("/%s/_search", indexName), r.URL.Path) + assert.Equal(t, fmt.Sprintf("/%s/_search", dataStream.String()), r.URL.Path) var buf bytes.Buffer io.Copy(&buf, r.Body)