Skip to content

Commit

Permalink
sampling: use a data stream for sampled trace docs (#4707) (#4819)
Browse files Browse the repository at this point in the history
* beater: add Managed and Namespace to ServerParams

This enables x-pack/apm-server code to alter behaviour
based on whether APM Server is managed or not, and to
create data streams with the configured namespace.

* sampling: use a data stream for sampled trace docs

Update tail-based sampling to index into and search a
data stream. The data stream will be associated with an
ILM policy that takes care of rollover and deletion.

When running in Fleet-managed mode, apm-server will expect
the data stream called `traces-sampled-<namespace>`, and
its associated ILM policy, to already exist. Servers participating
in tail-based sampling are required to be configured with
the same namespace.

When running in standalone mode, apm-server will attempt
to create an index template and ILM policy for a data
stream called `apm-sampled-traces`. This is added for
minimal support while we transition things to Fleet, and
is intended to be removed in a future release. The data
stream is not intended to adhere to the standard indexing
strategy.

* apmpackage: add traces-sampled-* data stream

Add a data stream for sampled trace documents,
along with an ILM policy which rolls over after
1h, and then deletes after 1h.

* systemtest: fetch most recent beats monitoring doc

When searching for beats monitoring docs, make sure
we get the most recent one by sorting on 'timestamp'.

* systemtest: update tail-based sampling test

Update test to rely on apm-server to create its
own data stream index template.

* Cross-reference sampling/pubsub and apmpackage
# Conflicts:
#	changelogs/head.asciidoc
#	systemtest/elasticsearch.go
  • Loading branch information
axw authored Feb 19, 2021
1 parent 4edd816 commit 7ba2c2f
Show file tree
Hide file tree
Showing 25 changed files with 514 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_age": "1h"
}
}
},
"delete": {
"min_age": "1h",
"actions": {
"delete": {}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"description": "Ingest pipeline for sampled trace documents",
"processors": [
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
- name: '@timestamp'
type: date
description: Event timestamp.
- name: data_stream.type
type: constant_keyword
description: Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: Data stream namespace.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- name: event.ingested
type: date
description: |
Timestamp when an event arrived in the central data store.
- name: trace.id
type: keyword
description: |
The ID of the sampled trace.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# When changing fields or ILM policy, make sure to update
# x-pack/apm-server/sampling/pubsub/datastream.go.
- name: observer.id
type: keyword
description: |
The ID of the APM Server that indexed the sampled trace ID.
4 changes: 4 additions & 0 deletions apmpackage/apm/0.1.0/data_stream/sampled_traces/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
title: APM tail-sampled traces
type: traces
dataset: sampled
ilm_policy: traces-apm.sampled-default_policy
2 changes: 2 additions & 0 deletions apmpackage/cmd/gen-package/genfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main

import (
"io/ioutil"
"log"
"net/http"
"path/filepath"
"sort"
Expand All @@ -42,6 +43,7 @@ func generateFields(version string) map[string][]field {
inputFieldsFiles["app_metrics"] = filterInternalMetrics(inputFieldsFiles["internal_metrics"])

for streamType, inputFields := range inputFieldsFiles {
log.Printf("%s", streamType)
var ecsFields []field
var nonECSFields []field
for _, fields := range populateECSInfo(ecsFlatFields, inputFields) {
Expand Down
32 changes: 28 additions & 4 deletions apmpackage/cmd/gen-package/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ var versionMapping = map[string]string{
"8.0": "0.1.0",
}

// handwritten is the set of data stream names whose package files are
// maintained by hand instead of being generated. Such data streams may
// have no counterpart template in standalone apm-server, so there is no
// separate fields.yml worth maintaining for them.
var handwritten = map[string]bool{"sampled_traces": true}

func main() {
stackVersion := common.MustNewVersion(cmd.DefaultSettings().Version)
shortVersion := fmt.Sprintf("%d.%d", stackVersion.Major, stackVersion.Minor)
Expand All @@ -59,11 +66,28 @@ func clear(version string) {
panic(err)
}
for _, f := range fileInfo {
if f.IsDir() {
os.Remove(ecsFilePath(version, f.Name()))
os.Remove(fieldsFilePath(version, f.Name()))
os.RemoveAll(pipelinesPath(version, f.Name()))
if !f.IsDir() {
continue
}
name := f.Name()
if handwritten[name] {
continue
}
removeFile(ecsFilePath(version, name))
removeFile(fieldsFilePath(version, name))
removeDir(pipelinesPath(version, name))
}
ioutil.WriteFile(docsFilePath(version), nil, 0644)
}

// removeFile deletes the file at path. A path that does not exist is
// treated as already removed; any other error is fatal and terminates
// the program via log.Fatal.
func removeFile(path string) {
	err := os.Remove(path)
	if err == nil || os.IsNotExist(err) {
		return
	}
	log.Fatal(err)
}

// removeDir deletes path and everything beneath it. A "does not exist"
// error is ignored; any other error is fatal and terminates the program
// via log.Fatal.
func removeDir(path string) {
	err := os.RemoveAll(path)
	if err == nil || os.IsNotExist(err) {
		return
	}
	log.Fatal(err)
}
12 changes: 7 additions & 5 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,13 @@ func (s *serverRunner) run() error {
}

if err := runServer(s.runServerContext, ServerParams{
Info: s.beat.Info,
Config: s.config,
Logger: s.logger,
Tracer: s.tracer,
Reporter: reporter,
Info: s.beat.Info,
Config: s.config,
Managed: s.beat.Manager != nil && s.beat.Manager.Enabled(),
Namespace: s.namespace,
Logger: s.logger,
Tracer: s.tracer,
Reporter: reporter,
}); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type ServerParams struct {
// Config is the configuration used for running the APM Server.
Config *config.Config

// Managed indicates that the server is managed by Fleet.
Managed bool

// Namespace holds the data stream namespace for the server.
Namespace string

// Logger is the logger for the beater component.
Logger *logp.Logger

Expand Down
46 changes: 32 additions & 14 deletions systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,11 @@ func newElasticsearchConfig() elasticsearch.Config {
// and deletes the default ILM policy "apm-rollover-30-days".
func CleanupElasticsearch(t testing.TB) {
const (
legacyPrefix = "apm*"
legacyPrefix = "apm-*"
apmTracesPrefix = "traces-apm*"
apmMetricsPrefix = "metrics-apm*"
apmLogsPrefix = "logs-apm*"
)
requests := []estest.Request{
esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}},
esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix},
esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix},
esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix},
esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix},
esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix},
}

doReq := func(req estest.Request) error {
_, err := Elasticsearch.Do(context.Background(), req, nil)
Expand All @@ -113,12 +105,38 @@ func CleanupElasticsearch(t testing.TB) {
return err
}

var g errgroup.Group
for _, req := range requests {
req := req // copy for closure
g.Go(func() error { return doReq(req) })
doParallel := func(requests ...estest.Request) {
t.Helper()
var g errgroup.Group
for _, req := range requests {
req := req // copy for closure
g.Go(func() error { return doReq(req) })
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
if err := g.Wait(); err != nil {

// Delete indices, data streams, and ingest pipelines.
doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}})
doParallel(
esapi.IndicesDeleteDataStreamRequest{Name: legacyPrefix},
esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix},
esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix},
esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix},
esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix},
)

// Delete index templates after deleting data streams.
doParallel(
esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix},
esapi.IndicesDeleteIndexTemplateRequest{Name: apmTracesPrefix},
esapi.IndicesDeleteIndexTemplateRequest{Name: apmMetricsPrefix},
esapi.IndicesDeleteIndexTemplateRequest{Name: apmLogsPrefix},
)

// Refresh indices to ensure all recent changes are visible.
if err := doReq(esapi.IndicesRefreshRequest{}); err != nil {
t.Fatal(err)
}

Expand Down
5 changes: 5 additions & 0 deletions systemtest/estest/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (r *SearchRequest) WithQuery(q interface{}) *SearchRequest {
return r
}

// WithSort sets the request's Sort field to fieldDirection and returns r,
// so that calls can be chained. Callers in this package's tests pass
// strings of the form "field:direction", e.g. "timestamp:desc".
func (r *SearchRequest) WithSort(fieldDirection ...string) *SearchRequest {
r.Sort = fieldDirection
return r
}

func (r *SearchRequest) WithSize(size int) *SearchRequest {
r.Size = &size
return r
Expand Down
9 changes: 7 additions & 2 deletions systemtest/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package systemtest_test

import (
"context"
"encoding/json"
"testing"
"time"
Expand Down Expand Up @@ -83,9 +84,13 @@ func getBeatsMonitoringStats(t testing.TB, srv *apmservertest.Server, out interf
}

func getBeatsMonitoring(t testing.TB, srv *apmservertest.Server, type_ string, out interface{}) *beatsMonitoringDoc {
result := systemtest.Elasticsearch.ExpectDocs(t, ".monitoring-beats-*",
var result estest.SearchResult
req := systemtest.Elasticsearch.Search(".monitoring-beats-*").WithQuery(
estest.TermQuery{Field: type_ + ".beat.uuid", Value: srv.BeatUUID},
)
).WithSort("timestamp:desc")
if _, err := req.Do(context.Background(), &result, estest.WithCondition(result.Hits.MinHitsCondition(1))); err != nil {
t.Error(err)
}

var doc beatsMonitoringDoc
doc.RawSource = []byte(result.Hits.Hits[0].RawSource)
Expand Down
58 changes: 35 additions & 23 deletions systemtest/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"go.elastic.co/apm"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
Expand Down Expand Up @@ -88,29 +89,6 @@ func TestKeepUnsampledWarning(t *testing.T) {
func TestTailSampling(t *testing.T) {
systemtest.CleanupElasticsearch(t)

// Create the apm-sampled-traces index for the two servers to coordinate.
_, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.IndicesCreateRequest{
Index: "apm-sampled-traces",
Body: strings.NewReader(`{
"mappings": {
"properties": {
"event.ingested": {"type": "date"},
"observer": {
"properties": {
"id": {"type": "keyword"}
}
},
"trace": {
"properties": {
"id": {"type": "keyword"}
}
}
}
}
}`),
}, nil)
require.NoError(t, err)

srv1 := apmservertest.NewUnstartedServer(t)
srv1.Config.Sampling = &apmservertest.SamplingConfig{
Tail: &apmservertest.TailSamplingConfig{
Expand Down Expand Up @@ -148,6 +126,10 @@ func TestTailSampling(t *testing.T) {
tracer1.Flush(nil)
tracer2.Flush(nil)

// Refresh the data stream periodically while the test is running, as we
// have no control over the settings for the sampled traces index template.
refreshPeriodically(t, 250*time.Millisecond, "apm-sampled-traces")

for _, transactionType := range []string{"parent", "child"} {
var result estest.SearchResult
t.Logf("waiting for %d %q transactions", expected, transactionType)
Expand Down Expand Up @@ -220,3 +202,33 @@ func TestTailSamplingUnlicensed(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, result.Hits.Hits)
}

// refreshPeriodically starts a background goroutine that sends an
// Elasticsearch indices refresh request for the given index names once
// per interval. The request sets AllowNoIndices and IgnoreUnavailable,
// so it may be started before the indices exist.
//
// A cleanup function registered with t cancels the goroutine and waits
// for it to exit. A refresh error is reported as a test failure, unless
// it happened because the cleanup cancelled an in-flight request.
//
// interval must be positive, as required by time.NewTicker.
func refreshPeriodically(t *testing.T, interval time.Duration, index ...string) {
	g, ctx := errgroup.WithContext(context.Background())
	ctx, cancel := context.WithCancel(ctx)
	t.Cleanup(func() {
		cancel()
		assert.NoError(t, g.Wait())
	})
	g.Go(func() error {
		// Use the caller-supplied interval rather than a fixed period.
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		allowNoIndices := true
		ignoreUnavailable := true
		request := esapi.IndicesRefreshRequest{
			Index:             index,
			AllowNoIndices:    &allowNoIndices,
			IgnoreUnavailable: &ignoreUnavailable,
		}
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-ticker.C:
			}
			if _, err := systemtest.Elasticsearch.Do(ctx, &request, nil); err != nil {
				if ctx.Err() != nil {
					// The request was interrupted by the cleanup's
					// cancel(); that is a normal shutdown, not a
					// test failure.
					return nil
				}
				return err
			}
		}
	})
}
Loading

0 comments on commit 7ba2c2f

Please sign in to comment.