From 511a79fdd696fdee437b0220a25ff658d5df9ffc Mon Sep 17 00:00:00 2001 From: Oscar Reyes Date: Fri, 13 Oct 2023 15:00:31 -0600 Subject: [PATCH] fix(server): Datastore test pipeline --- server/app/app.go | 1 + server/app/ds_test_connection_pipeline.go | 4 +++- server/testconnection/pipeline.go | 13 +++++++++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/server/app/app.go b/server/app/app.go index f002789af3..7e980b7323 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -281,6 +281,7 @@ func (app *App) Start(opts ...appOption) error { tracer, tracedbFactory, app.cfg, + meter, ) dsTestPipeline.Start() diff --git a/server/app/ds_test_connection_pipeline.go b/server/app/ds_test_connection_pipeline.go index 24e3385203..536ea6faeb 100644 --- a/server/app/ds_test_connection_pipeline.go +++ b/server/app/ds_test_connection_pipeline.go @@ -6,6 +6,7 @@ import ( "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/testconnection" "github.com/kubeshop/tracetest/server/tracedb" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -15,13 +16,14 @@ func buildDataStoreTestPipeline( tracer trace.Tracer, newTraceDBFn tracedb.FactoryFunc, appConfig *config.AppConfig, + meter metric.Meter, ) *testconnection.DataStoreTestPipeline { requestWorker := testconnection.NewDsTestConnectionRequest(tracer, newTraceDBFn, appConfig.DataStorePipelineTestConnectionEnabled()) notifyWorker := testconnection.NewDsTestConnectionNotify(dsTestListener, tracer) pgQueue := pipeline.NewPostgresQueueDriver[testconnection.Job](pool, pgChannelName) - pipeline := pipeline.New(&testconnection.Configurer[testconnection.Job]{}, + pipeline := pipeline.New(testconnection.NewConfigurer(meter), pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: pgQueue.Channel("datastore_test_connection_request")}, pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: pgQueue.Channel("datastore_test_connection_notify")}, ) diff --git a/server/testconnection/pipeline.go b/server/testconnection/pipeline.go index 958fa53268..b2e75e45ff 100644 --- a/server/testconnection/pipeline.go +++ b/server/testconnection/pipeline.go @@ -10,6 +10,7 @@ import ( "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/tracedb" + "go.opentelemetry.io/otel/metric" ) type Job struct { @@ -30,9 +31,17 @@ type DataStoreTestPipeline struct { dsTestListener DsTestListener } -type Configurer[T any] struct{} +type Configurer[T any] struct { + meter metric.Meter +} + +func NewConfigurer(meter metric.Meter) *Configurer[Job] { + return &Configurer[Job]{meter: meter} +} -func (c *Configurer[Job]) Configure(_ *pipeline.Queue[Job]) {} +func (c *Configurer[Job]) Configure(queue *pipeline.Queue[Job]) { + queue.InitializeMetrics(c.meter) +} func NewDataStoreTestPipeline( pipeline *pipeline.Pipeline[Job],