diff --git a/api/observability/v1/mads_helpers.go b/api/observability/v1/mads_helpers.go new file mode 100644 index 000000000000..92fa9711dc8f --- /dev/null +++ b/api/observability/v1/mads_helpers.go @@ -0,0 +1,15 @@ +package v1 + +import "fmt" + +// GetName returns a name including the mesh which is used for ResourceName in MADS snapshot cache +func (x *MonitoringAssignment) GetName() string { + if x != nil { + dpName := "" + if len(x.Targets) > 0 { + dpName = x.Targets[0].GetName() + } + return fmt.Sprintf("/meshes/%s/dataplanes/%s", x.GetMesh(), dpName) + } + return "" +} diff --git a/pkg/hds/cache/cache_suite_test.go b/pkg/hds/cache/cache_suite_test.go deleted file mode 100644 index 882f39936486..000000000000 --- a/pkg/hds/cache/cache_suite_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package cache_test - -import ( - "testing" - - "github.com/kumahq/kuma/pkg/test" -) - -func TestHDSCache(t *testing.T) { - test.RunSpecs(t, "HDS Cache Suite") -} diff --git a/pkg/hds/cache/snapshot.go b/pkg/hds/cache/snapshot.go deleted file mode 100644 index dc24d1164358..000000000000 --- a/pkg/hds/cache/snapshot.go +++ /dev/null @@ -1,71 +0,0 @@ -package cache - -import ( - envoy_service_health_v3 "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" - envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/pkg/errors" - - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" -) - -const HealthCheckSpecifierType = "envoy.service.health.v3.HealthCheckSpecifier" - -func NewSnapshot(version string, hcs *envoy_service_health_v3.HealthCheckSpecifier) util_xds_v3.Snapshot { - return &Snapshot{ - HealthChecks: cache.Resources{ - Version: version, - Items: map[string]envoy_types.ResourceWithTTL{ - "hcs": {Resource: hcs}, - }, - }, - } -} - -// Snapshot is an internally consistent snapshot of HDS resources. 
-type Snapshot struct { - HealthChecks cache.Resources -} - -func (s *Snapshot) GetSupportedTypes() []string { - return []string{HealthCheckSpecifierType} -} - -func (s *Snapshot) Consistent() error { - if s == nil { - return errors.New("nil Snapshot") - } - return nil -} - -func (s *Snapshot) GetResources(typ string) map[string]envoy_types.Resource { - if s == nil || typ != HealthCheckSpecifierType { - return nil - } - withoutTtl := make(map[string]envoy_types.Resource, len(s.HealthChecks.Items)) - for name, res := range s.HealthChecks.Items { - withoutTtl[name] = res.Resource - } - return withoutTtl -} - -func (s *Snapshot) GetVersion(typ string) string { - if s == nil || typ != HealthCheckSpecifierType { - return "" - } - return s.HealthChecks.Version -} - -func (s *Snapshot) WithVersion(typ string, version string) util_xds_v3.Snapshot { - if s == nil { - return nil - } - if s.GetVersion(typ) == version || typ != HealthCheckSpecifierType { - return s - } - n := cache.Resources{ - Version: version, - Items: s.HealthChecks.Items, - } - return &Snapshot{HealthChecks: n} -} diff --git a/pkg/hds/cache/snapshot_test.go b/pkg/hds/cache/snapshot_test.go deleted file mode 100644 index 89d76506a53f..000000000000 --- a/pkg/hds/cache/snapshot_test.go +++ /dev/null @@ -1,197 +0,0 @@ -package cache_test - -import ( - "time" - - envoy_service_health_v3 "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" - envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - - "github.com/kumahq/kuma/pkg/hds/cache" - util_proto "github.com/kumahq/kuma/pkg/util/proto" -) - -var _ = Describe("Snapshot", func() { - expectedType := "envoy.service.health.v3.HealthCheckSpecifier" - - Describe("GetSupportedTypes()", func() { - It("should always return ['envoy.service.health.v3.HealthCheckSpecifier']", func() { - // when - var snapshot *cache.Snapshot - // then - Expect(snapshot.GetSupportedTypes()).To(Equal([]string{expectedType})) - - // when - snapshot = &cache.Snapshot{} - // then - Expect(snapshot.GetSupportedTypes()).To(Equal([]string{expectedType})) - }) - }) - - Describe("Consistent()", func() { - It("should handle `nil`", func() { - // when - var snapshot *cache.Snapshot - // then - Expect(snapshot.Consistent()).To(MatchError("nil Snapshot")) - }) - - It("non-`nil` Snapshot should be always consistent", func() { - // when - snapshot := cache.NewSnapshot("v1", nil) - // then - Expect(snapshot.Consistent()).To(Succeed()) - - // when - snapshot = cache.NewSnapshot("v2", &envoy_service_health_v3.HealthCheckSpecifier{}) - // then - Expect(snapshot.Consistent()).To(Succeed()) - }) - }) - - Describe("GetResources()", func() { - It("should handle `nil`", func() { - // when - var snapshot *cache.Snapshot - // then - Expect(snapshot.GetResources(expectedType)).To(BeNil()) - }) - - It("should return HealthCheckSpecifier", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - // when - snapshot := cache.NewSnapshot("v1", hcs) - // then - Expect(snapshot.GetResources(expectedType)).To(Equal(map[string]envoy_types.Resource{ - "hcs": hcs, - })) - }) - - It("should return `nil` for unsupported resource types", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: 
util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - // when - snapshot := cache.NewSnapshot("v1", hcs) - // then - Expect(snapshot.GetResources("unsupported type")).To(BeNil()) - }) - }) - - Describe("GetVersion()", func() { - It("should handle `nil`", func() { - // when - var snapshot *cache.Snapshot - // then - Expect(snapshot.GetVersion(expectedType)).To(Equal("")) - }) - - It("should return proper version for a supported resource type", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - // when - snapshot := cache.NewSnapshot("v1", hcs) - // then - Expect(snapshot.GetVersion(expectedType)).To(Equal("v1")) - }) - - It("should return an empty string for unsupported resource type", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - // when - snapshot := cache.NewSnapshot("v1", hcs) - // then - Expect(snapshot.GetVersion("unsupported type")).To(Equal("")) - }) - }) - - Describe("WithVersion()", func() { - It("should handle `nil`", func() { - // given - var snapshot *cache.Snapshot - // when - actual := snapshot.WithVersion(expectedType, "v1") - // then - Expect(actual).To(BeNil()) - }) - - It("should return a new Snapshot if version has changed", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: 
"localhost:9080"}, - }, - } - snapshot := cache.NewSnapshot("v1", hcs) - // when - actual := snapshot.WithVersion(expectedType, "v2") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v2")) - // and - Expect(actual).To(Equal(cache.NewSnapshot("v2", hcs))) - }) - - It("should return the same Snapshot if version has not changed", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - snapshot := cache.NewSnapshot("v1", hcs) - // when - actual := snapshot.WithVersion(expectedType, "v1") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v1")) - // and - Expect(actual).To(BeIdenticalTo(snapshot)) - }) - - It("should return the same Snapshot if resource type is not supported", func() { - // given - hcs := &envoy_service_health_v3.HealthCheckSpecifier{ - Interval: util_proto.Duration(12 * time.Second), - ClusterHealthChecks: []*envoy_service_health_v3.ClusterHealthCheck{ - {ClusterName: "localhost:80"}, - {ClusterName: "localhost:9080"}, - }, - } - snapshot := cache.NewSnapshot("v1", hcs) - // when - actual := snapshot.WithVersion("unsupported type", "v2") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v1")) - // and - Expect(actual).To(BeIdenticalTo(snapshot)) - }) - }) -}) diff --git a/pkg/hds/components.go b/pkg/hds/components.go index e7190caf6de9..c6ae052dc52d 100644 --- a/pkg/hds/components.go +++ b/pkg/hds/components.go @@ -6,6 +6,7 @@ import ( envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core" @@ -17,7 +18,6 @@ import ( hds_server 
"github.com/kumahq/kuma/pkg/hds/server" "github.com/kumahq/kuma/pkg/hds/tracker" util_xds "github.com/kumahq/kuma/pkg/util/xds" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" ) var hdsServerLog = core.Log.WithName("hds-server") @@ -30,7 +30,7 @@ func Setup(rt core_runtime.Runtime) error { return nil } - snapshotCache := util_xds_v3.NewSnapshotCache(false, hasher{}, util_xds.NewLogger(hdsServerLog)) + snapshotCache := envoy_cache.NewSnapshotCache(false, hasher{}, util_xds.NewLogger(hdsServerLog)) callbacks, err := DefaultCallbacks(rt, snapshotCache) if err != nil { @@ -44,7 +44,7 @@ func Setup(rt core_runtime.Runtime) error { return nil } -func DefaultCallbacks(rt core_runtime.Runtime, cache util_xds_v3.SnapshotCache) (hds_callbacks.Callbacks, error) { +func DefaultCallbacks(rt core_runtime.Runtime, cache envoy_cache.SnapshotCache) (hds_callbacks.Callbacks, error) { metrics, err := hds_metrics.NewMetrics(rt.Metrics()) if err != nil { return nil, err diff --git a/pkg/hds/server/server.go b/pkg/hds/server/server.go index b072821b2683..2d482292b4f0 100644 --- a/pkg/hds/server/server.go +++ b/pkg/hds/server/server.go @@ -14,8 +14,8 @@ import ( "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" - hds_cache "github.com/kumahq/kuma/pkg/hds/cache" hds_callbacks "github.com/kumahq/kuma/pkg/hds/callbacks" + v3 "github.com/kumahq/kuma/pkg/hds/v3" util_proto "github.com/kumahq/kuma/pkg/util/proto" ) @@ -133,10 +133,9 @@ func (s *server) process(stream Stream, reqOrRespCh chan *envoy_service_health.H watchCancellation() } watchCancellation = s.cache.CreateWatch(&envoy_cache.Request{ - Node: node, - TypeUrl: hds_cache.HealthCheckSpecifierType, - ResourceNames: []string{"hcs"}, - VersionInfo: lastVersion, + Node: node, + TypeUrl: v3.HealthCheckSpecifierType, + VersionInfo: lastVersion, }, envoy_stream.NewStreamState(false, nil), responseChan) case reqOrResp, more := <-reqOrRespCh: if !more { @@ -168,10 +167,9 @@ func (s *server) 
process(stream Stream, reqOrRespCh chan *envoy_service_health.H watchCancellation() } watchCancellation = s.cache.CreateWatch(&envoy_cache.Request{ - Node: node, - TypeUrl: hds_cache.HealthCheckSpecifierType, - ResourceNames: []string{"hcs"}, - VersionInfo: lastVersion, + Node: node, + TypeUrl: v3.HealthCheckSpecifierType, + VersionInfo: lastVersion, }, envoy_stream.NewStreamState(false, nil), responseChan) } } diff --git a/pkg/hds/tracker/callbacks.go b/pkg/hds/tracker/callbacks.go index 7e2c47de51d6..a67aa80ef089 100644 --- a/pkg/hds/tracker/callbacks.go +++ b/pkg/hds/tracker/callbacks.go @@ -7,6 +7,7 @@ import ( envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -47,9 +48,9 @@ func NewCallbacks( log logr.Logger, resourceManager manager.ResourceManager, readOnlyResourceManager manager.ReadOnlyResourceManager, - cache util_xds_v3.SnapshotCache, + cache envoy_cache.SnapshotCache, config *dp_server.HdsConfig, - hasher util_xds_v3.NodeHash, + hasher envoy_cache.NodeHash, metrics *hds_metrics.Metrics, defaultAdminPort uint32, ) hds_callbacks.Callbacks { @@ -143,7 +144,7 @@ func (t *tracker) newWatchdog(node *envoy_core.Node) util_xds_v3.Watchdog { t.log.Error(err, "OnTick() failed") }, OnStop: func() { - if err := t.reconciler.Clear(node); err != nil { + if err := t.reconciler.Clear(context.Background(), node); err != nil { t.log.Error(err, "OnTick() failed") } }, diff --git a/pkg/hds/tracker/healthcheck_generator.go b/pkg/hds/tracker/healthcheck_generator.go index 6d44baa309f0..3a525da09920 100644 --- a/pkg/hds/tracker/healthcheck_generator.go +++ b/pkg/hds/tracker/healthcheck_generator.go @@ -6,6 +6,8 @@ import ( envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_endpoint 
"github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -16,7 +18,7 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/core/user" "github.com/kumahq/kuma/pkg/core/xds" - "github.com/kumahq/kuma/pkg/hds/cache" + v3 "github.com/kumahq/kuma/pkg/hds/v3" "github.com/kumahq/kuma/pkg/util/net" util_proto "github.com/kumahq/kuma/pkg/util/proto" util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" @@ -42,7 +44,7 @@ func NewSnapshotGenerator( } } -func (g *SnapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_core.Node) (util_xds_v3.Snapshot, error) { +func (g *SnapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_core.Node) (envoy_cache.ResourceSnapshot, error) { ctx = user.Ctx(ctx, user.ControlPlane) proxyId, err := xds.ParseProxyIdFromString(node.Id) if err != nil { @@ -138,7 +140,7 @@ func (g *SnapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_co Interval: util_proto.Duration(g.config.Interval.Duration), } - return cache.NewSnapshot(core.NewUUID(), hcs), nil + return util_xds_v3.NewSingleTypeSnapshot(core.NewUUID(), v3.HealthCheckSpecifierType, []types.Resource{hcs}), nil } // envoyHealthCheck builds a HC for Envoy itself so when Envoy is in draining state HDS can report that DP is offline diff --git a/pkg/hds/tracker/healthcheck_generator_test.go b/pkg/hds/tracker/healthcheck_generator_test.go index fcc4c27e4567..d2afde5119ed 100644 --- a/pkg/hds/tracker/healthcheck_generator_test.go +++ b/pkg/hds/tracker/healthcheck_generator_test.go @@ -5,6 +5,7 @@ import ( "time" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_cache 
"github.com/envoyproxy/go-control-plane/pkg/cache/v3" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -14,7 +15,7 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/hds/cache" + v3 "github.com/kumahq/kuma/pkg/hds/v3" "github.com/kumahq/kuma/pkg/plugins/resources/memory" "github.com/kumahq/kuma/pkg/test/matchers" util_proto "github.com/kumahq/kuma/pkg/util/proto" @@ -47,13 +48,11 @@ var _ = Describe("HDS Snapshot generator", func() { generator := NewSnapshotGenerator(resourceManager, given.hdsConfig, 9901) // when - snapshot, err := generator.GenerateSnapshot(context.Background(), &envoy_config_core_v3.Node{Id: "mesh-1.dp-1"}) - - // then - Expect(err).ToNot(HaveOccurred()) - actual, err := util_proto.ToYAML(snapshot.GetResources(cache.HealthCheckSpecifierType)["hcs"]) - Expect(err).ToNot(HaveOccurred()) - Expect(actual).To(matchers.MatchGoldenYAML("testdata", given.goldenFile)) + Expect(generator.GenerateSnapshot(context.Background(), &envoy_config_core_v3.Node{Id: "mesh-1.dp-1"})).Should( + WithTransform(func(snapshot envoy_cache.ResourceSnapshot) ([]byte, error) { + return util_proto.ToYAML(snapshot.GetResources(v3.HealthCheckSpecifierType)[""]) + }, matchers.MatchGoldenYAML("testdata", given.goldenFile)), + ) }, Entry("should generate HealthCheckSpecifier", testCase{ goldenFile: "hds.1.golden.yaml", diff --git a/pkg/hds/tracker/reconciler.go b/pkg/hds/tracker/reconciler.go index 82f2c7abfd3a..062aec7460d6 100644 --- a/pkg/hds/tracker/reconciler.go +++ b/pkg/hds/tracker/reconciler.go @@ -4,14 +4,15 @@ import ( "context" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/kumahq/kuma/pkg/hds/cache" + v3 "github.com/kumahq/kuma/pkg/hds/v3" util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" ) type 
reconciler struct { - hasher util_xds_v3.NodeHash - cache util_xds_v3.SnapshotCache + hasher envoy_cache.NodeHash + cache envoy_cache.SnapshotCache generator *SnapshotGenerator } @@ -20,11 +21,8 @@ func (r *reconciler) Reconcile(ctx context.Context, node *envoy_config_core_v3.N if err != nil { return err } - if err := newSnapshot.Consistent(); err != nil { - return err - } id := r.hasher.ID(node) - var snap util_xds_v3.Snapshot + var snap envoy_cache.ResourceSnapshot oldSnapshot, _ := r.cache.GetSnapshot(id) switch { case oldSnapshot == nil: @@ -34,12 +32,12 @@ func (r *reconciler) Reconcile(ctx context.Context, node *envoy_config_core_v3.N default: snap = oldSnapshot } - return r.cache.SetSnapshot(id, snap) + return r.cache.SetSnapshot(ctx, id, snap) } -func (r *reconciler) Clear(node *envoy_config_core_v3.Node) error { +func (r *reconciler) Clear(ctx context.Context, node *envoy_config_core_v3.Node) error { // cache.Clear() operation does not push a new (empty) configuration to Envoy. // That is why instead of calling cache.Clear() we set configuration to an empty Snapshot. // This fake value will be removed from cache on Envoy disconnect. 
- return r.cache.SetSnapshot(r.hasher.ID(node), &cache.Snapshot{}) + return r.cache.SetSnapshot(ctx, r.hasher.ID(node), util_xds_v3.NewSingleTypeSnapshot("", v3.HealthCheckSpecifierType, nil)) } diff --git a/pkg/hds/v3/types.go b/pkg/hds/v3/types.go new file mode 100644 index 000000000000..8f9eafd372f5 --- /dev/null +++ b/pkg/hds/v3/types.go @@ -0,0 +1,3 @@ +package v3 + +const HealthCheckSpecifierType = "envoy.service.health.v3.HealthCheckSpecifier" diff --git a/pkg/mads/server/server.go b/pkg/mads/server/server.go index c0481606fda6..814982fd40aa 100644 --- a/pkg/mads/server/server.go +++ b/pkg/mads/server/server.go @@ -19,6 +19,7 @@ import ( mads_config "github.com/kumahq/kuma/pkg/config/mads" config_types "github.com/kumahq/kuma/pkg/config/types" "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/manager" core_runtime "github.com/kumahq/kuma/pkg/core/runtime" "github.com/kumahq/kuma/pkg/core/runtime/component" "github.com/kumahq/kuma/pkg/mads" @@ -26,6 +27,7 @@ import ( core_metrics "github.com/kumahq/kuma/pkg/metrics" kuma_srv "github.com/kumahq/kuma/pkg/util/http/server" util_prometheus "github.com/kumahq/kuma/pkg/util/prometheus" + "github.com/kumahq/kuma/pkg/xds/cache/mesh" ) var log = core.Log.WithName("mads-server") @@ -33,11 +35,12 @@ var log = core.Log.WithName("mads-server") // muxServer is a runtime component.Component that // serves MADs resources over HTTP type muxServer struct { - httpServices []HttpService - config *mads_config.MonitoringAssignmentServerConfig - metrics core_metrics.Metrics - ready atomic.Bool + config *mads_config.MonitoringAssignmentServerConfig + metrics core_metrics.Metrics + ready atomic.Bool mesh_proto.UnimplementedMultiplexServiceServer + rm manager.ReadOnlyResourceManager + meshCache *mesh.Cache } type HttpService interface { @@ -46,31 +49,13 @@ type HttpService interface { var _ component.Component = &muxServer{} -func (s *muxServer) createHttpServicesHandler() http.Handler { - container := 
restful.NewContainer() - promMiddleware := middleware.New(middleware.Config{ - Recorder: http_prometheus.NewRecorder(http_prometheus.Config{ - Registry: s.metrics, - Prefix: "mads_server", - }), - }) - promFilterFunc := util_prometheus.MetricsHandler("", promMiddleware) - - for _, service := range s.httpServices { - ws := new(restful.WebService) - ws.Filter(promFilterFunc) - service.RegisterRoutes(ws) - container.Add(ws) - } - - return container -} - func (s *muxServer) Ready() bool { return s.ready.Load() } func (s *muxServer) Start(stop <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var tlsConfig *tls.Config if s.config.TlsEnabled { cert, err := tls.LoadX509KeyPair(s.config.TlsCertFile, s.config.TlsKeyFile) @@ -88,12 +73,29 @@ func (s *muxServer) Start(stop <-chan struct{}) error { return err } } + ws := new(restful.WebService) + ws.Filter( + util_prometheus.MetricsHandler("", middleware.New(middleware.Config{ + Recorder: http_prometheus.NewRecorder(http_prometheus.Config{ + Registry: s.metrics, + Prefix: "mads_server", + }), + }))) + if s.config.VersionIsEnabled(mads.API_V1) { + log.Info("MADS v1 is enabled") + svc := mads_v1.NewService(s.config, s.rm, log.WithValues("apiVersion", mads.API_V1), s.meshCache) + svc.RegisterRoutes(ws) + svc.Start(ctx) + } + + container := restful.NewContainer() + container.Add(ws) errChan := make(chan error) httpS := &http.Server{ Addr: fmt.Sprintf(":%d", s.config.Port), ReadHeaderTimeout: time.Second, TLSConfig: tlsConfig, - Handler: s.createHttpServicesHandler(), + Handler: container, ErrorLog: adapter.ToStd(log), } if err := kuma_srv.StartServer(log, httpS, &s.ready, errChan); err != nil { @@ -118,21 +120,10 @@ func SetupServer(rt core_runtime.Runtime) error { if rt.Config().Mode == config_core.Global { return nil } - config := rt.Config().MonitoringAssignmentServer - - rm := rt.ReadOnlyResourceManager() - - var httpServices []HttpService - - if 
config.VersionIsEnabled(mads.API_V1) { - log.Info("MADS v1 is enabled") - svc := mads_v1.NewService(config, rm, log.WithValues("apiVersion", mads.API_V1), rt.MeshCache()) - httpServices = append(httpServices, svc) - } - return rt.Add(&muxServer{ - httpServices: httpServices, - config: config, - metrics: rt.Metrics(), + meshCache: rt.MeshCache(), + rm: rt.ReadOnlyResourceManager(), + config: rt.Config().MonitoringAssignmentServer, + metrics: rt.Metrics(), }) } diff --git a/pkg/mads/v1/cache/cache_suite_test.go b/pkg/mads/v1/cache/cache_suite_test.go deleted file mode 100644 index 490ff0a76caf..000000000000 --- a/pkg/mads/v1/cache/cache_suite_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package cache_test - -import ( - "testing" - - "github.com/kumahq/kuma/pkg/test" -) - -func TestCache(t *testing.T) { - test.RunSpecs(t, "Cache Suite") -} diff --git a/pkg/mads/v1/cache/snapshot.go b/pkg/mads/v1/cache/snapshot.go deleted file mode 100644 index 8c0edc2ece5f..000000000000 --- a/pkg/mads/v1/cache/snapshot.go +++ /dev/null @@ -1,102 +0,0 @@ -package cache - -import ( - envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types" - envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/pkg/errors" - - v1 "github.com/kumahq/kuma/pkg/mads/v1" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" -) - -// NewSnapshot creates a snapshot from response types and a version. -func NewSnapshot(version string, assignments map[string]envoy_types.Resource) *Snapshot { - withTtl := make(map[string]envoy_types.ResourceWithTTL, len(assignments)) - for name, res := range assignments { - withTtl[name] = envoy_types.ResourceWithTTL{ - Resource: res, - } - } - return &Snapshot{ - MonitoringAssignments: envoy_cache.Resources{Version: version, Items: withTtl}, - } -} - -// Snapshot is an internally consistent snapshot of xDS resources. 
-type Snapshot struct { - MonitoringAssignments envoy_cache.Resources -} - -var _ util_xds_v3.Snapshot = &Snapshot{} - -// GetSupportedTypes returns a list of xDS types supported by this snapshot. -func (s *Snapshot) GetSupportedTypes() []string { - return []string{v1.MonitoringAssignmentType} -} - -// Consistent check verifies that the dependent resources are exactly listed in the -// snapshot. -func (s *Snapshot) Consistent() error { - if s == nil { - return errors.New("nil snapshot") - } - return nil -} - -// GetResources selects snapshot resources by type. -func (s *Snapshot) GetResources(typ string) map[string]envoy_types.Resource { - if s == nil { - return nil - } - - resources := s.GetResourcesAndTtl(typ) - if resources == nil { - return nil - } - - withoutTtl := make(map[string]envoy_types.Resource, len(resources)) - for name, res := range resources { - withoutTtl[name] = res.Resource - } - return withoutTtl -} - -func (s *Snapshot) GetResourcesAndTtl(typ string) map[string]envoy_types.ResourceWithTTL { - if s == nil { - return nil - } - switch typ { - case v1.MonitoringAssignmentType: - return s.MonitoringAssignments.Items - } - return nil -} - -// GetVersion returns the version for a resource type. -func (s *Snapshot) GetVersion(typ string) string { - if s == nil { - return "" - } - switch typ { - case v1.MonitoringAssignmentType: - return s.MonitoringAssignments.Version - } - return "" -} - -// WithVersion creates a new snapshot with a different version for a given resource type. 
-func (s *Snapshot) WithVersion(typ string, version string) util_xds_v3.Snapshot { - if s == nil { - return nil - } - if s.GetVersion(typ) == version { - return s - } - switch typ { - case v1.MonitoringAssignmentType: - return &Snapshot{ - MonitoringAssignments: envoy_cache.Resources{Version: version, Items: s.MonitoringAssignments.Items}, - } - } - return s -} diff --git a/pkg/mads/v1/cache/snapshot_test.go b/pkg/mads/v1/cache/snapshot_test.go deleted file mode 100644 index 50dfbb342930..000000000000 --- a/pkg/mads/v1/cache/snapshot_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package cache_test - -import ( - envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - observability_proto "github.com/kumahq/kuma/api/observability/v1" - . "github.com/kumahq/kuma/pkg/mads/v1/cache" -) - -var _ = Describe("Snapshot", func() { - expectedType := "type.googleapis.com/kuma.observability.v1.MonitoringAssignment" - - Describe("GetSupportedTypes()", func() { - It("should always return ['type.googleapis.com/kuma.observability.v1alpha1.MonitoringAssignment']", func() { - // when - var snapshot *Snapshot - // then - Expect(snapshot.GetSupportedTypes()).To(Equal([]string{expectedType})) - - // when - snapshot = &Snapshot{} - // then - Expect(snapshot.GetSupportedTypes()).To(Equal([]string{expectedType})) - }) - }) - - Describe("Consistent()", func() { - It("should handle `nil`", func() { - // when - var snapshot *Snapshot - // then - Expect(snapshot.Consistent()).To(MatchError("nil snapshot")) - }) - - It("non-`nil` snapshot should be always consistet", func() { - // when - snapshot := NewSnapshot("v1", nil) - // then - Expect(snapshot.Consistent()).To(Succeed()) - - // when - snapshot = NewSnapshot("v2", map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - }) - // then - Expect(snapshot.Consistent()).To(Succeed()) - }) - }) - 
- Describe("GetResources()", func() { - It("should handle `nil`", func() { - // when - var snapshot *Snapshot - // then - Expect(snapshot.GetResources(expectedType)).To(BeNil()) - }) - - It("should return MonitoringAssignments", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - // when - snapshot := NewSnapshot("v1", assignments) - // then - Expect(snapshot.GetResources(expectedType)).To(Equal(assignments)) - }) - - It("should return `nil` for unsupported resource types", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - // when - snapshot := NewSnapshot("v1", assignments) - // then - Expect(snapshot.GetResources("unsupported type")).To(BeNil()) - }) - }) - - Describe("GetVersion()", func() { - It("should handle `nil`", func() { - // when - var snapshot *Snapshot - // then - Expect(snapshot.GetVersion(expectedType)).To(Equal("")) - }) - - It("should return proper version for a supported resource type", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - // when - snapshot := NewSnapshot("v1", assignments) - // then - Expect(snapshot.GetVersion(expectedType)).To(Equal("v1")) - }) - - It("should return an empty string for unsupported resource type", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - // when - snapshot := NewSnapshot("v1", assignments) - // then - Expect(snapshot.GetVersion("unsupported type")).To(Equal("")) - }) - }) - - Describe("WithVersion()", func() { - It("should handle `nil`", func() { - // given - var snapshot *Snapshot - // when - actual := 
snapshot.WithVersion(expectedType, "v1") - // then - Expect(actual).To(BeNil()) - }) - - It("should return a new snapshot if version has changed", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - snapshot := NewSnapshot("v1", assignments) - // when - actual := snapshot.WithVersion(expectedType, "v2") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v2")) - // and - Expect(actual).To(Equal(NewSnapshot("v2", assignments))) - }) - - It("should return the same snapshot if version has not changed", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - snapshot := NewSnapshot("v1", assignments) - // when - actual := snapshot.WithVersion(expectedType, "v1") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v1")) - // and - Expect(actual).To(BeIdenticalTo(snapshot)) - }) - - It("should return the same snapshot if resource type is not supported", func() { - // given - assignments := map[string]envoy_types.Resource{ - "backend": &observability_proto.MonitoringAssignment{ - Mesh: "default", - Service: "backend", - }, - } - snapshot := NewSnapshot("v1", assignments) - // when - actual := snapshot.WithVersion("unsupported type", "v2") - // then - Expect(actual.GetVersion(expectedType)).To(Equal("v1")) - // and - Expect(actual).To(BeIdenticalTo(snapshot)) - }) - }) -}) diff --git a/pkg/mads/v1/reconcile/interfaces.go b/pkg/mads/v1/reconcile/interfaces.go index e3adb6be8aa5..c3f8c5a99c8d 100644 --- a/pkg/mads/v1/reconcile/interfaces.go +++ b/pkg/mads/v1/reconcile/interfaces.go @@ -2,16 +2,10 @@ package reconcile import ( "context" - - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" ) // Reconciler re-computes configuration for a given node. 
type Reconciler interface { Reconcile(context.Context) error - // NeedsReconciliation checks if there is a valid configuration snapshot already present - // for a given node - NeedsReconciliation(node *envoy_core.Node) bool - KnownClientIds() map[string]bool - ReconcileIfNeeded(ctx context.Context, node *envoy_core.Node) error + KnownClientIds() []string } diff --git a/pkg/mads/v1/reconcile/reconciler.go b/pkg/mads/v1/reconcile/reconciler.go index bc0c5f962756..d2d8c0fb76d4 100644 --- a/pkg/mads/v1/reconcile/reconciler.go +++ b/pkg/mads/v1/reconcile/reconciler.go @@ -2,60 +2,44 @@ package reconcile import ( "context" - "maps" - "sync" + "sync/atomic" - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" ) -func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds_v3.SnapshotCache, generator *SnapshotGenerator) Reconciler { - return &reconciler{ - reconcilerMutex: &sync.Mutex{}, - hasher: hasher, - cache: cache, - generator: generator, - knownClientIds: map[string]bool{}, - knownClientIdsMutex: &sync.Mutex{}, +func NewReconciler(cache envoy_cache.SnapshotCache, generator *SnapshotGenerator) Reconciler { + r := &reconciler{ + cache: cache, + generator: generator, + knownClientIds: atomic.Pointer[[]string]{}, } + base := []string{} + r.knownClientIds.Store(&base) + return r } type reconciler struct { - reconcilerMutex *sync.Mutex - - hasher envoy_cache.NodeHash - cache util_xds_v3.SnapshotCache - generator *SnapshotGenerator - knownClientIds map[string]bool - knownClientIdsMutex *sync.Mutex + cache envoy_cache.SnapshotCache + generator *SnapshotGenerator + knownClientIds atomic.Pointer[[]string] } -func (r *reconciler) KnownClientIds() map[string]bool { - r.knownClientIdsMutex.Lock() - defer r.knownClientIdsMutex.Unlock() - return maps.Clone(r.knownClientIds) +func (r *reconciler) KnownClientIds() []string { + p := 
r.knownClientIds.Load() + return *p } func (r *reconciler) Reconcile(ctx context.Context) error { - r.reconcilerMutex.Lock() - defer r.reconcilerMutex.Unlock() - return r.reconcile(ctx) -} - -func (r *reconciler) reconcile(ctx context.Context) error { newSnapshotPerClient, err := r.generator.GenerateSnapshot(ctx) if err != nil { return err } - knownClients := map[string]bool{} + knownClients := []string{} for clientId, newSnapshot := range newSnapshotPerClient { - knownClients[clientId] = true - if err := newSnapshot.Consistent(); err != nil { - return err - } - var snap util_xds_v3.Snapshot + knownClients = append(knownClients, clientId) + var snap envoy_cache.ResourceSnapshot oldSnapshot, _ := r.cache.GetSnapshot(clientId) switch { case oldSnapshot == nil: @@ -65,28 +49,12 @@ func (r *reconciler) reconcile(ctx context.Context) error { default: snap = oldSnapshot } - err := r.cache.SetSnapshot(clientId, snap) + err := r.cache.SetSnapshot(ctx, clientId, snap) if err != nil { return err } } - r.knownClientIdsMutex.Lock() - r.knownClientIds = knownClients - r.knownClientIdsMutex.Unlock() + r.knownClientIds.Store(&knownClients) return nil } - -func (r *reconciler) ReconcileIfNeeded(ctx context.Context, node *envoy_core.Node) error { - r.reconcilerMutex.Lock() - defer r.reconcilerMutex.Unlock() - if r.NeedsReconciliation(node) { - return r.reconcile(ctx) - } - return nil -} - -func (r *reconciler) NeedsReconciliation(node *envoy_core.Node) bool { - id := r.hasher.ID(node) - return !r.cache.HasSnapshot(id) -} diff --git a/pkg/mads/v1/reconcile/snapshot_generator.go b/pkg/mads/v1/reconcile/snapshot_generator.go index d3a90e6d1214..f5677bc5d9d7 100644 --- a/pkg/mads/v1/reconcile/snapshot_generator.go +++ b/pkg/mads/v1/reconcile/snapshot_generator.go @@ -3,6 +3,7 @@ package reconcile import ( "context" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/pkg/errors" "github.com/kumahq/kuma/pkg/core" @@ -11,7 +12,7 @@ import ( core_store 
"github.com/kumahq/kuma/pkg/core/resources/store" core_xds "github.com/kumahq/kuma/pkg/core/xds" "github.com/kumahq/kuma/pkg/mads/generator" - mads_v1_cache "github.com/kumahq/kuma/pkg/mads/v1/cache" + mads_v1 "github.com/kumahq/kuma/pkg/mads/v1" meshmetrics_generator "github.com/kumahq/kuma/pkg/mads/v1/generator" "github.com/kumahq/kuma/pkg/plugins/policies/core/matchers" "github.com/kumahq/kuma/pkg/plugins/policies/meshmetric/api/v1alpha1" @@ -36,7 +37,7 @@ type SnapshotGenerator struct { meshCache *mesh.Cache } -func (s *SnapshotGenerator) GenerateSnapshot(ctx context.Context) (map[string]util_xds_v3.Snapshot, error) { +func (s *SnapshotGenerator) GenerateSnapshot(ctx context.Context) (map[string]envoy_cache.ResourceSnapshot, error) { meshesWithMeshMetrics, err := s.getMeshesWithMeshMetrics(ctx) if err != nil { return nil, err @@ -52,7 +53,7 @@ func (s *SnapshotGenerator) GenerateSnapshot(ctx context.Context) (map[string]ut } var resources []*core_xds.Resource - resourcesPerClientId := map[string]util_xds_v3.Snapshot{} + resourcesPerClientId := map[string]envoy_cache.ResourceSnapshot{} if len(meshesWithMeshMetrics) == 0 { dataplanes, err := s.getDataplanes(ctx, meshes) if err != nil { @@ -179,6 +180,6 @@ func (s *SnapshotGenerator) getMeshGateways(ctx context.Context, meshes []*core_ return meshGateways, nil } -func createSnapshot(resources []*core_xds.Resource) *mads_v1_cache.Snapshot { - return mads_v1_cache.NewSnapshot(core.NewUUID(), core_xds.ResourceList(resources).ToIndex()) +func createSnapshot(resources []*core_xds.Resource) envoy_cache.ResourceSnapshot { + return util_xds_v3.NewSingleTypeSnapshot(core.NewUUID(), mads_v1.MonitoringAssignmentType, core_xds.ResourceList(resources).Payloads()) } diff --git a/pkg/mads/v1/reconcile/snapshot_generator_test.go b/pkg/mads/v1/reconcile/snapshot_generator_test.go index 79f342ebf5d3..848fed5f5423 100644 --- a/pkg/mads/v1/reconcile/snapshot_generator_test.go +++ b/pkg/mads/v1/reconcile/snapshot_generator_test.go @@ 
-20,7 +20,6 @@ import ( core_store "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/dns/vips" mads_v1 "github.com/kumahq/kuma/pkg/mads/v1" - mads_cache "github.com/kumahq/kuma/pkg/mads/v1/cache" mads_generator "github.com/kumahq/kuma/pkg/mads/v1/generator" meshmetrics_generator "github.com/kumahq/kuma/pkg/mads/v1/generator" . "github.com/kumahq/kuma/pkg/mads/v1/reconcile" @@ -32,7 +31,6 @@ import ( test_model "github.com/kumahq/kuma/pkg/test/resources/model" "github.com/kumahq/kuma/pkg/test/resources/samples" "github.com/kumahq/kuma/pkg/util/proto" - v3 "github.com/kumahq/kuma/pkg/util/xds/v3" "github.com/kumahq/kuma/pkg/xds/cache/mesh" xds_context "github.com/kumahq/kuma/pkg/xds/context" "github.com/kumahq/kuma/pkg/xds/server" @@ -44,7 +42,7 @@ var _ = Describe("snapshotGenerator", func() { var resourceManager core_manager.ResourceManager var store core_store.ResourceStore node1Id := "one" - snapshotWithTwoAssignments := mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + snapshotWithTwoAssignments := map[string]envoy_types.Resource{ "/meshes/demo/dataplanes/backend-02": &observability_v1.MonitoringAssignment{ Mesh: "demo", Service: "backend", @@ -77,9 +75,9 @@ var _ = Describe("snapshotGenerator", func() { }, }}, }, - }) + } - meshMetricSnapshot := mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + meshMetricSnapshot := map[string]envoy_types.Resource{ "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend", @@ -91,7 +89,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }) + } BeforeEach(func() { store = memory.NewStore() @@ -102,7 +100,7 @@ var _ = Describe("snapshotGenerator", func() { meshes []*core_mesh.MeshResource meshMetrics []*v1alpha1.MeshMetricResource dataplanes []*core_mesh.DataplaneResource - expectedSnapshots map[string]v3.Snapshot + expectedSnapshots map[string]map[string]envoy_types.Resource } 
DescribeTable("", @@ -152,15 +150,17 @@ var _ = Describe("snapshotGenerator", func() { // then Expect(err).ToNot(HaveOccurred()) // and - for c := range snapshotPerClient { - // Cleanup the versions as they are UUIDs - snapshotPerClient[c] = snapshotPerClient[c].WithVersion(mads_v1.MonitoringAssignmentType, "") + + for k, v := range given.expectedSnapshots { + Expect(snapshotPerClient).To(HaveKey(k)) + generatedSnapshot := snapshotPerClient[k].GetResources(mads_v1.MonitoringAssignmentType) + Expect(generatedSnapshot).To(Equal(v)) } - Expect(snapshotPerClient).To(Equal(given.expectedSnapshots)) + Expect(snapshotPerClient).To(HaveLen(len(given.expectedSnapshots))) }, Entry("no Meshes, no Dataplanes, no MeshMetrics", testCase{ - expectedSnapshots: map[string]v3.Snapshot{ - meshmetrics_generator.DefaultKumaClientId: mads_cache.NewSnapshot("", nil), + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + meshmetrics_generator.DefaultKumaClientId: {}, }, }), Entry("no Meshes with Prometheus enabled, no MeshMetrics", testCase{ @@ -177,8 +177,8 @@ var _ = Describe("snapshotGenerator", func() { WithName("backend-01"). Build(), }, - expectedSnapshots: map[string]v3.Snapshot{ - meshmetrics_generator.DefaultKumaClientId: mads_cache.NewSnapshot("", nil), + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + meshmetrics_generator.DefaultKumaClientId: {}, }, }), Entry("Mesh with Prometheus enabled but no Dataplanes, no MeshMetrics", testCase{ @@ -215,8 +215,8 @@ var _ = Describe("snapshotGenerator", func() { WithName("backend-01"). Build(), }, - expectedSnapshots: map[string]v3.Snapshot{ - meshmetrics_generator.DefaultKumaClientId: mads_cache.NewSnapshot("", nil), + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + meshmetrics_generator.DefaultKumaClientId: {}, }, }), Entry("Mesh with Prometheus enabled and some Dataplanes, no MeshMetrics", testCase{ @@ -270,7 +270,7 @@ var _ = Describe("snapshotGenerator", func() { }). 
Build(), }, - expectedSnapshots: map[string]v3.Snapshot{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ meshmetrics_generator.DefaultKumaClientId: snapshotWithTwoAssignments, }, }), @@ -317,7 +317,7 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ meshmetrics_generator.DefaultKumaClientId: meshMetricSnapshot, }, }), @@ -364,8 +364,8 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ - node1Id: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + node1Id: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend", @@ -377,7 +377,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), + }, }, }), Entry("no Meshes with Prometheus enabled, MeshMetric with Prometheus enabled for node 1 and default client for rest", testCase{ @@ -433,8 +433,8 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ - node1Id: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + node1Id: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend", @@ -446,8 +446,8 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), - meshmetrics_generator.DefaultKumaClientId: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + }, + meshmetrics_generator.DefaultKumaClientId: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend", @@ -459,7 +459,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), + }, }, }), Entry("no 
Meshes with Prometheus enabled, MeshMetric with Prometheus for some dataplanes", testCase{ @@ -513,8 +513,8 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ - node1Id: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + node1Id: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend-01", @@ -526,7 +526,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), + }, }, }), Entry("no Meshes with Prometheus enabled, multiple MeshMetrics with Prometheus and merging", testCase{ @@ -611,8 +611,8 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ - node1Id: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + node1Id: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend-01", @@ -646,7 +646,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), + }, }, }), Entry("no Meshes with Prometheus enabled, MeshMetric targeting Mesh with override for backend-02 with Prometheus metrics backend", testCase{ @@ -730,8 +730,8 @@ var _ = Describe("snapshotGenerator", func() { }, }, }, - expectedSnapshots: map[string]v3.Snapshot{ - meshmetrics_generator.DefaultKumaClientId: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + expectedSnapshots: map[string]map[string]envoy_types.Resource{ + meshmetrics_generator.DefaultKumaClientId: { "/meshes/default/dataplanes/backend-02": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend-02", @@ -743,8 +743,8 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), - node1Id: mads_cache.NewSnapshot("", map[string]envoy_types.Resource{ + 
}, + node1Id: { "/meshes/default/dataplanes/backend-01": &observability_v1.MonitoringAssignment{ Mesh: "default", Service: "backend-01", @@ -767,7 +767,7 @@ var _ = Describe("snapshotGenerator", func() { Labels: map[string]string{}, }}, }, - }), + }, }, }), ) diff --git a/pkg/mads/v1/service/components.go b/pkg/mads/v1/service/components.go deleted file mode 100644 index 9fb4a367caf4..000000000000 --- a/pkg/mads/v1/service/components.go +++ /dev/null @@ -1,62 +0,0 @@ -package service - -import ( - "context" - "time" - - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3" - "github.com/go-logr/logr" - "github.com/pkg/errors" - - meshmetrics_generator "github.com/kumahq/kuma/pkg/mads/v1/generator" - mads_reconcile "github.com/kumahq/kuma/pkg/mads/v1/reconcile" - util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog" - util_xds "github.com/kumahq/kuma/pkg/util/xds" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" -) - -type restReconcilerCallbacks struct { - reconciler mads_reconcile.Reconciler -} - -func (r *restReconcilerCallbacks) OnFetchRequest(ctx context.Context, request util_xds.DiscoveryRequest) error { - nodei := request.Node() - - node, ok := nodei.(*envoy_core.Node) - if !ok { - return errors.Errorf("expecting a v3 Node, got: %v", nodei) - } - - knownClients := r.reconciler.KnownClientIds() - if !knownClients[node.Id] { - node.Id = meshmetrics_generator.DefaultKumaClientId - } - - return r.reconciler.ReconcileIfNeeded(ctx, node) -} - -func (r *restReconcilerCallbacks) OnFetchResponse(request util_xds.DiscoveryRequest, response util_xds.DiscoveryResponse) { -} - -func NewReconcilerRestCallbacks(reconciler mads_reconcile.Reconciler) util_xds.RestCallbacks { - return &restReconcilerCallbacks{reconciler: reconciler} -} - -func NewSyncTracker(reconciler mads_reconcile.Reconciler, refresh time.Duration, log logr.Logger) envoy_xds.Callbacks { - return 
util_xds_v3.NewWatchdogCallbacks(func(_ context.Context, node *envoy_core.Node, streamID int64) (util_xds_v3.Watchdog, error) { - log := log.WithValues("streamID", streamID, "node", node) - return &util_watchdog.SimpleWatchdog{ - NewTicker: func() *time.Ticker { - return time.NewTicker(refresh) - }, - OnTick: func(ctx context.Context) error { - log.V(1).Info("on tick") - return reconciler.Reconcile(ctx) - }, - OnError: func(err error) { - log.Error(err, "OnTick() failed") - }, - }, nil - }) -} diff --git a/pkg/mads/v1/service/mads.go b/pkg/mads/v1/service/mads.go index ec9f39b07ea5..5cba77583409 100644 --- a/pkg/mads/v1/service/mads.go +++ b/pkg/mads/v1/service/mads.go @@ -2,10 +2,11 @@ package service import ( "context" + "errors" envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/rest/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3" observability_v1 "github.com/kumahq/kuma/api/observability/v1" @@ -16,17 +17,44 @@ type Server interface { } func NewServer(cache envoy_cache.Cache, callbacks envoy_server.Callbacks) Server { - restServer := rest.NewServer(cache, callbacks) - return &server{rest: restServer} + return &server{cache: cache, callbacks: callbacks} } var _ Server = &server{} type server struct { - rest rest.Server observability_v1.UnimplementedMonitoringAssignmentDiscoveryServiceServer + cache envoy_cache.Cache + callbacks envoy_server.Callbacks } -func (s *server) FetchMonitoringAssignments(ctx context.Context, request *envoy_sd.DiscoveryRequest) (*envoy_sd.DiscoveryResponse, error) { - return s.rest.Fetch(ctx, request) +func (s *server) FetchMonitoringAssignments(ctx context.Context, req *envoy_sd.DiscoveryRequest) (*envoy_sd.DiscoveryResponse, error) { + if s.callbacks != nil { + if err := s.callbacks.OnFetchRequest(ctx, 
req); err != nil { + return nil, err + } + } + resChan := make(chan envoy_cache.Response, 1) + streamState := stream.NewStreamState(false, nil) + // Because we want to do long polling we use the watch system if there's a deadline on the context + if _, hasDeadline := ctx.Deadline(); hasDeadline { + cancelWatch := s.cache.CreateWatch(req, streamState, resChan) + defer cancelWatch() + select { // Wait until either we timeout or the watch triggers + case <-ctx.Done(): + case <-resChan: + } + } + resp, err := s.cache.Fetch(context.WithoutCancel(ctx), req) + if err != nil { + return nil, err + } + if resp == nil { + return nil, errors.New("missing response") + } + out, err := resp.GetDiscoveryResponse() + if s.callbacks != nil { + s.callbacks.OnFetchResponse(req, out) + } + return out, err } diff --git a/pkg/mads/v1/service/service.go b/pkg/mads/v1/service/service.go index 9efef1c2eb57..fa5470fc9e66 100644 --- a/pkg/mads/v1/service/service.go +++ b/pkg/mads/v1/service/service.go @@ -1,46 +1,65 @@ package service import ( + "context" + "time" + + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/go-logr/logr" "github.com/kumahq/kuma/pkg/config/mads" core_manager "github.com/kumahq/kuma/pkg/core/resources/manager" mads_generator "github.com/kumahq/kuma/pkg/mads/v1/generator" mads_reconcile "github.com/kumahq/kuma/pkg/mads/v1/reconcile" + util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog" util_xds "github.com/kumahq/kuma/pkg/util/xds" util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" "github.com/kumahq/kuma/pkg/xds/cache/mesh" ) type service struct { - server Server - config *mads.MonitoringAssignmentServerConfig - log logr.Logger + server Server + config *mads.MonitoringAssignmentServerConfig + log logr.Logger + watchdog *util_watchdog.SimpleWatchdog } func NewService(config *mads.MonitoringAssignmentServerConfig, rm core_manager.ReadOnlyResourceManager, log logr.Logger, meshCache *mesh.Cache) *service { - hasher := 
util_xds_v3.IDHash{} - cache := util_xds_v3.NewSnapshotCache(false, hasher, util_xds.NewLogger(log)) + hasher := &util_xds_v3.FallBackNodeHash{DefaultId: mads_generator.DefaultKumaClientId} + cache := envoy_cache.NewSnapshotCache(false, hasher, util_xds.NewLogger(log)) reconciler := mads_reconcile.NewReconciler( - hasher, cache, mads_reconcile.NewSnapshotGenerator(rm, mads_generator.MonitoringAssignmentsGenerator{}, meshCache), ) - - callbacks := util_xds_v3.CallbacksChain{ - util_xds_v3.AdaptMultiCallbacks(util_xds.LoggingCallbacks{Log: log}), - // TODO Right now we are creating sync tracker for first REST request pkg/util/xds/v3/watchdog_callbacks.go:48, because of this - // using MeshMetrics with specified clientId will refresh only one client in background - // issue: https://github.com/kumahq/kuma/issues/8764 - NewSyncTracker(reconciler, config.AssignmentRefreshInterval.Duration, log), - // For on-demand reconciliation - util_xds_v3.AdaptRestCallbacks(NewReconcilerRestCallbacks(reconciler)), + // We use the clientIds from the reconciler from the node hasher + hasher.GetIds = reconciler.KnownClientIds + watchdog := &util_watchdog.SimpleWatchdog{ + NewTicker: func() *time.Ticker { + return time.NewTicker(config.AssignmentRefreshInterval.Duration) + }, + OnTick: func(ctx context.Context) error { + log.V(1).Info("on tick") + err := reconciler.Reconcile(ctx) + return err + }, + OnError: func(err error) { + log.Error(err, "OnTick() failed") + }, } - srv := NewServer(cache, callbacks) + watchdog.WithTickCheck() + srv := NewServer(cache, util_xds_v3.CallbacksChain{ + util_xds_v3.AdaptMultiCallbacks(util_xds.LoggingCallbacks{Log: log}), + }) return &service{ - server: srv, - config: config, - log: log, + server: srv, + config: config, + log: log, + watchdog: watchdog, } } + +func (s *service) Start(ctx context.Context) { + go s.watchdog.Start(ctx) + s.watchdog.HasTicked(true) +} diff --git a/pkg/mads/v1/service_test.go b/pkg/mads/v1/service_test.go index 
f4f9f04b525d..14e0cce5a7f8 100644 --- a/pkg/mads/v1/service_test.go +++ b/pkg/mads/v1/service_test.go @@ -49,15 +49,18 @@ var _ = Describe("MADS http service", func() { const refreshInterval = 250 * time.Millisecond const defaultFetchTimeout = 1 * time.Second - BeforeEach(func() { resManager = core_manager.NewResourceManager(memory.NewStore()) + }) + setupServer := func() { cfg := mads_config.DefaultMonitoringAssignmentServerConfig() cfg.AssignmentRefreshInterval = config_types.Duration{Duration: refreshInterval} cfg.DefaultFetchTimeout = config_types.Duration{Duration: defaultFetchTimeout} svc := service.NewService(cfg, resManager, logr.Discard(), nil) + ctx, cancel := context.WithCancel(context.Background()) + svc.Start(ctx) ws := new(restful.WebService) svc.RegisterRoutes(ws) @@ -69,14 +72,16 @@ var _ = Describe("MADS http service", func() { url = srv.Server().URL monitoringAssignmentPath = fmt.Sprintf("%s%s", url, service.FetchMonitoringAssignmentsPath) DeferCleanup(func() { + cancel() srv.Server().Close() }) // wait for the server Eventually(srv.Ready).Should(Succeed()) - }) + } Context("with resources", func() { + BeforeEach(setupServer) It("should respond with an empty discovery response", func() { // given discoveryReq := envoy_v3.DiscoveryRequest{ @@ -213,6 +218,7 @@ var _ = Describe("MADS http service", func() { // given Expect(createMesh(mesh)).To(Succeed()) Expect(createDataPlane(dp1)).To(Succeed()) + setupServer() }) It("should return the monitoring assignments", func() { @@ -548,6 +554,21 @@ var _ = Describe("MADS http service", func() { HaveField("Resources", HaveLen(1)), ))), ))) + + By("Second request should also be instant") + discoveryReq.VersionInfo = discoveryRes.VersionInfo + reqBytes, err = pbMarshaller.MarshalToString(&discoveryReq) + Expect(err).ToNot(HaveOccurred()) + // when + req, err = http.NewRequest("POST", monitoringAssignmentPath+"?fetch-timeout=0s", strings.NewReader(reqBytes)) + Expect(err).ToNot(HaveOccurred()) + 
req.Header.Add("content-type", "application/json") + + start := time.Now() + Expect(http.DefaultClient.Do(req)). + Should(HaveHTTPStatus(http.StatusNotModified)) + // Request should have been very fast + Expect(start).To(BeTemporally("~", time.Now(), time.Millisecond*200)) }) It("should return an error if the fetch timeout is unparseable", func() { diff --git a/pkg/util/watchdog/watchdog.go b/pkg/util/watchdog/watchdog.go index 6b0bf2a204d6..a02d0104f4b4 100644 --- a/pkg/util/watchdog/watchdog.go +++ b/pkg/util/watchdog/watchdog.go @@ -5,25 +5,52 @@ import ( "time" "github.com/pkg/errors" + + "github.com/kumahq/kuma/pkg/util/channels" ) type SimpleWatchdog struct { - NewTicker func() *time.Ticker - OnTick func(context.Context) error - OnError func(error) - OnStop func() + NewTicker func() *time.Ticker + OnTick func(context.Context) error + OnError func(error) + OnStop func() + hasTickedChan chan struct{} +} + +func (w *SimpleWatchdog) WithTickCheck() *SimpleWatchdog { + w.hasTickedChan = make(chan struct{}) + return w +} + +// WaitForFirstTick return whether this has ticked at least once. 
This is an optional feature and is opt-in by using `WithTickCheck` after creation +func (w *SimpleWatchdog) HasTicked(blocking bool) bool { + if w.hasTickedChan == nil { + panic("Calling HasTicked() before watchdog was started this is not supposed to happen") + } + if blocking { + <-w.hasTickedChan + return true + } + select { + case <-w.hasTickedChan: + return true + default: + return false + } } func (w *SimpleWatchdog) Start(ctx context.Context) { ticker := w.NewTicker() defer ticker.Stop() - for { if err := w.onTick(ctx); err != nil { if !errors.Is(err, context.Canceled) && w.OnError != nil { w.OnError(err) } } + if w.hasTickedChan != nil && !channels.IsClosed(w.hasTickedChan) { // On the first tick we close the channel + close(w.hasTickedChan) + } select { case <-ctx.Done(): case <-ticker.C: diff --git a/pkg/util/watchdog/watchdog_test.go b/pkg/util/watchdog/watchdog_test.go index 512be33a737f..278d3e543187 100644 --- a/pkg/util/watchdog/watchdog_test.go +++ b/pkg/util/watchdog/watchdog_test.go @@ -187,4 +187,48 @@ var _ = Describe("SimpleWatchdog", func() { Eventually(doneCh).Should(BeClosed()) Consistently(onErrorCalls).ShouldNot(Receive()) })) + + It("should wait for the first tick to happen on WaitForFirstTick", func() { + watchdog := &SimpleWatchdog{ + NewTicker: func() *time.Ticker { + return &time.Ticker{ + C: timeTicks, + } + }, + OnTick: func(ctx context.Context) error { + return nil + }, + OnError: func(err error) { + onErrorCalls <- err + }, + } + watchdog.WithTickCheck() + + // setup + hasTicked := make(chan struct{}) + go func() { + watchdog.HasTicked(true) + close(hasTicked) + }() + go func() { + watchdog.Start(ctx) + close(doneCh) + }() + Expect(watchdog.HasTicked(false)).Should(BeFalse()) + Consistently(hasTicked).ShouldNot(Receive()) + + By("simulating 1st tick") + // when + timeTicks <- time.Time{} + Expect(watchdog.HasTicked(false)).Should(BeTrue()) + Expect(hasTicked).Should(BeClosed()) + + By("simulating 2nd tick") + // when + timeTicks <- 
time.Time{} + Expect(watchdog.HasTicked(false)).Should(BeTrue()) + Expect(hasTicked).Should(BeClosed()) + + cancel() + }) }) diff --git a/pkg/util/xds/v3/cache.go b/pkg/util/xds/v3/cache.go deleted file mode 100644 index a313f5c7c082..000000000000 --- a/pkg/util/xds/v3/cache.go +++ /dev/null @@ -1,515 +0,0 @@ -// Copyright 2018 Envoyproxy Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package v3 - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" - "google.golang.org/protobuf/proto" -) - -type Snapshot interface { - // GetSupportedTypes returns a list of xDS types supported by this snapshot. - GetSupportedTypes() []string - - // Consistent check verifies that the dependent resources are exactly listed in the - // snapshot: - // - all EDS resources are listed by name in CDS resources - // - all RDS resources are listed by name in LDS resources - // - // Note that clusters and listeners are requested without name references, so - // Envoy will accept the snapshot list of clusters as-is even if it does not match - // all references found in xDS. 
- Consistent() error - - // GetResources selects snapshot resources by type. - GetResources(typ string) map[string]types.Resource - - // GetVersion returns the version for a resource type. - GetVersion(typ string) string - - // WithVersion creates a new snapshot with a different version for a given resource type. - WithVersion(typ string, version string) Snapshot -} - -// SnapshotCache is a snapshot-based envoy_cache that maintains a single versioned -// snapshot of responses per node. SnapshotCache consistently replies with the -// latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS -// requests are responded only when all resources in the snapshot xDS response -// are named as part of the request. It is expected that the CDS response names -// all EDS clusters, and the LDS response names all RDS routes in a snapshot, -// to ensure that Envoy makes the request for all EDS clusters or RDS routes -// eventually. -// -// SnapshotCache can operate as a REST or regular xDS backend. The snapshot -// can be partial, e.g. only include RDS or EDS resources. -type SnapshotCache interface { - envoy_cache.Cache - - // SetSnapshot sets a response snapshot for a node. For ADS, the snapshots - // should have distinct versions and be internally consistent (e.g. all - // referenced resources must be included in the snapshot). - // - // This method will cause the server to respond to all open watches, for which - // the version differs from the snapshot version. - SetSnapshot(node string, snapshot Snapshot) error - - // GetSnapshot gets the snapshot for a node. - GetSnapshot(node string) (Snapshot, error) - - // HasSnapshot checks whether there is a snapshot present for a node. - HasSnapshot(node string) bool - - // ClearSnapshot removes all status and snapshot information associated with a node. Return the removed snapshot or nil - ClearSnapshot(node string) Snapshot - - // GetStatusInfo retrieves status information for a node ID. 
- GetStatusInfo(string) StatusInfo - - // GetStatusKeys retrieves node IDs for all statuses. - GetStatusKeys() []string -} - -// Generates a snapshot of xDS resources for a given node. -type SnapshotGenerator interface { - GenerateSnapshot(context.Context, *envoy_config_core_v3.Node) (Snapshot, error) -} - -type snapshotCache struct { - // watchCount is an atomic counter incremented for each watch. This needs to - // be the first field in the struct to guarantee that it is 64-bit aligned, - // which is a requirement for atomic operations on 64-bit operands to work on - // 32-bit machines. - watchCount int64 - - log log.Logger - - // ads flag to hold responses until all resources are named - ads bool - - // snapshots are cached resources indexed by node IDs - snapshots map[string]Snapshot - - // status information for all nodes indexed by node IDs - status map[string]*statusInfo - - // hash is the hashing function for Envoy nodes - hash NodeHash - - mu sync.RWMutex -} - -// NewSnapshotCache initializes a simple envoy_cache. -// -// ADS flag forces a delay in responding to streaming requests until all -// resources are explicitly named in the request. This avoids the problem of a -// partial request over a single stream for a subset of resources which would -// require generating a fresh version for acknowledgement. ADS flag requires -// snapshot consistency. For non-ADS case (and fetch), multiple partial -// requests are sent across multiple streams and re-using the snapshot version -// is OK. -// -// Logger is optional. -func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache { - return &snapshotCache{ - log: logger, - ads: ads, - snapshots: make(map[string]Snapshot), - status: make(map[string]*statusInfo), - hash: hash, - } -} - -// SetSnapshotCache updates a snapshot for a node. 
-func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - // update the existing entry - cache.snapshots[node] = snapshot - - // trigger existing watches for which version changed - if info, ok := cache.status[node]; ok { - info.mu.Lock() - for id, watch := range info.watches { - version := snapshot.GetVersion(watch.Request.TypeUrl) - if version != watch.Request.VersionInfo { - if cache.log != nil { - cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) - } - cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version) - - // discard the watch - delete(info.watches, id) - } - } - info.mu.Unlock() - } - - return nil -} - -// GetSnapshots gets the snapshot for a node, and returns an error if not found. -func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) { - cache.mu.RLock() - defer cache.mu.RUnlock() - - snap, ok := cache.snapshots[node] - if !ok { - return nil, fmt.Errorf("no snapshot found for node %s", node) - } - return snap, nil -} - -func (cache *snapshotCache) HasSnapshot(node string) bool { - cache.mu.RLock() - defer cache.mu.RUnlock() - - _, ok := cache.snapshots[node] - return ok -} - -// ClearSnapshot clears snapshot and info for a node. -func (cache *snapshotCache) ClearSnapshot(node string) Snapshot { - cache.mu.Lock() - defer cache.mu.Unlock() - - snapshot := cache.snapshots[node] - delete(cache.snapshots, node) - delete(cache.status, node) - return snapshot -} - -// nameSet creates a map from a string slice to value true. -func nameSet(names []string) map[string]bool { - set := make(map[string]bool) - for _, name := range names { - set[name] = true - } - return set -} - -// superset checks that all resources are listed in the names set. 
-func superset(names map[string]bool, resources map[string]types.Resource) error { - for resourceName := range resources { - if _, exists := names[resourceName]; !exists { - return fmt.Errorf("%q not listed", resourceName) - } - } - return nil -} - -func (cache *snapshotCache) CreateDeltaWatch(*envoy_cache.DeltaRequest, stream.StreamState, chan envoy_cache.DeltaResponse) func() { - return nil -} - -// CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *envoy_cache.Request, _ stream.StreamState, responseChan chan envoy_cache.Response) func() { - nodeID := cache.hash.ID(request.Node) - - cache.mu.Lock() - defer cache.mu.Unlock() - info, ok := cache.status[nodeID] - if !ok { - info = newStatusInfo(request.Node) - cache.status[nodeID] = info - } - - // update last watch request time - info.mu.Lock() - info.lastWatchRequestTime = time.Now() - info.mu.Unlock() - - snapshot, exists := cache.snapshots[nodeID] - version := "" - if exists { - version = snapshot.GetVersion(request.TypeUrl) - } - - // if the requested version is up-to-date or missing a response, leave an open watch - if !exists || request.VersionInfo == version { - watchID := cache.nextWatchID() - if cache.log != nil { - cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, - request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) - } - info.mu.Lock() - info.watches[watchID] = ResponseWatch{Request: request, Response: responseChan} - info.mu.Unlock() - return cache.cancelWatch(nodeID, watchID) - } - - // otherwise, the watch may be responded immediately - cache.respond(request, responseChan, snapshot.GetResources(request.TypeUrl), version) - - return nil -} - -func (cache *snapshotCache) nextWatchID() int64 { - return atomic.AddInt64(&cache.watchCount, 1) -} - -// cancellation function for cleaning stale watches -func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { - return func() { - // uses the 
envoy_cache mutex - cache.mu.Lock() - defer cache.mu.Unlock() - if info, ok := cache.status[nodeID]; ok { - info.mu.Lock() - delete(info.watches, watchID) - info.mu.Unlock() - } - } -} - -// Respond to a watch with the snapshot value. The value channel should have capacity not to block. -// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 -func (cache *snapshotCache) respond(request *envoy_cache.Request, value chan envoy_cache.Response, resources map[string]types.Resource, version string) { - // for ADS, the request names must match the snapshot names - // if they do not, then the watch is never responded, and it is expected that envoy makes another request - if len(request.ResourceNames) != 0 && cache.ads { - if err := superset(nameSet(request.ResourceNames), resources); err != nil { - if cache.log != nil { - cache.log.Debugf("ADS mode: not responding to request: %v", err) - } - return - } - } - if cache.log != nil { - cache.log.Debugf("respond %s%v version %q with version %q", - request.TypeUrl, request.ResourceNames, request.VersionInfo, version) - } - - value <- createResponse(request, resources, version) -} - -func createResponse(request *envoy_cache.Request, resources map[string]types.Resource, version string) envoy_cache.Response { - filtered := make([]types.ResourceWithTTL, 0, len(resources)) - - // Reply only with the requested resources. Envoy may ask each resource - // individually in a separate stream. It is ok to reply with the same version - // on separate streams since requests do not share their response versions. 
- if len(request.ResourceNames) != 0 { - set := nameSet(request.ResourceNames) - for name, resource := range resources { - if set[name] { - filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) - } - } - } else { - for _, resource := range resources { - filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) - } - } - - return &envoy_cache.RawResponse{ - Request: request, - Version: version, - Resources: filtered, - } -} - -// Fetch implements the envoy_cache fetch function. -// Fetch is called on multiple streams, so responding to individual names with the same version works. -// If there is a Deadline set on the context, the call will block until either the context is terminated -// or there is a new update. -func (cache *snapshotCache) Fetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { - if _, hasDeadline := ctx.Deadline(); hasDeadline { - return cache.blockingFetch(ctx, request) - } - - nodeID := cache.hash.ID(request.Node) - - cache.mu.RLock() - defer cache.mu.RUnlock() - - if snapshot, exists := cache.snapshots[nodeID]; exists { - // Respond only if the request version is distinct from the current snapshot state. - // It might be beneficial to hold the request since Envoy will re-attempt the refresh. 
- version := snapshot.GetVersion(request.TypeUrl) - if request.VersionInfo == version { - if cache.log != nil { - cache.log.Warnf("skip fetch: version up to date") - } - return nil, &types.SkipFetchError{} - } - - resources := snapshot.GetResources(request.TypeUrl) - out := createResponse(request, resources, version) - return out, nil - } - - return nil, fmt.Errorf("missing snapshot for %q", nodeID) -} - -// blockingFetch will wait until either the context is terminated or new resources become available -func (cache *snapshotCache) blockingFetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { - responseChan := make(chan envoy_cache.Response, 1) - cancelFunc := cache.CreateWatch(request, stream.StreamState{}, responseChan) - if cancelFunc != nil { - defer cancelFunc() - } - - select { - case <-ctx.Done(): - // finished without an update - return nil, &types.SkipFetchError{} - case resp := <-responseChan: - return resp, nil - } -} - -// GetStatusInfo retrieves the status info for the node. -func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo { - cache.mu.RLock() - defer cache.mu.RUnlock() - - info, exists := cache.status[node] - if !exists { - if cache.log != nil { - cache.log.Warnf("node does not exist") - } - return nil - } - - return info -} - -// GetStatusKeys retrieves all node IDs in the status map. -func (cache *snapshotCache) GetStatusKeys() []string { - cache.mu.RLock() - defer cache.mu.RUnlock() - - out := make([]string, 0, len(cache.status)) - for id := range cache.status { - out = append(out, id) - } - - return out -} - -// NodeHash computes string identifiers for Envoy nodes. -type NodeHash interface { - // ID function defines a unique string identifier for the remote Envoy node. - ID(node *envoy_config_core_v3.Node) string -} - -// IDHash uses ID field as the node hash. 
-type IDHash struct{} - -// ID uses the node ID field -func (IDHash) ID(node *envoy_config_core_v3.Node) string { - if node == nil { - return "" - } - return node.Id -} - -var _ NodeHash = IDHash{} - -// StatusInfo tracks the server state for the remote Envoy node. -// Not all fields are used by all envoy_cache implementations. -type StatusInfo interface { - // GetNode returns the node metadata. - GetNode() *envoy_config_core_v3.Node - - // GetNumWatches returns the number of open watches. - GetNumWatches() int - - // GetLastWatchRequestTime returns the timestamp of the last discovery watch request. - GetLastWatchRequestTime() time.Time -} - -type statusInfo struct { - // node is the constant Envoy node metadata. - node *envoy_config_core_v3.Node - - // watches are indexed channels for the response watches and the original requests. - watches map[int64]ResponseWatch - - // the timestamp of the last watch request - lastWatchRequestTime time.Time - - // mutex to protect the status fields. - // should not acquire mutex of the parent envoy_cache after acquiring this mutex. - mu sync.RWMutex -} - -// ResponseWatch is a watch record keeping both the request and an open channel for the response. -type ResponseWatch struct { - // Request is the original request for the watch. - Request *envoy_cache.Request - - // Response is the channel to push responses to. - Response chan envoy_cache.Response -} - -// newStatusInfo initializes a status info data structure. 
-func newStatusInfo(node *envoy_config_core_v3.Node) *statusInfo { - out := statusInfo{ - node: node, - watches: make(map[int64]ResponseWatch), - } - return &out -} - -func (info *statusInfo) GetNode() *envoy_config_core_v3.Node { - info.mu.RLock() - defer info.mu.RUnlock() - return info.node -} - -func (info *statusInfo) GetNumWatches() int { - info.mu.RLock() - defer info.mu.RUnlock() - return len(info.watches) -} - -func (info *statusInfo) GetLastWatchRequestTime() time.Time { - info.mu.RLock() - defer info.mu.RUnlock() - return info.lastWatchRequestTime -} - -// SingleTypeSnapshotEqual checks value equality of 2 snapshots that contain a single type. -// This will panic if there is more than 1 type in the snapshot, it assumes the snapshots are equivalent -func SingleTypeSnapshotEqual(newSnap, oldSnap Snapshot) bool { - supportedTypes := newSnap.GetSupportedTypes() - if len(supportedTypes) != 1 { - panic(fmt.Sprintf("expected 1 supported type, got %v", supportedTypes)) - } - // For now there's a single resourceType so the diff is easy - newResources := newSnap.GetResources(supportedTypes[0]) - oldResources := oldSnap.GetResources(supportedTypes[0]) - if len(newResources) != len(oldResources) { - return false - } - for key, newValue := range newResources { - if oldValue, hasOldValue := oldResources[key]; !hasOldValue || !proto.Equal(newValue, oldValue) { - return false - } - } - return true -} diff --git a/pkg/util/xds/v3/fallback_node_hash.go b/pkg/util/xds/v3/fallback_node_hash.go new file mode 100644 index 000000000000..de01bffb539a --- /dev/null +++ b/pkg/util/xds/v3/fallback_node_hash.go @@ -0,0 +1,24 @@ +package v3 + +import ( + envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" +) + +// FallBackNodeHash is a hasher that will use as id either an item returned from a function or a fallback value +// this is useful for having a fallback option when a nodeId is unknown or historically 
wasn't recorded +type FallBackNodeHash struct { + GetIds func() []string + DefaultId string +} + +var _ cache.NodeHash = &FallBackNodeHash{} + +func (h *FallBackNodeHash) ID(node *envoy_core.Node) string { + for _, id := range h.GetIds() { + if id == node.Id { + return id + } + } + return h.DefaultId +} diff --git a/pkg/util/xds/v3/single_type_snapshot.go b/pkg/util/xds/v3/single_type_snapshot.go new file mode 100644 index 000000000000..313687d5bd02 --- /dev/null +++ b/pkg/util/xds/v3/single_type_snapshot.go @@ -0,0 +1,149 @@ +package v3 + +import ( + "fmt" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "google.golang.org/protobuf/proto" +) + +// Snapshot is an internally consistent snapshot of xDS resources. +// Consistency is important for the convergence as different resource types +// from the snapshot may be delivered to the proxy in arbitrary order. +type SingleTypeSnapshot struct { + Resources envoy_cache.Resources + TypeUrl string + + // VersionMap holds the current hash map of all resources in the snapshot. + // This field should remain nil until it is used, at which point should be + // instantiated by calling ConstructVersionMap(). + // VersionMap is only to be used with delta xDS. + VersionMap map[string]map[string]string +} + +var _ envoy_cache.ResourceSnapshot = &SingleTypeSnapshot{} + +// NewSingleTypeSnapshot creates a snapshot from response types and a version. +// The resources map is keyed off the type URL of a resource, followed by the slice of resource objects. +func NewSingleTypeSnapshot(version string, typeURL string, resources []types.Resource) *SingleTypeSnapshot { + return &SingleTypeSnapshot{ + TypeUrl: typeURL, + Resources: envoy_cache.NewResources(version, resources), + } +} + +// NewSingleTypeSnapshotWithNamedResources creates a snapshot from response types and a version. 
+// The resources map is keyed by resource name; each named resource becomes one snapshot entry. +func NewSingleTypeSnapshotWithNamedResources(version string, typeURL string, resources map[string]types.Resource) *SingleTypeSnapshot { + resourcesByName := make(map[string]types.ResourceWithTTL, len(resources)) + for n, item := range resources { + resourcesByName[n] = types.ResourceWithTTL{Resource: item} + } + return &SingleTypeSnapshot{ + TypeUrl: typeURL, + Resources: envoy_cache.Resources{ + Version: version, + Items: resourcesByName, + }, + } +} + +// GetResources selects snapshot resources by type, returning the map of resources. +func (s *SingleTypeSnapshot) GetResources(typeURL resource.Type) map[string]types.Resource { + resources := s.GetResourcesAndTTL(typeURL) + if resources == nil { + return nil + } + + withoutTTL := make(map[string]types.Resource, len(resources)) + + for k, v := range resources { + withoutTTL[k] = v.Resource + } + + return withoutTTL +} + +// GetResourcesAndTTL selects snapshot resources by type, returning the map of resources and the associated TTL. +func (s *SingleTypeSnapshot) GetResourcesAndTTL(typeURL resource.Type) map[string]types.ResourceWithTTL { + if s == nil { + return nil + } + if typeURL != s.TypeUrl { + return nil + } + return s.Resources.Items +} + +// GetVersion returns the version for a resource type. +func (s *SingleTypeSnapshot) GetVersion(typeURL resource.Type) string { + if s == nil { + return "" + } + if typeURL != s.TypeUrl { + return "" + } + return s.Resources.Version +} + +// GetVersionMap will return the internal version map of the currently applied snapshot. 
+func (s *SingleTypeSnapshot) GetVersionMap(typeURL string) map[string]string { + return s.VersionMap[typeURL] +} + +// ConstructVersionMap will construct a version map based on the current state of a snapshot +func (s *SingleTypeSnapshot) ConstructVersionMap() error { + if s == nil { + return fmt.Errorf("missing snapshot") + } + + // The snapshot resources never change, so no need to ever rebuild. + if s.VersionMap != nil { + return nil + } + + s.VersionMap = make(map[string]map[string]string) + + s.VersionMap[s.TypeUrl] = make(map[string]string, len(s.Resources.Items)) + for _, r := range s.Resources.Items { + // Hash our version in here and build the version map. + marshaledResource, err := envoy_cache.MarshalResource(r.Resource) + if err != nil { + return err + } + v := envoy_cache.HashResource(marshaledResource) + if v == "" { + return fmt.Errorf("failed to build resource version: %w", err) + } + + s.VersionMap[s.TypeUrl][envoy_cache.GetResourceName(r.Resource)] = v + } + + return nil +} + +// SingleTypeSnapshotEqual checks value equality of 2 snapshots that contain a single type. +// This will panic if there is more than 1 type in the snapshot, it assumes the snapshots are equivalent +func SingleTypeSnapshotEqual(newSnap, oldSnap envoy_cache.ResourceSnapshot) bool { + var typeURL string + if stsnap, ok := newSnap.(*SingleTypeSnapshot); ok { + typeURL = stsnap.TypeUrl + } + if typeURL == "" { + panic("couldn't extract type from snapshot is this not a SingleTypeSnapshot?") + } + // For now there's a single resourceType so the diff is easy + newResources := newSnap.GetResources(typeURL) + oldResources := oldSnap.GetResources(typeURL) + if len(newResources) != len(oldResources) { + return false + } + for key, newValue := range newResources { + if oldValue, hasOldValue := oldResources[key]; !hasOldValue || !proto.Equal(newValue, oldValue) { + return false + } + } + return true +}