From d57708bf12a1dd456dfe59f216d4e8924675502e Mon Sep 17 00:00:00 2001 From: Kajetan Date: Sat, 4 Feb 2023 01:19:20 +0100 Subject: [PATCH 1/5] Custom interceptors for Temporal --- aggregatedpool/workers.go | 6 +++++- common/interfaces.go | 6 ++++++ go.mod | 1 + go.sum | 2 ++ plugin.go | 22 ++++++++++++++++++++-- 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/aggregatedpool/workers.go b/aggregatedpool/workers.go index e0971da7..3e366d0c 100644 --- a/aggregatedpool/workers.go +++ b/aggregatedpool/workers.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/uuid" + "github.com/temporalio/roadrunner-temporal/v4/common" "github.com/temporalio/roadrunner-temporal/v4/internal" tActivity "go.temporal.io/sdk/activity" temporalClient "go.temporal.io/sdk/client" @@ -15,7 +16,7 @@ import ( const tq = "taskqueue" -func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo, log *zap.Logger, tc temporalClient.Client) ([]worker.Worker, error) { +func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo, log *zap.Logger, tc temporalClient.Client, interceptors map[string]common.TemporalInterceptor) ([]worker.Worker, error) { workers := make([]worker.Worker, 0, 1) for i := 0; i < len(wi); i++ { @@ -37,6 +38,9 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo // interceptor used here to the headers wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, NewWorkerInterceptor()) + for _, interceptor := range interceptors { + wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, interceptor.TemporalInterceptor()) + } wrk := worker.New(tc, wi[i].TaskQueue, wi[i].Options) diff --git a/common/interfaces.go b/common/interfaces.go index 17954851..67017bc6 100644 --- a/common/interfaces.go +++ b/common/interfaces.go @@ -10,9 +10,15 @@ import ( "github.com/roadrunner-server/sdk/v4/state/process" "github.com/roadrunner-server/sdk/v4/worker" "github.com/temporalio/roadrunner-temporal/v4/internal" + "go.temporal.io/sdk/interceptor" "go.uber.org/zap" ) +type TemporalInterceptor interface { + TemporalInterceptor() interceptor.Interceptor + Name() string +} + type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*worker.Process) diff --git a/go.mod b/go.mod index 5fdc8378..8e9f0e76 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/goccy/go-json v0.10.0 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 + github.com/roadrunner-server/endure/v2 v2.0.1 github.com/roadrunner-server/errors v1.2.0 github.com/roadrunner-server/sdk/v4 v4.0.0 github.com/stretchr/testify v1.8.1 diff --git a/go.sum b/go.sum index 9cb94186..8510b342 100644 --- a/go.sum +++ b/go.sum @@ -610,6 +610,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/roadrunner-server/endure/v2 v2.0.1 h1:2greoQ669iCjsYNt14dwXXQGHAKDmLtFzL0OIE0ZATc= +github.com/roadrunner-server/endure/v2 v2.0.1/go.mod h1:RDrC9SFlyCGqGA2v9SqFIA+EqWTFmPxafIb4SMeHCHM= github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI= github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY= github.com/roadrunner-server/goridge/v3 v3.6.2 h1:LH5HXfCygDp05KnOaXpa4fqVPWTsH7V3lfvPtMwFU3k= diff --git a/plugin.go b/plugin.go index aabe0068..d68420d1 100644 --- a/plugin.go +++ b/plugin.go @@ -13,6 +13,7 @@ import ( "time" prom "github.com/prometheus/client_golang/prometheus" + "github.com/roadrunner-server/endure/v2/dep" "github.com/roadrunner-server/errors" "github.com/roadrunner-server/sdk/v4/events" "github.com/roadrunner-server/sdk/v4/metrics" @@ -89,6 +90,8 @@ type Plugin struct { stopCh chan struct{} workers []worker.Worker + + intcp map[string]common.TemporalInterceptor } func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) error { @@ -224,6 +227,8 @@ func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) e } } + p.intcp = make(map[string]common.TemporalInterceptor) + return nil } @@ -428,7 +433,7 @@ func (p *Plugin) Reset() error { } // based on the worker info -> initialize workers - p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client) + p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.intcp) if err != nil { return err } @@ -510,7 +515,7 @@ func (p *Plugin) initPool() error { return err } - p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client) + p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.intcp) if err != nil { return err } @@ -536,3 +541,16 @@ func (p *Plugin) initPool() error { return nil } + +// Collects collecting grpc interceptors +func (p *Plugin) Collects() []*dep.In { + return []*dep.In{ + dep.Fits(func(pp any) { + mdw := pp.(common.TemporalInterceptor) + // just to be safe + p.mu.Lock() + p.intcp[mdw.Name()] = mdw + p.mu.Unlock() + }, (*common.TemporalInterceptor)(nil)), + } +} From 5cc14d4121f248fae5baf5d436d85dbe1c8d04a3 Mon Sep 17 00:00:00 2001 From: Kajetan Date: Sat, 4 Feb 2023 09:21:38 +0100 Subject: [PATCH 2/5] Name improvements --- plugin.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin.go b/plugin.go index d68420d1..36d88192 100644 --- a/plugin.go +++ b/plugin.go @@ -91,7 +91,7 @@ type Plugin struct { workers []worker.Worker - intcp map[string]common.TemporalInterceptor + interceptors map[string]common.TemporalInterceptor } func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) error { @@ -227,7 +227,7 @@ func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) e } } - p.intcp = make(map[string]common.TemporalInterceptor) + p.interceptors = make(map[string]common.TemporalInterceptor) return nil } @@ -433,7 +433,7 @@ func (p *Plugin) Reset() error { } // based on the worker info -> initialize workers - p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.intcp) + p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.interceptors) if err != nil { return err } @@ -515,7 +515,7 @@ func (p *Plugin) initPool() error { return err } - p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.intcp) + p.workers, err = aggregatedpool.TemporalWorkers(p.rrWorkflowDef, p.rrActivityDef, wi, p.log, p.client, p.interceptors) if err != nil { return err } @@ -549,7 +549,7 @@ func (p *Plugin) Collects() []*dep.In { mdw := pp.(common.TemporalInterceptor) // just to be safe p.mu.Lock() - p.intcp[mdw.Name()] = mdw + p.interceptors[mdw.Name()] = mdw p.mu.Unlock() }, (*common.TemporalInterceptor)(nil)), } From 7d6f210cdfcd68884c9a547f10c53608170626e6 Mon Sep 17 00:00:00 2001 From: Kajetan Date: Sat, 4 Feb 2023 09:48:12 +0100 Subject: [PATCH 3/5] More specific interface --- common/interfaces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/interfaces.go b/common/interfaces.go index 67017bc6..e218e0c8 100644 --- a/common/interfaces.go +++ b/common/interfaces.go @@ -15,7 +15,7 @@ import ( ) type TemporalInterceptor interface { - TemporalInterceptor() interceptor.Interceptor + TemporalInterceptor() interceptor.WorkerInterceptor Name() string } From d48400f6361f54748b1bea8008960a7992fe1bde Mon Sep 17 00:00:00 2001 From: Kajetan Date: Mon, 6 Feb 2023 13:28:28 +0100 Subject: [PATCH 4/5] Rename method --- aggregatedpool/workers.go | 2 +- common/interfaces.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregatedpool/workers.go b/aggregatedpool/workers.go index 3e366d0c..8e8e9da2 100644 --- a/aggregatedpool/workers.go +++ b/aggregatedpool/workers.go @@ -39,7 +39,7 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo // interceptor used here to the headers wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, NewWorkerInterceptor()) for _, interceptor := range interceptors { - wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, interceptor.TemporalInterceptor()) + wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, interceptor.WorkerInterceptor()) } wrk := worker.New(tc, wi[i].TaskQueue, wi[i].Options) diff --git a/common/interfaces.go b/common/interfaces.go index e218e0c8..2307d0a8 100644 --- a/common/interfaces.go +++ b/common/interfaces.go @@ -15,7 +15,7 @@ import ( ) type TemporalInterceptor interface { - TemporalInterceptor() interceptor.WorkerInterceptor + WorkerInterceptor() interceptor.WorkerInterceptor Name() string } From 41c53a33729e2b3ccbcb03ac088b037f2d0c21dd Mon Sep 17 00:00:00 2001 From: Kajetan Date: Mon, 6 Feb 2023 13:30:19 +0100 Subject: [PATCH 5/5] Rename interface --- aggregatedpool/workers.go | 2 +- common/interfaces.go | 2 +- plugin.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/aggregatedpool/workers.go b/aggregatedpool/workers.go index 8e8e9da2..4b8d20da 100644 --- a/aggregatedpool/workers.go +++ b/aggregatedpool/workers.go @@ -16,7 +16,7 @@ import ( const tq = "taskqueue" -func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo, log *zap.Logger, tc temporalClient.Client, interceptors map[string]common.TemporalInterceptor) ([]worker.Worker, error) { +func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo, log *zap.Logger, tc temporalClient.Client, interceptors map[string]common.Interceptor) ([]worker.Worker, error) { workers := make([]worker.Worker, 0, 1) for i := 0; i < len(wi); i++ { diff --git a/common/interfaces.go b/common/interfaces.go index 2307d0a8..5f54b98d 100644 --- a/common/interfaces.go +++ b/common/interfaces.go @@ -14,7 +14,7 @@ import ( "go.uber.org/zap" ) -type TemporalInterceptor interface { +type Interceptor interface { WorkerInterceptor() interceptor.WorkerInterceptor Name() string } diff --git a/plugin.go b/plugin.go index 36d88192..7425a15c 100644 --- a/plugin.go +++ b/plugin.go @@ -91,7 +91,7 @@ type Plugin struct { workers []worker.Worker - interceptors map[string]common.TemporalInterceptor + interceptors map[string]common.Interceptor } func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) error { @@ -227,7 +227,7 @@ func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) e } } - p.interceptors = make(map[string]common.TemporalInterceptor) + p.interceptors = make(map[string]common.Interceptor) return nil } @@ -546,11 +546,11 @@ func (p *Plugin) initPool() error { func (p *Plugin) Collects() []*dep.In { return []*dep.In{ dep.Fits(func(pp any) { - mdw := pp.(common.TemporalInterceptor) + mdw := pp.(common.Interceptor) // just to be safe p.mu.Lock() p.interceptors[mdw.Name()] = mdw p.mu.Unlock() - }, (*common.TemporalInterceptor)(nil)), + }, (*common.Interceptor)(nil)), } }