Skip to content

Commit

Permalink
Revert "Revise wait_for_integration ES implementation (#12150)" (#12240)
Browse files Browse the repository at this point in the history
This reverts commit 3e54384.
  • Loading branch information
axw authored Dec 19, 2023
1 parent b5547a5 commit 54e367c
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 110 deletions.
46 changes: 15 additions & 31 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/http"
"os"
"runtime"
"sync"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -43,9 +42,11 @@ import (
_ "google.golang.org/grpc/encoding/gzip"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/outputs"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
agentconfig "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -346,13 +347,7 @@ func (s *Runner) Run(ctx context.Context) error {
// any events to Elasticsearch before the integration is ready.
publishReady := make(chan struct{})
drain := make(chan struct{})
startWaitReady := make(chan struct{})
var waitReadyOnce sync.Once
g.Go(func() error {
select {
case <-ctx.Done():
case <-startWaitReady:
}
if err := s.waitReady(ctx, kibanaClient, tracer); err != nil {
// One or more preconditions failed; drop events.
close(drain)
Expand All @@ -363,25 +358,24 @@ func (s *Runner) Run(ctx context.Context) error {
close(publishReady)
return nil
})
prePublish := func(ctx context.Context) error {
waitReadyOnce.Do(func() {
close(startWaitReady)
})
callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-drain:
return errServerShuttingDown
case <-publishReady:
return nil
default:
}
return nil
return errors.New("not ready for publishing events")
})
if err != nil {
return err
}
defer esoutput.DeregisterConnectCallback(callbackUUID)
newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
if err != nil {
return nil, err
}
transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish}
transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
return elasticsearch.NewClientParams(elasticsearch.ClientParams{
Config: cfg,
Transport: transport,
Expand Down Expand Up @@ -438,7 +432,7 @@ func (s *Runner) Run(ctx context.Context) error {
// Create the BatchProcessor chain that is used to process all events,
// including the metrics aggregated by APM Server.
finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
tracer, newElasticsearchClient, memLimitGB, prePublish,
tracer, newElasticsearchClient, memLimitGB,
)
if err != nil {
return err
Expand Down Expand Up @@ -653,9 +647,7 @@ func (s *Runner) waitReady(
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
preconditions = append(preconditions, func(ctx context.Context) error {
return checkIndexTemplatesInstalled(
ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger,
)
return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
})
}

Expand All @@ -680,13 +672,12 @@ func (s *Runner) newFinalBatchProcessor(
tracer *apm.Tracer,
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
prePublish func(context.Context) error,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry)
return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry)
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
Expand Down Expand Up @@ -838,7 +829,6 @@ func docappenderConfig(

func (s *Runner) newLibbeatFinalBatchProcessor(
tracer *apm.Tracer,
prePublish func(context.Context) error,
libbeatMonitoringRegistry *monitoring.Registry,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
// When the publisher stops cleanly it will close its pipeline client,
Expand Down Expand Up @@ -899,13 +889,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
}
return acker.Wait(ctx)
}
processor := modelprocessor.Chained{
modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
return prePublish(ctx)
}),
publisher,
}
return processor, stop, nil
return publisher, stop, nil
}

const sourcemapIndex = ".apm-source-map"
Expand Down
3 changes: 1 addition & 2 deletions internal/beater/beatertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
require.NoError(t, err)
if !outputConfig.Output.IsSet() {
err = cfg.Merge(map[string]any{
"output.null": map[string]any{},
"queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"},
"output.null": map[string]any{},
})
require.NoError(t, err)
}
Expand Down
91 changes: 54 additions & 37 deletions internal/beater/checkintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,23 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/kibana"
"github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the
// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch,
// returning nil if and only if it is installed.
func checkIndexTemplatesInstalled(
// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana
// and/or Elasticsearch, returning nil if and only if it is installed.
func checkIntegrationInstalled(
ctx context.Context,
kibanaClient *kibana.Client,
esClient *elasticsearch.Client,
namespace string,
logger *logp.Logger,
) (err error) {
defer func() {
Expand All @@ -56,36 +53,37 @@ func checkIndexTemplatesInstalled(
}
}
}()
if esClient != nil {
installed, err := checkCreateDataStream(ctx, esClient, namespace)
if err != nil {
return fmt.Errorf("error checking Elasticsearch index template setup: %w", err)
}
if !installed {
return errors.New("index templates not installed")
}
return nil
}
if kibanaClient != nil {
installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger)
installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
if err != nil {
// We only return the Kibana error if we have no Elasticsearch client,
// as we may not have sufficient privileges to query the Fleet API.
if esClient == nil {
return fmt.Errorf("error querying Kibana for integration package status: %w", err)
}
}
if !installed {
} else if !installed {
// We were able to query Kibana, but the package is not yet installed.
// We should continue querying the package status via Kibana, as it is
// more authoritative than checking for index template installation.
return errors.New("integration package not yet installed")
}
// Fall through and query Elasticsearch (if we have a client). Kibana may prematurely
// report packages as installed: https://github.com/elastic/kibana/issues/108649
}
if esClient != nil {
installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
if err != nil {
return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err)
} else if !installed {
return errors.New("integration index templates not installed")
}
}
return nil
}

