Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added generic serverless agent #893

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ MODULES = $(filter-out $(EXCLUDE_DIRS) ./example/% , $(shell find . -name go.mod
LINTER ?= $(shell go env GOPATH)/bin/golangci-lint

# The list of Go build tags as they are specified in respective integration test files
INTEGRATION_TESTS = fargate gcr lambda azure
INTEGRATION_TESTS = fargate gcr lambda azure generic_serverless

ifeq ($(RUN_LINTER),yes)
test: $(LINTER)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ Another interesting feature is the usage of additional packages located under [i
1. [Tracing an application running on Azure Container Apps](docs/azure_container_apps.md)
1. [Tracing Other Go Packages](docs/other_packages.md)
1. [Instrumenting Code Manually](docs/manual_instrumentation.md)
1. [Generic Serverless Agent](/docs/generic_serverless_agent.md)

<!-- Links section -->

Expand Down
9 changes: 9 additions & 0 deletions docs/generic_serverless_agent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Generic Serverless Agent

To monitor Go applications deployed in a serverless environment like AWS Lambda, or on a server without a host agent, the process is similar to monitoring any other application. Simply instrument your application with the Instana Go Tracer SDK, deploy it to the appropriate environment, and ensure that the following two environment variables are set.

> **INSTANA_ENDPOINT_URL** - The Instana backend endpoint that your serverless agents connect to. It depends on your region and is different from the host agent backend endpoint.
> **INSTANA_AGENT_KEY** - Your Instana Agent key. The same agent key can be used for host agents and serverless monitoring.

Please note that, in this generic serverless agent setup, only traces are available, metrics are not. However, for certain specific serverless services like AWS Lambda or Fargate, it is possible to correlate infrastructure and collect metrics as well. For more details, please refer to the documentation [here](https://www.ibm.com/docs/en/instana-observability/current?topic=technologies-monitoring-go#platforms).

172 changes: 172 additions & 0 deletions generic_serverless_agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// (c) Copyright IBM Corp. 2024

package instana

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"

"github.com/google/uuid"
"github.com/instana/go-sensor/acceptor"
"github.com/instana/go-sensor/autoprofile"
)

const (
flushPeriodForGenericInSec = 2
)

type genericServerlessAgent struct {
Endpoint string
Key string
PluginName string
PID int

snapshot serverlessSnapshot

mu sync.Mutex
spanQueue []Span

client *http.Client
logger LeveledLogger
}

func newGenericServerlessAgent(acceptorEndpoint, agentKey string, client *http.Client, logger LeveledLogger) *genericServerlessAgent {
if logger == nil {
logger = defaultLogger
}

if client == nil {
client = http.DefaultClient
// You can change this timeout by setting the INSTANA_TIMEOUT environment variable.
client.Timeout = 2 * time.Second
}

logger.Debug("initializing generic serverless agent")

// Creating a unique serverless host ID.
uniqHostId := "Generic_Serverless_Agent" + uuid.New().String()

agent := &genericServerlessAgent{
Endpoint: acceptorEndpoint,
Key: agentKey,
PID: os.Getpid(),
client: client,
logger: logger,
snapshot: serverlessSnapshot{
Host: uniqHostId,
EntityID: uniqHostId,
},
}

go func() {
t := time.NewTicker(flushPeriodForGenericInSec * time.Second)
defer t.Stop()

for range t.C {
if err := agent.Flush(context.Background()); err != nil {
agent.logger.Error("failed to post collected data: ", err)
}
}
}()

return agent
}

func (a *genericServerlessAgent) Ready() bool { return true }

func (a *genericServerlessAgent) SendMetrics(acceptor.Metrics) error { return nil }

func (a *genericServerlessAgent) SendEvent(*EventData) error { return nil }

func (a *genericServerlessAgent) SendSpans(spans []Span) error {
a.enqueueSpans(spans)
return nil
}

func (a *genericServerlessAgent) SendProfiles([]autoprofile.Profile) error { return nil }

func (a *genericServerlessAgent) Flush(ctx context.Context) error {
from := newServerlessAgentFromS(a.snapshot.EntityID, "generic_serverless")

payload := struct {
Spans []Span `json:"spans,omitempty"`
}{}

a.mu.Lock()
payload.Spans = make([]Span, len(a.spanQueue))
copy(payload.Spans, a.spanQueue)
a.spanQueue = a.spanQueue[:0]
a.mu.Unlock()

for i := range payload.Spans {
payload.Spans[i].From = from
}

buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(payload); err != nil {
return fmt.Errorf("failed to marshal traces payload: %s", err)
}

payloadSize := buf.Len()
if payloadSize > maxContentLength {
a.logger.Warn(fmt.Sprintf("failed to send the spans. Payload size: %d exceeded max size: %d", payloadSize, maxContentLength))
return payloadTooLargeErr
}

req, err := http.NewRequest(http.MethodPost, a.Endpoint+"/bundle", buf)
if err != nil {
a.enqueueSpans(payload.Spans)
return fmt.Errorf("failed to prepare send traces request: %s", err)
}

req.Header.Set("Content-Type", "application/json")

if err := a.sendRequest(req.WithContext(ctx)); err != nil {
a.enqueueSpans(payload.Spans)
return fmt.Errorf("failed to send traces, will retry later: %dsec. Error details: %s",
flushPeriodForGenericInSec, err.Error())
}

return nil
}

func (a *genericServerlessAgent) enqueueSpans(spans []Span) {
a.mu.Lock()
defer a.mu.Unlock()

a.spanQueue = append(a.spanQueue, spans...)
}

func (a *genericServerlessAgent) sendRequest(req *http.Request) error {
req.Header.Set("X-Instana-Host", a.snapshot.Host)
req.Header.Set("X-Instana-Key", a.Key)

resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send request to the serverless agent: %s", err)
}

defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
respBody, err := io.ReadAll(resp.Body)
if err != nil {
a.logger.Debug("failed to read serverless agent response: ", err.Error())
return err
}

a.logger.Info("serverless agent has responded with ", resp.Status, ": ", string(respBody))
return err
}

io.CopyN(io.Discard, resp.Body, 1<<20)

return nil
}
105 changes: 105 additions & 0 deletions generic_serverless_agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// (c) Copyright IBM Corp. 2024

//go:build generic_serverless && integration
// +build generic_serverless,integration

package instana_test

import (
"context"
"encoding/json"
"log"
"os"
"testing"
"time"

instana "github.com/instana/go-sensor"
"github.com/stretchr/testify/require"
)

var agent *serverlessAgent

func TestMain(m *testing.M) {
teardownInstanaEnv := setupInstanaEnv()
defer teardownInstanaEnv()

var err error
agent, err = setupServerlessAgent()
if err != nil {
log.Fatalf("failed to initialize serverless agent: %s", err)
}

os.Exit(m.Run())
}

func TestLocalServerlessAgent_SendSpans(t *testing.T) {
defer agent.Reset()

tracer := instana.NewTracer()
sensor := instana.NewSensorWithTracer(tracer)
defer instana.ShutdownSensor()

sp := sensor.Tracer().StartSpan("generic_serverless")
sp.Finish()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

require.NoError(t, tracer.Flush(ctx))
require.Len(t, agent.Bundles, 1)

var spans []map[string]json.RawMessage
for _, bundle := range agent.Bundles {
var payload struct {
Spans []map[string]json.RawMessage `json:"spans"`
}

require.NoError(t, json.Unmarshal(bundle.Body, &payload), "%s", string(bundle.Body))
spans = append(spans, payload.Spans...)
}

require.Len(t, spans, 1)
}

func TestLocalServerlessAgent_SendSpans_Error(t *testing.T) {
defer agent.Reset()

tracer := instana.NewTracer()
sensor := instana.NewSensorWithTracer(tracer)
defer instana.ShutdownSensor()

sp := sensor.Tracer().StartSpan("http")
sp.SetTag("returnError", "true")
sp.Finish()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

require.NoError(t, tracer.Flush(ctx))
require.Len(t, agent.Bundles, 0)
}

func setupInstanaEnv() func() {
var teardownFuncs []func()

teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_AGENT_KEY"))
os.Setenv("INSTANA_AGENT_KEY", "testkey1")

teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_ZONE"))
os.Setenv("INSTANA_ZONE", "testzone")

teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_TAGS"))
os.Setenv("INSTANA_TAGS", "key1=value1,key2")

teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("INSTANA_SECRETS"))
os.Setenv("INSTANA_SECRETS", "contains-ignore-case:key,password,secret,classified")

teardownFuncs = append(teardownFuncs, restoreEnvVarFunc("CLASSIFIED_DATA"))
os.Setenv("CLASSIFIED_DATA", "classified")

return func() {
for _, f := range teardownFuncs {
f()
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/instana/go-sensor
go 1.22

require (
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
github.com/opentracing/opentracing-go v1.2.0
github.com/stretchr/testify v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
Expand Down
2 changes: 1 addition & 1 deletion sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,6 @@ func newServerlessAgent(serviceName, agentEndpoint, agentKey string,
os.Getenv(containerAppHostName) != "":
return newAzureAgent(agentEndpoint, agentKey, client, logger)
default:
return nil
return newGenericServerlessAgent(agentEndpoint, agentKey, client, logger)
}
}
Loading