Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metric plugin system based on hashicorp go-plugin #2514

Merged
merged 56 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
14fc188
clean up commits
zachaller Jan 29, 2023
fa20177
Merge branch 'master' of github.com:argoproj/argo-rollouts into feat-…
zachaller Jan 29, 2023
acc21f1
add test for error
zachaller Jan 30, 2023
6ceb973
remove test changes
zachaller Jan 30, 2023
4a66c5b
small refactor
zachaller Jan 30, 2023
00edfbb
refactor
zachaller Jan 30, 2023
42a5b03
refactor
zachaller Jan 30, 2023
9b7ca41
change type
zachaller Jan 30, 2023
14907af
allow multiple plugins
zachaller Jan 30, 2023
58b30b1
allow multiple plugins
zachaller Jan 30, 2023
a718b47
move client into own package
zachaller Jan 30, 2023
39488c8
lint
zachaller Jan 31, 2023
106aa7c
wip: configmap
zachaller Jan 31, 2023
5904c05
lint
zachaller Jan 31, 2023
eb38b75
downloading tests
zachaller Jan 31, 2023
cbbc238
add one more test
zachaller Jan 31, 2023
037f49c
better error msg's
zachaller Jan 31, 2023
9b93bce
cleanup
zachaller Jan 31, 2023
893e215
more errors and lint
zachaller Jan 31, 2023
6130621
add in configmap
zachaller Jan 31, 2023
af0f9fa
codegen
zachaller Jan 31, 2023
29ace7c
cleanup configmap
zachaller Jan 31, 2023
37295e9
fix file copy error
zachaller Jan 31, 2023
d6445d5
set file perms
zachaller Jan 31, 2023
de7d59d
switch from scratch to distroless
zachaller Feb 1, 2023
2fab44b
update docs
zachaller Feb 1, 2023
1a2cbff
lint
zachaller Feb 1, 2023
44f1057
add godocs
zachaller Feb 1, 2023
2a121b2
Merge branch 'master' of github.com:argoproj/argo-rollouts into feat-…
zachaller Feb 1, 2023
0d73d03
put code back where it was
zachaller Feb 1, 2023
41e151a
lint
zachaller Feb 1, 2023
0eac40a
remove embedded interface
zachaller Feb 1, 2023
7fea5b3
setup singleton config for when no configmap can be found
zachaller Feb 1, 2023
7d13003
add some extra logs
zachaller Feb 1, 2023
e77809a
some docs changes
zachaller Feb 1, 2023
520b68d
create /tmp dir for plugins
zachaller Feb 1, 2023
be708e1
review changes
zachaller Feb 2, 2023
924451c
depend on forwarded functions when we can
zachaller Feb 2, 2023
49eab75
refactor plugin download
zachaller Feb 2, 2023
be31179
make config thread safe
zachaller Feb 2, 2023
ec93e92
update comments
zachaller Feb 2, 2023
623ef07
lint
zachaller Feb 2, 2023
cb321d8
download plugins on non leading controllers as well
zachaller Feb 2, 2023
6e59272
fix tests
zachaller Feb 2, 2023
fc5957f
lint
zachaller Feb 2, 2023
fce8c6c
cleanup tests
zachaller Feb 2, 2023
5d62e95
add new test
zachaller Feb 2, 2023
825bfb2
lint
zachaller Feb 2, 2023
b9400db
fix test
zachaller Feb 2, 2023
1cff9a2
fix test
zachaller Feb 3, 2023
ccf4aba
more tests
zachaller Feb 3, 2023
d02efd3
more tests
zachaller Feb 3, 2023
4ecca70
add godoc
zachaller Feb 3, 2023
b4d5993
use distroless for plugins chmod cp did not work
zachaller Feb 7, 2023
3f94bcc
rename file
zachaller Feb 7, 2023
2cb9a35
Merge branch 'master' of github.com:argoproj/argo-rollouts into feat-…
zachaller Feb 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ lint: go-mod-vendor