// checkIntegrationInstalledKibana checks if the APM integration package
// is installed by querying Kibana.
func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
return false, err
Expand All @@ -108,22 +106,41 @@ func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client,
return result.Response.Status == "installed", nil
}

// checkCreateDataStream attempts to create a traces-apm-<namespace> data stream,
// returning an error if it could not be created. This will fail if there is no
// index template matching the pattern.
func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) {
if _, err := createdatastream.NewCreateDataStreamFunc(esClient)("traces-apm-" + namespace).Do(ctx); err != nil {
var esError *types.ElasticsearchError
if errors.As(err, &esError) {
cause := esError.ErrorCause
if cause.Type == "resource_already_exists_exception" {
return true, nil
func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) {
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-apm.sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
}
for _, intervals := range []string{"1m", "10m", "60m"} {
for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} {
templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals))
}
}
// Each IndicesGetIndexTemplateRequest below names exactly one index
// template, so issue one request per expected template and run the
// requests in parallel.
g, ctx := errgroup.WithContext(ctx)
for _, template := range templates {
template := template // copy for closure
g.Go(func() error {
req := esapi.IndicesGetIndexTemplateRequest{Name: template}
resp, err := req.Do(ctx, esClient)
if err != nil {
return err
}
if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") {
return false, nil
defer resp.Body.Close()

if resp.IsError() {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
}
}
return false, err
return nil
})
}
return true, nil
err := g.Wait()
return err == nil, err
}
46 changes: 15 additions & 31 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) {

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int64
requestCh := make(chan struct{}, 3)
requestCh := make(chan struct{})
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
Expand Down Expand Up @@ -363,14 +363,6 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
},
})))

// Send some events to the server. They should be accepted and enqueued.
req := makeTransactionRequest(t, srv.URL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := srv.Client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

timeout := time.After(10 * time.Second)
for i := 0; i < 3; i++ {
select {
Expand All @@ -395,8 +387,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
var createDataStreamRequests int
createDataStreamRequestsCh := make(chan int)
var tracesRequests int
tracesRequestsCh := make(chan int)
bulkCh := make(chan struct{}, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -405,24 +397,17 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
name := path.Base(r.URL.Path)
if name != "traces-apm-testing" {
panic("unexpected data stream name: " + name)
}
createDataStreamRequests++
switch createDataStreamRequests {
case 1:
w.WriteHeader(500)
case 2:
w.WriteHeader(400)
w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`))
case 3:
w.Write([]byte(`{"acknowledged":true}`))
template := path.Base(r.URL.Path)
if template == "traces-apm" {
tracesRequests++
if tracesRequests == 1 {
w.WriteHeader(404)
}
tracesRequestsCh <- tracesRequests
}
createDataStreamRequestsCh <- createDataStreamRequests
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
Expand All @@ -437,7 +422,6 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"data_streams.wait_for_integration": true,
"data_streams.namespace": "testing",
},
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
Expand Down Expand Up @@ -472,8 +456,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var done bool
for !done {
select {
case n := <-createDataStreamRequestsCh:
done = n == 3
case n := <-tracesRequestsCh:
done = n == 2
case <-timeout:
t.Fatal("timed out waiting for request")
}
Expand All @@ -487,7 +471,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
}

logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
assert.Len(t, logs.All(), 2, "couldn't find remediation message logs")
assert.Len(t, logs.All(), 1, "couldn't find remediation message logs")

// Healthcheck should now report that the server is publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
Expand All @@ -506,7 +490,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading

0 comments on commit 54e367c

Please sign in to comment.