Skip to content

Commit

Permalink
feat: add support for traffic router plugins
Browse files Browse the repository at this point in the history
Signed-off-by: zachaller <[email protected]>
  • Loading branch information
zachaller committed Feb 9, 2023
1 parent 5c9bc03 commit a8a254a
Show file tree
Hide file tree
Showing 25 changed files with 1,981 additions and 526 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,7 @@ checksums:
build-sample-metric-plugin-debug:
go build -gcflags="all=-N -l" -o metric-plugin test/cmd/sample-metrics-plugin/main.go

.PHONY: build-sample-traffic-plugin-debug
build-sample-traffic-plugin-debug:
go build -gcflags="all=-N -l" -o traffic-plugin test/cmd/sample-trafficrouter-plugin/main.go

3 changes: 3 additions & 0 deletions manifests/crds/rollout-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,9 @@ spec:
required:
- stableIngress
type: object
plugin:
type: object
x-kubernetes-preserve-unknown-fields: true
smi:
properties:
rootService:
Expand Down
3 changes: 3 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11881,6 +11881,9 @@ spec:
required:
- stableIngress
type: object
plugin:
type: object
x-kubernetes-preserve-unknown-fields: true
smi:
properties:
rootService:
Expand Down
3 changes: 1 addition & 2 deletions metricproviders/plugin/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func init() {
gob.RegisterName("TerminateAndResumeArgs", new(TerminateAndResumeArgs))
gob.RegisterName("GarbageCollectArgs", new(GarbageCollectArgs))
gob.RegisterName("InitMetricsPluginAndGetMetadataArgs", new(InitMetricsPluginAndGetMetadataArgs))
gob.RegisterName("RpcError", new(types.RpcError))
}

// MetricsPlugin is the interface that we're exposing as a plugin. It needs to match metricproviders.Providers but we can
Expand All @@ -52,7 +51,7 @@ type MetricsPlugin interface {
// MetricsPluginRPC Here is an implementation that talks over RPC
type MetricsPluginRPC struct{ client *rpc.Client }

// NewMetricsPlugin is the client side function that is wrapped by a local provider this makes an rpc call to the
// NewMetricsPlugin is the client side function that is wrapped by a local provider this makes a rpc call to the
// server side function.
func (g *MetricsPluginRPC) NewMetricsPlugin(metric v1alpha1.Metric) types.RpcError {
var resp types.RpcError
Expand Down
8 changes: 8 additions & 0 deletions pkg/apiclient/rollout/rollout.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,14 @@
"apisix": {
"$ref": "#/definitions/github.com.argoproj.argo_rollouts.pkg.apis.rollouts.v1alpha1.ApisixTrafficRouting",
"title": "Apisix holds specific configuration to use Apisix to route traffic"
},
"plugin": {
"type": "object",
"additionalProperties": {
"type": "string",
"format": "byte"
},
"title": "+kubebuilder:validation:Schemaless\n+kubebuilder:pruning:PreserveUnknownFields\n+kubebuilder:validation:Type=object\nPlugin holds specific configuration to use Apisix to route traffic"
}
},
"title": "RolloutTrafficRouting hosts all the different configuration for supported service meshes to enable more fine-grained traffic routing"
Expand Down
1,160 changes: 669 additions & 491 deletions pkg/apis/rollouts/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/apis/rollouts/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/rollouts/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ type RolloutTrafficRouting struct {
ManagedRoutes []MangedRoutes `json:"managedRoutes,omitempty" protobuf:"bytes,8,rep,name=managedRoutes"`
// Apisix holds specific configuration to use Apisix to route traffic
Apisix *ApisixTrafficRouting `json:"apisix,omitempty" protobuf:"bytes,9,opt,name=apisix"`
// +kubebuilder:validation:Schemaless
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Type=object
// Plugin holds specific configuration to use Apisix to route traffic
Plugin map[string]json.RawMessage `json:"plugin,omitempty" protobuf:"bytes,10,opt,name=plugin"`
}

type MangedRoutes struct {
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/rollouts/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions rollout/trafficrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"strings"

"github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting/alb"
Expand Down Expand Up @@ -105,6 +107,21 @@ func (c *Controller) NewTrafficRoutingReconciler(roCtx *rolloutContext) ([]traff
}))
}

if rollout.Spec.Strategy.Canary.TrafficRouting.Plugin != nil {
for pluginName := range rollout.Spec.Strategy.Canary.TrafficRouting.Plugin {
pluginReconciler, err := plugin.NewReconciler(&plugin.ReconcilerConfig{
Rollout: rollout,
Client: c.kubeclientset,
Recorder: c.recorder,
PluginName: pluginName,
})
if err != nil {
return trafficReconcilers, err
}
trafficReconcilers = append(trafficReconcilers, pluginReconciler)
}
}

// ensure that the trafficReconcilers is a healthy list and its not empty
if len(trafficReconcilers) > 0 {
return trafficReconcilers, nil
Expand Down
89 changes: 89 additions & 0 deletions rollout/trafficrouting/plugin/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package client

import (
"fmt"
"os/exec"
"sync"

"github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/rpc"
"github.com/argoproj/argo-rollouts/utils/plugin"
goPlugin "github.com/hashicorp/go-plugin"
)

type trafficPlugin struct {
pluginClient map[string]*goPlugin.Client
plugin map[string]rpc.TrafficRouterPlugin
}

var pluginClients *trafficPlugin
var once sync.Once

func GetTrafficPlugin(pluginName string) (rpc.TrafficRouterPlugin, error) {
once.Do(func() {
pluginClients = &trafficPlugin{
pluginClient: make(map[string]*goPlugin.Client),
plugin: make(map[string]rpc.TrafficRouterPlugin),
}
})
plugin, err := pluginClients.startPlugin(pluginName)
if err != nil {
return nil, fmt.Errorf("unable to start plugin system: %w", err)
}
return plugin, nil
}

func (t *trafficPlugin) startPlugin(pluginName string) (rpc.TrafficRouterPlugin, error) {
var handshakeConfig = goPlugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "ARGO_ROLLOUTS_RPC_PLUGIN",
MagicCookieValue: "trafficrouter",
}

// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]goPlugin.Plugin{
"RpcTrafficRouterPlugin": &rpc.RpcTrafficRouterPlugin{},
}

if t.pluginClient[pluginName] == nil || t.pluginClient[pluginName].Exited() {
pluginPath, err := plugin.GetPluginLocation(pluginName)
if err != nil {
return nil, fmt.Errorf("unable to find plugin (%s): %w", pluginName, err)
}

t.pluginClient[pluginName] = goPlugin.NewClient(&goPlugin.ClientConfig{
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
Cmd: exec.Command(pluginPath),
Managed: true,
})

rpcClient, err := t.pluginClient[pluginName].Client()
if err != nil {
return nil, err
}

// Request the plugin
plugin, err := rpcClient.Dispense("RpcTrafficRouterPlugin")
if err != nil {
return nil, err
}
t.plugin[pluginName] = plugin.(rpc.TrafficRouterPlugin)

err = t.plugin[pluginName].NewTrafficRouterPlugin()
if err.Error() != "" {
return nil, err
}
}

client, err := t.pluginClient[pluginName].Client()
if err != nil {
return nil, err
}
if err := client.Ping(); err != nil {
t.pluginClient[pluginName].Kill()
t.pluginClient[pluginName] = nil
return nil, fmt.Errorf("could not ping plugin will cleanup process so we can restart it next reconcile (%w)", err)
}

return t.plugin[pluginName], nil
}
102 changes: 102 additions & 0 deletions rollout/trafficrouting/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package plugin

import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/client"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/rpc"
"github.com/argoproj/argo-rollouts/utils/record"
"k8s.io/client-go/kubernetes"
)

const ErrNotImplemented = "not-implemented"

type ReconcilerConfig struct {
Rollout *v1alpha1.Rollout
PluginName string
Client kubernetes.Interface
Recorder record.EventRecorder
}

type Reconciler struct {
Rollout *v1alpha1.Rollout
PluginName string
Client kubernetes.Interface
Recorder record.EventRecorder
rpc.TrafficRouterPlugin
}

func NewReconciler(cfg *ReconcilerConfig) (*Reconciler, error) {
pluginClient, err := client.GetTrafficPlugin(cfg.PluginName)
if err != nil {
return nil, err
}

reconciler := &Reconciler{
Rollout: cfg.Rollout,
Client: cfg.Client,
Recorder: cfg.Recorder,
PluginName: cfg.PluginName,
TrafficRouterPlugin: pluginClient,
}
return reconciler, nil
}

// UpdateHash informs a traffic routing reconciler about new canary, stable, and additionalDestination(s) pod hashes
func (r *Reconciler) UpdateHash(canaryHash, stableHash string, additionalDestinations ...v1alpha1.WeightDestination) error {
err := r.TrafficRouterPlugin.UpdateHash(r.Rollout, canaryHash, stableHash, additionalDestinations)
if err.Error() != "" {
return err
}
return nil
}

// SetWeight sets the canary weight to the desired weight
func (r *Reconciler) SetWeight(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) error {
err := r.TrafficRouterPlugin.SetWeight(r.Rollout, desiredWeight, additionalDestinations)
if err.Error() != "" {
return err
}
return nil
}

// SetHeaderRoute sets the header routing step
func (r *Reconciler) SetHeaderRoute(headerRouting *v1alpha1.SetHeaderRoute) error {
err := r.TrafficRouterPlugin.SetHeaderRoute(r.Rollout, headerRouting)
if err.Error() != "" {
return err
}
return nil
}

// VerifyWeight returns true if the canary is at the desired weight and additionalDestinations are at the weights specified
// Returns nil if weight verification is not supported or not applicable
func (r *Reconciler) VerifyWeight(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) (*bool, error) {
verified, err := r.TrafficRouterPlugin.VerifyWeight(r.Rollout, desiredWeight, additionalDestinations)
if err.Error() != "" {
// We do this to keep sematics with local implementations, rpc calls can not send a nil back in a *bool so they
// send a *true with an error of ErrNotImplemented then we can wrap the response.
if err.Error() == ErrNotImplemented {
return nil, nil
}
return nil, err
}
return verified, nil
}

// SetMirrorRoute sets up the traffic router to mirror traffic to a service
func (r *Reconciler) SetMirrorRoute(setMirrorRoute *v1alpha1.SetMirrorRoute) error {
err := r.TrafficRouterPlugin.SetMirrorRoute(r.Rollout, setMirrorRoute)
if err.Error() != "" {
return err
}
return nil
}

// RemoveManagedRoutes Removes all routes that are managed by rollouts by looking at spec.strategy.canary.trafficRouting.managedRoutes
func (r *Reconciler) RemoveManagedRoutes() error {
err := r.TrafficRouterPlugin.RemoveManagedRoutes(r.Rollout)
if err.Error() != "" {
return err
}
return nil
}
Loading

0 comments on commit a8a254a

Please sign in to comment.