.PHONY: test
test: test-kustomize
go test -covermode=count -coverprofile=coverage.out ${TEST_TARGET}
@make test-unit

.PHONY: test-kustomize
test-kustomize:
Expand All @@ -225,7 +225,7 @@ test-e2e: install-devtools-local

.PHONY: test-unit
test-unit: install-devtools-local
${DIST_DIR}/gotestsum --junitfile=junit.xml --format=testname --packages="./..." -- -covermode=count -coverprofile=coverage.out ./...
${DIST_DIR}/gotestsum --junitfile=junit.xml --format=testname -- -covermode=count -coverprofile=coverage.out `go list ./... | grep -v ./cmd/sample-metrics-plugin`


.PHONY: coverage
Expand Down Expand Up @@ -279,3 +279,10 @@ trivy:
.PHONY: checksums
checksums:
shasum -a 256 ./dist/kubectl-argo-rollouts-* | awk -F './dist/' '{print $$1 $$2}' > ./dist/argo-rollouts-checksums.txt

# Build sample plugin with debug info
# https://www.jetbrains.com/help/go/attach-to-running-go-processes-with-debugger.html
.PHONY: build-sample-metric-plugin-debug
build-sample-metric-plugin-debug:
go build -gcflags="all=-N -l" -o metric-plugin cmd/sample-metrics-plugin/main.go

2 changes: 1 addition & 1 deletion analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa

provider, err := c.newProvider(*logger, t.metric)
if err != nil {
log.Errorf("Error in getting provider :%v", err)
log.Errorf("Error in getting metric provider :%v", err)
return err
}
if metricResult == nil {
Expand Down
15 changes: 13 additions & 2 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"strings"
"time"

"github.com/argoproj/argo-rollouts/utils/plugin"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -20,8 +23,6 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/pkg/kubeclientmetrics"

"github.com/argoproj/argo-rollouts/controller"
"github.com/argoproj/argo-rollouts/controller/metrics"
jobprovider "github.com/argoproj/argo-rollouts/metricproviders/job"
Expand Down Expand Up @@ -70,6 +71,8 @@ func newCommand() *cobra.Command {
awsVerifyTargetGroup bool
namespaced bool
printVersion bool
metricPluginLocation string
metricPluginSha256 string
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
Expand Down Expand Up @@ -199,6 +202,12 @@ func newCommand() *cobra.Command {
controllerNamespaceInformerFactory,
jobInformerFactory)

defaults.SetMetricPluginLocation(metricPluginLocation)
err = plugin.InitMetricsPlugin(metricPluginLocation, plugin.FileDownloaderImpl{}, metricPluginSha256)
if err != nil {
log.Fatalf("Failed to init metric plugin: %v", err)
}

if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
Expand Down Expand Up @@ -240,6 +249,8 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionLeaseDuration, "leader-election-lease-duration", controller.DefaultLeaderElectionLeaseDuration, "The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().StringVar(&metricPluginLocation, "metric-plugin-location", defaults.DefaultMetricsPluginLocation, "The file path to the location of the metric plugin binary")
command.Flags().StringVar(&metricPluginSha256, "metric-plugin-sha256", "", "The expected sha256 of the metric plugin binary")
zachaller marked this conversation as resolved.
Show resolved Hide resolved
return &command
}

Expand Down
185 changes: 185 additions & 0 deletions cmd/sample-metrics-plugin/internal/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package plugin
zachaller marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"time"

"github.com/argoproj/argo-rollouts/utils/plugin/types"

"github.com/argoproj/argo-rollouts/metricproviders/plugin"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/evaluate"
metricutil "github.com/argoproj/argo-rollouts/utils/metric"
timeutil "github.com/argoproj/argo-rollouts/utils/time"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
)

const EnvVarArgoRolloutsPrometheusAddress string = "ARGO_ROLLOUTS_PROMETHEUS_ADDRESS"

// Here is a real implementation of MetricsPlugin
type RpcPlugin struct {
LogCtx log.Entry
api v1.API
}

type Config struct {
// Address is the HTTP address and port of the prometheus server
Address string `json:"address,omitempty" protobuf:"bytes,1,opt,name=address"`
// Query is a raw prometheus query to perform
Query string `json:"query,omitempty" protobuf:"bytes,2,opt,name=query"`
}

func (g *RpcPlugin) NewMetricsPlugin(metric v1alpha1.Metric) types.RpcError {
config := Config{}
err := json.Unmarshal(metric.Provider.Plugin.Config, &config)
if err != nil {
return types.RpcError{ErrorString: err.Error()}
}

api, err := newPrometheusAPI(config.Address)
g.api = api

return types.RpcError{}
}

func (g *RpcPlugin) Run(anaysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1alpha1.Measurement {
startTime := timeutil.MetaNow()
newMeasurement := v1alpha1.Measurement{
StartedAt: &startTime,
}

config := Config{}
json.Unmarshal(metric.Provider.Plugin.Config, &config)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

response, warnings, err := g.api.Query(ctx, config.Query, time.Now())
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)
}

newValue, newStatus, err := g.processResponse(metric, response)
if err != nil {
return metricutil.MarkMeasurementError(newMeasurement, err)

}
newMeasurement.Value = newValue
if len(warnings) > 0 {
warningMetadata := ""
for _, warning := range warnings {
warningMetadata = fmt.Sprintf(`%s"%s", `, warningMetadata, warning)
}
warningMetadata = warningMetadata[:len(warningMetadata)-2]
if warningMetadata != "" {
newMeasurement.Metadata = map[string]string{"warnings": warningMetadata}
g.LogCtx.Warnf("Prometheus returned the following warnings: %s", warningMetadata)
}
}

newMeasurement.Phase = newStatus
finishedTime := timeutil.MetaNow()
newMeasurement.FinishedAt = &finishedTime
return newMeasurement
}

func (g *RpcPlugin) Resume(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
return measurement
}

func (g *RpcPlugin) Terminate(analysisRun *v1alpha1.AnalysisRun, metric v1alpha1.Metric, measurement v1alpha1.Measurement) v1alpha1.Measurement {
return measurement
}

func (g *RpcPlugin) GarbageCollect(*v1alpha1.AnalysisRun, v1alpha1.Metric, int) types.RpcError {
return types.RpcError{}
}

func (g *RpcPlugin) Type() string {
return plugin.ProviderType
}

func (g *RpcPlugin) GetMetadata(metric v1alpha1.Metric) map[string]string {
metricsMetadata := make(map[string]string)

config := Config{}
json.Unmarshal(metric.Provider.Plugin.Config, &config)
if config.Query != "" {
metricsMetadata["ResolvedPrometheusQuery"] = config.Query
}
return metricsMetadata
}

func (g *RpcPlugin) processResponse(metric v1alpha1.Metric, response model.Value) (string, v1alpha1.AnalysisPhase, error) {
switch value := response.(type) {
case *model.Scalar:
valueStr := value.Value.String()
result := float64(value.Value)
newStatus, err := evaluate.EvaluateResult(result, metric, g.LogCtx)
return valueStr, newStatus, err
case model.Vector:
results := make([]float64, 0, len(value))
valueStr := "["
for _, s := range value {
if s != nil {
valueStr = valueStr + s.Value.String() + ","
results = append(results, float64(s.Value))
}
}
// if we appended to the string, we should remove the last comma on the string
if len(valueStr) > 1 {
valueStr = valueStr[:len(valueStr)-1]
}
valueStr = valueStr + "]"
newStatus, err := evaluate.EvaluateResult(results, metric, g.LogCtx)
return valueStr, newStatus, err
default:
return "", v1alpha1.AnalysisPhaseError, fmt.Errorf("Prometheus metric type not supported")
}
}

func newPrometheusAPI(address string) (v1.API, error) {
envValuesByKey := make(map[string]string)
if value, ok := os.LookupEnv(fmt.Sprintf("%s", EnvVarArgoRolloutsPrometheusAddress)); ok {
envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] = value
log.Debugf("ARGO_ROLLOUTS_PROMETHEUS_ADDRESS: %v", envValuesByKey[EnvVarArgoRolloutsPrometheusAddress])
}
if len(address) != 0 {
if !isUrl(address) {
return nil, errors.New("prometheus address is not is url format")
}
} else if envValuesByKey[EnvVarArgoRolloutsPrometheusAddress] != "" {
if isUrl(envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]) {
address = envValuesByKey[EnvVarArgoRolloutsPrometheusAddress]
} else {
return nil, errors.New("prometheus address is not is url format")
}
} else {
return nil, errors.New("prometheus address is not configured")
}
client, err := api.NewClient(api.Config{
Address: address,
})
if err != nil {
log.Errorf("Error in getting prometheus client: %v", err)
return nil, err
}
return v1.NewAPI(client), nil
}

func isUrl(str string) bool {
u, err := url.Parse(str)
if err != nil {
log.Errorf("Error in parsing url: %v", err)
}
log.Debugf("Parsed url: %v", u)
return err == nil && u.Scheme != "" && u.Host != ""
}
108 changes: 108 additions & 0 deletions cmd/sample-metrics-plugin/internal/plugin/plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package plugin

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/argoproj/argo-rollouts/metricproviders/plugin/rpc"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
log "github.com/sirupsen/logrus"

goPlugin "github.com/hashicorp/go-plugin"
)

var testHandshake = goPlugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "ARGO_ROLLOUTS_RPC_PLUGIN",
MagicCookieValue: "metrics",
}

// This is just an example of how to test a plugin.
func TestRunSuccessfully(t *testing.T) {
//Skip test because this is just an example of how to test a plugin.
t.Skip("Skipping test because it requires a running prometheus server")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logCtx := *log.WithFields(log.Fields{"plugin-test": "prometheus"})

rpcPluginImp := &RpcPlugin{
LogCtx: logCtx,
}

// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]goPlugin.Plugin{
"RpcMetricsPlugin": &rpc.RpcMetricsPlugin{Impl: rpcPluginImp},
}

ch := make(chan *goPlugin.ReattachConfig, 1)
closeCh := make(chan struct{})
go goPlugin.Serve(&goPlugin.ServeConfig{
HandshakeConfig: testHandshake,
Plugins: pluginMap,
Test: &goPlugin.ServeTestConfig{
Context: ctx,
ReattachConfigCh: ch,
CloseCh: closeCh,
},
})

// We should get a config
var config *goPlugin.ReattachConfig
select {
case config = <-ch:
case <-time.After(2000 * time.Millisecond):
t.Fatal("should've received reattach")
}
if config == nil {
t.Fatal("config should not be nil")
}

// Connect!
c := goPlugin.NewClient(&goPlugin.ClientConfig{
Cmd: nil,
HandshakeConfig: testHandshake,
Plugins: pluginMap,
Reattach: config,
})
client, err := c.Client()
if err != nil {
t.Fatalf("err: %s", err)
}

// Pinging should work
if err := client.Ping(); err != nil {
t.Fatalf("should not err: %s", err)
}

// Kill which should do nothing
c.Kill()
if err := client.Ping(); err != nil {
t.Fatalf("should not err: %s", err)
}

// Request the plugin
raw, err := client.Dispense("RpcMetricsPlugin")
if err != nil {
t.Fail()
}

plugin := raw.(rpc.MetricsPlugin)

err = plugin.NewMetricsPlugin(v1alpha1.Metric{
Provider: v1alpha1.MetricProvider{
Plugin: &v1alpha1.PluginMetric{Config: json.RawMessage(`{"address":"http://prometheus.local", "query":"machine_cpu_cores"}`)},
},
})
if err != nil {
t.Fail()
}

// Canceling should cause an exit
cancel()
<-closeCh
}
Loading