Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async traces #225

Merged
merged 13 commits into from
Apr 19, 2022
1 change: 1 addition & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,7 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
141 changes: 12 additions & 129 deletions server/go/api_api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,9 @@ package openapi
import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"time"

"github.com/google/uuid"
"github.com/kubeshop/tracetest/server/go/tracedb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -36,6 +31,7 @@ type TestDB interface {
UpdateResult(ctx context.Context, res *TestRunResult) error
GetResult(ctx context.Context, id string) (*TestRunResult, error)
GetResultsByTestID(ctx context.Context, testid string) ([]TestRunResult, error)
GetResultsByTraceID(ctx context.Context, testid, traceid string) (TestRunResult, error)

CreateAssertion(ctx context.Context, testid string, assertion *Assertion) (string, error)
GetAssertion(ctx context.Context, id string) (*Assertion, error)
Expand All @@ -51,21 +47,17 @@ type TestExecutor interface {
// This service should implement the business logic for every endpoint for the ApiApi API.
// Include any external packages or services that will be required by this service.
type ApiApiService struct {
traceDB tracedb.TraceDB
testDB TestDB
executor TestExecutor
rand *rand.Rand
maxWaitTimeForTrace time.Duration
traceDB tracedb.TraceDB
testDB TestDB
runner Runner
}

// NewApiApiService creates a default api service
func NewApiApiService(traceDB tracedb.TraceDB, testDB TestDB, executor TestExecutor, maxWaitTimeForTrace time.Duration) ApiApiServicer {
func NewApiApiService(traceDB tracedb.TraceDB, testDB TestDB, runner Runner) ApiApiServicer {
return &ApiApiService{
traceDB: traceDB,
testDB: testDB,
executor: executor,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
maxWaitTimeForTrace: maxWaitTimeForTrace,
traceDB: traceDB,
testDB: testDB,
runner: runner,
}
}

Expand Down Expand Up @@ -115,58 +107,14 @@ func (s *ApiApiService) GetTest(ctx context.Context, testid string) (ImplRespons
}

if test.ReferenceTestRunResult.TraceId != "" {
res := test.ReferenceTestRunResult
tr, err := s.traceDB.GetTraceByID(ctx, res.TraceId)
if handledErr := s.handleGetTraceError(ctx, res, err); handledErr != nil {
return Response(http.StatusInternalServerError, handledErr.Error()), handledErr
}
res.State = TestRunStateAwaitingTestResults
if err := s.testDB.UpdateResult(ctx, &res); err != nil {
fmt.Printf("update result err: %s\n", err)
return Response(http.StatusInternalServerError, err.Error()), err
}

sid, err := trace.SpanIDFromHex(res.SpanId)
if err != nil {
return Response(http.StatusInternalServerError, err.Error()), err
}
tid, err := trace.TraceIDFromHex(res.TraceId)
res, err := s.testDB.GetResultsByTraceID(ctx, test.TestId, test.ReferenceTestRunResult.TraceId)
if err != nil {
return Response(http.StatusInternalServerError, err.Error()), err
}
ttr := FixParent(tr, string(tid[:]), string(sid[:]), res.Response)
res.Trace = mapTrace(ttr)

test.ReferenceTestRunResult = res
}
return Response(200, test), nil
}

func (s *ApiApiService) handleGetTraceError(ctx context.Context, res TestRunResult, err error) error {
if err == nil {
return nil
}

var finalErr error

if errors.Is(err, tracedb.ErrTraceNotFound) {
if time.Since(res.CompletedAt) < s.maxWaitTimeForTrace {
return nil
}
finalErr = fmt.Errorf("timed out waiting for traces after %s", s.maxWaitTimeForTrace.String())
} else {
finalErr = fmt.Errorf("cannot fetch trace: %w", err)
}

res.State = TestRunStateFailed
res.LastErrorState = finalErr.Error()
if err := s.testDB.UpdateResult(ctx, &res); err != nil {
fmt.Printf("update result err: %s\n", err)
finalErr = err
}

return finalErr

return Response(200, test), nil
schoren marked this conversation as resolved.
Show resolved Hide resolved
}

// GetTests - Gets all tests
Expand All @@ -180,7 +128,7 @@ func (s *ApiApiService) GetTests(ctx context.Context) (ImplResponse, error) {
}

func (s *ApiApiService) RunTest(ctx context.Context, testid string) (ImplResponse, error) {
t, err := s.testDB.GetTest(ctx, testid)
test, err := s.testDB.GetTest(ctx, testid)
if err != nil {
switch {
case errors.Is(ErrNotFound, err):
Expand All @@ -190,72 +138,7 @@ func (s *ApiApiService) RunTest(ctx context.Context, testid string) (ImplRespons
}
}

id := uuid.New().String()
tid := trace.TraceID{}
s.rand.Read(tid[:])

sid := trace.SpanID{}
s.rand.Read(sid[:])

res := &TestRunResult{
ResultId: id,
TestId: testid,
CreatedAt: time.Now(),
TraceId: tid.String(),
SpanId: sid.String(),
State: TestRunStateCreated,
}

err = s.testDB.CreateResult(ctx, testid, res)
if err != nil {
return Response(http.StatusInternalServerError, err.Error()), err
}

go func(t Test, tid trace.TraceID, sid trace.SpanID, res TestRunResult) {
tracer := otel.GetTracerProvider().Tracer("")
ctx, span := tracer.Start(ctx, "Execute Test")
defer span.End()

res.State = TestRunStateExecuting
err = s.testDB.UpdateResult(ctx, &res)
if err != nil {
fmt.Printf("update result err: %s\n", err)
return
}

fmt.Println("executing test")
resp, err := s.executor.Execute(&t, tid, sid)
if err != nil {
fmt.Printf("exec err: %s", err)
res.State = TestRunStateFailed
err = s.testDB.UpdateResult(ctx, &res)
if err != nil {
fmt.Printf("update result err: %s\n", err)
}
return
}
fmt.Println(resp)

res.State = TestRunStateAwaitingTrace
res.Response = resp.Response
res.CompletedAt = time.Now()
err = s.testDB.UpdateResult(ctx, &res)
if err != nil {
fmt.Printf("update result err: %s\n", err)
return
}

if t.ReferenceTestRunResult.ResultId == "" {
t.ReferenceTestRunResult = res
err = s.testDB.UpdateTest(ctx, &t)
if err != nil {
fmt.Printf("update test last result err: %s\n", err)
return
}
}

fmt.Println("executed successfully")
}(*t, tid, sid, *res)
id := s.runner.Run(*test)

return Response(200, TestRun{
TestRunId: id,
Expand Down
12 changes: 4 additions & 8 deletions server/go/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http"
"strings"

"github.com/google/uuid"
openapi "github.com/kubeshop/tracetest/server/go"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/contrib/propagators/aws/xray"
Expand Down Expand Up @@ -37,7 +36,7 @@ func New() (*TestExecutor, error) {
}, nil
}

func (te *TestExecutor) Execute(test *openapi.Test, tid trace.TraceID, sid trace.SpanID) (*openapi.TestRunResult, error) {
func (te *TestExecutor) Execute(test *openapi.Test, tid trace.TraceID, sid trace.SpanID) (openapi.HttpResponse, error) {

client := http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport,
Expand Down Expand Up @@ -68,7 +67,7 @@ func (te *TestExecutor) Execute(test *openapi.Test, tid trace.TraceID, sid trace
}
req, err := http.NewRequest(strings.ToUpper(tReq.Method), tReq.Url, body)
if err != nil {
return nil, err
return openapi.HttpResponse{}, err
}
for _, h := range tReq.Headers {
req.Header.Set(h.Key, h.Value)
Expand All @@ -93,13 +92,10 @@ func (te *TestExecutor) Execute(test *openapi.Test, tid trace.TraceID, sid trace

resp, err := client.Do(req.WithContext(trace.ContextWithSpanContext(context.Background(), sc)))
if err != nil {
return nil, err
return openapi.HttpResponse{}, err
}

return &openapi.TestRunResult{
ResultId: uuid.New().String(),
Response: mapResp(resp),
}, nil
return mapResp(resp), nil
}

func mapResp(resp *http.Response) openapi.HttpResponse {
Expand Down
20 changes: 10 additions & 10 deletions server/go/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func TestExecuteGet(t *testing.T) {
resp, err := ex.Execute(test, tid, sid)
assert.NoError(t, err)

assert.Equal(t, int32(200), resp.Response.StatusCode)
assert.Equal(t, "OK", resp.Response.Body)
assert.Equal(t, int32(200), resp.StatusCode)
assert.Equal(t, "OK", resp.Body)
}

func TestExecutePost(t *testing.T) {
Expand Down Expand Up @@ -115,8 +115,8 @@ func TestExecutePost(t *testing.T) {
resp, err := ex.Execute(test, tid, sid)
assert.NoError(t, err)

assert.Equal(t, int32(200), resp.Response.StatusCode)
assert.Equal(t, "OK", resp.Response.Body)
assert.Equal(t, int32(200), resp.StatusCode)
assert.Equal(t, "OK", resp.Body)
}

func TestExecutePostWithApiKeyAuth(t *testing.T) {
Expand Down Expand Up @@ -182,8 +182,8 @@ func TestExecutePostWithApiKeyAuth(t *testing.T) {
resp, err := ex.Execute(test, tid, sid)
assert.NoError(t, err)

assert.Equal(t, int32(200), resp.Response.StatusCode)
assert.Equal(t, "OK", resp.Response.Body)
assert.Equal(t, int32(200), resp.StatusCode)
assert.Equal(t, "OK", resp.Body)
}

func TestExecutePostWithBasicAuth(t *testing.T) {
Expand Down Expand Up @@ -248,8 +248,8 @@ func TestExecutePostWithBasicAuth(t *testing.T) {
resp, err := ex.Execute(test, tid, sid)
assert.NoError(t, err)

assert.Equal(t, int32(200), resp.Response.StatusCode)
assert.Equal(t, "OK", resp.Response.Body)
assert.Equal(t, int32(200), resp.StatusCode)
assert.Equal(t, "OK", resp.Body)
}

func TestExecutePostWithBearerAuth(t *testing.T) {
Expand Down Expand Up @@ -313,6 +313,6 @@ func TestExecutePostWithBearerAuth(t *testing.T) {
resp, err := ex.Execute(test, tid, sid)
assert.NoError(t, err)

assert.Equal(t, int32(200), resp.Response.StatusCode)
assert.Equal(t, "OK", resp.Response.Body)
assert.Equal(t, int32(200), resp.StatusCode)
assert.Equal(t, "OK", resp.Body)
}
40 changes: 40 additions & 0 deletions server/go/id_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package openapi

import (
"math/rand"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)

type IDGenerator interface {
UUID() string
TraceID() trace.TraceID
SpanID() trace.SpanID
}

func NewRandGenerator() IDGenerator {
return randGenerator{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

type randGenerator struct {
rand *rand.Rand
}

func (g randGenerator) UUID() string {
return uuid.New().String()
}

func (g randGenerator) TraceID() trace.TraceID {
tid := trace.TraceID{}
g.rand.Read(tid[:])
return tid
}
func (g randGenerator) SpanID() trace.SpanID {
sid := trace.SpanID{}
g.rand.Read(sid[:])
return sid
}
15 changes: 15 additions & 0 deletions server/go/mocks/testdb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading