diff --git a/.changelog/4255.txt b/.changelog/4255.txt new file mode 100644 index 0000000000..1960697afa --- /dev/null +++ b/.changelog/4255.txt @@ -0,0 +1,3 @@ +```release-note:bug +sync-catalog: Enable the user to purge the registered services by passing parent node and necessary filters. +``` \ No newline at end of file diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index 004f1bea17..c3965165ed 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -12,16 +12,20 @@ import ( "os" "os/signal" "regexp" + "strings" "sync" "syscall" "time" "github.com/armon/go-metrics/prometheus" + "github.com/cenkalti/backoff" mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-server-connection-manager/discovery" + "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -42,27 +46,29 @@ import ( type Command struct { UI cli.Ui - flags *flag.FlagSet - consul *flags.ConsulFlags - k8s *flags.K8SFlags - flagListen string - flagToConsul bool - flagToK8S bool - flagConsulDomain string - flagConsulK8STag string - flagConsulNodeName string - flagK8SDefault bool - flagK8SServicePrefix string - flagConsulServicePrefix string - flagK8SSourceNamespace string - flagK8SWriteNamespace string - flagConsulWritePeriod time.Duration - flagSyncClusterIPServices bool - flagSyncLBEndpoints bool - flagNodePortSyncType string - flagAddK8SNamespaceSuffix bool - flagLogLevel string - flagLogJSON bool + flags *flag.FlagSet + consul *flags.ConsulFlags + k8s *flags.K8SFlags + flagListen string + flagToConsul bool + flagToK8S bool + flagConsulDomain string + flagConsulK8STag string + flagConsulNodeName string + flagK8SDefault bool + flagK8SServicePrefix string + flagConsulServicePrefix string + flagK8SSourceNamespace string + flagK8SWriteNamespace string + flagConsulWritePeriod time.Duration + flagSyncClusterIPServices bool + flagSyncLBEndpoints bool + flagNodePortSyncType string + flagAddK8SNamespaceSuffix bool + flagLogLevel string + flagLogJSON bool + flagPurgeK8SServicesFromNode string + flagFilter string // Flags to support namespaces flagEnableNamespaces bool // Use namespacing on all components @@ -150,6 +156,11 @@ func (c *Command) init() { "\"debug\", \"info\", \"warn\", and \"error\".") c.flags.BoolVar(&c.flagLogJSON, "log-json", false, "Enable or disable JSON output format for logging.") + c.flags.StringVar(&c.flagPurgeK8SServicesFromNode, "purge-k8s-services-from-node", "", + "Specifies the name of the Consul node for which to deregister synced Kubernetes services.") + c.flags.StringVar(&c.flagFilter, "filter", "", + "Specifies the expression used to filter the services on the Consul node that will be deregistered. "+ + "The syntax for this filter is the same as the syntax used in the List Services for Node API in the Consul catalog.") c.flags.Var((*flags.AppendSliceValue)(&c.flagAllowK8sNamespacesList), "allow-k8s-namespace", "K8s namespaces to explicitly allow. May be specified multiple times.") @@ -268,6 +279,19 @@ func (c *Command) Run(args []string) int { } c.ready = true + if c.flagPurgeK8SServicesFromNode != "" { + consulClient, err := consul.NewClientFromConnMgr(consulConfig, c.connMgr) + if err != nil { + c.UI.Error(fmt.Sprintf("unable to instantiate consul client: %s", err)) + return 1 + } + if err := c.removeAllK8SServicesFromConsulNode(consulClient); err != nil { + c.UI.Error(fmt.Sprintf("unable to remove all K8S services: %s", err)) + return 1 + } + return 0 + } + // Convert allow/deny lists to sets allowSet := flags.ToSet(c.flagAllowK8sNamespacesList) denySet := flags.ToSet(c.flagDenyK8sNamespacesList) @@ -436,6 +460,70 @@ func (c *Command) Run(args []string) int { } } +// remove all k8s services from Consul. +func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) error { + node, _, err := consulClient.Catalog().NodeServiceList(c.flagPurgeK8SServicesFromNode, &api.QueryOptions{Filter: c.flagFilter}) + if err != nil { + return err + } + + var firstErr error + services := node.Services + batchSize := 300 + maxRetries := 2 + retryDelay := 200 * time.Millisecond + + // Ask for user confirmation before purging services + for { + c.UI.Info(fmt.Sprintf("Are you sure you want to delete %v K8S services from %v? (y/n): ", len(services), c.flagPurgeK8SServicesFromNode)) + var input string + fmt.Scanln(&input) + if input = strings.ToLower(input); input == "y" { + break + } else if input == "n" { + return nil + } else { + c.UI.Info("Invalid input. Please enter 'y' or 'n'.") + } + } + + for i := 0; i < len(services); i += batchSize { + end := i + batchSize + if end > len(services) { + end = len(services) + } + + var eg errgroup.Group + for _, service := range services[i:end] { + s := service + eg.Go(func() error { + var b backoff.BackOff = backoff.NewConstantBackOff(retryDelay) + b = backoff.WithMaxRetries(b, uint64(maxRetries)) + return backoff.Retry(func() error { + _, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{ + Node: c.flagPurgeK8SServicesFromNode, + ServiceID: s.ID, + }, nil) + return err + }, b) + }) + } + if err := eg.Wait(); err != nil { + if firstErr == nil { + c.UI.Info("Some K8S services were not deregistered from Consul") + firstErr = err + } + } + c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", end-i, c.flagPurgeK8SServicesFromNode)) + } + + if firstErr != nil { + return firstErr + } + c.UI.Info("All K8S services were deregistered from Consul") + return nil +} + func (c *Command) handleReady(rw http.ResponseWriter, _ *http.Request) { if !c.ready { c.UI.Error("[GET /health/ready] sync catalog controller is not yet ready") diff --git a/control-plane/subcommand/sync-catalog/command_test.go b/control-plane/subcommand/sync-catalog/command_test.go index ca2aca4e37..b6aa63a903 100644 --- a/control-plane/subcommand/sync-catalog/command_test.go +++ b/control-plane/subcommand/sync-catalog/command_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" @@ -557,6 +558,164 @@ func TestRun_ToConsulChangingFlags(t *testing.T) { } } +// Test services could be de-registered from Consul. +func TestRemoveAllK8SServicesFromConsul(t *testing.T) { + t.Parallel() + + k8s, testClient := completeSetup(t) + consulClient := testClient.APIClient + + // Create a mock reader to simulate user input + input := "y\n" + reader, writer, err := os.Pipe() + require.NoError(t, err) + oldStdin := os.Stdin + os.Stdin = reader + defer func() { os.Stdin = oldStdin }() + + // Write the simulated user input to the mock reader + go func() { + defer writer.Close() + _, err := writer.WriteString(input) + require.NoError(t, err) + }() + + // Run the command. + ui := cli.NewMockUi() + cmd := Command{ + UI: ui, + clientset: k8s, + logger: hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }), + flagAllowK8sNamespacesList: []string{"*"}, + connMgr: testClient.Watcher, + } + + // create two services in k8s + _, err = k8s.CoreV1().Services("bar").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) + require.NoError(t, err) + + _, err = k8s.CoreV1().Services("baz").Create(context.Background(), lbService("foo", "2.2.2.2"), metav1.CreateOptions{}) + require.NoError(t, err) + + longRunningChan := runCommandAsynchronously(&cmd, []string{ + "-addresses", "127.0.0.1", + "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), + "-consul-write-interval", "100ms", + "-add-k8s-namespace-suffix", + }) + defer stopCommand(t, &cmd, longRunningChan) + + // check that the two K8s services have been synced into Consul + retry.Run(t, func(r *retry.R) { + svc, _, err := consulClient.Catalog().Service("foo-bar", "k8s", nil) + require.NoError(r, err) + require.Len(r, svc, 1) + require.Equal(r, "1.1.1.1", svc[0].ServiceAddress) + svc, _, err = consulClient.Catalog().Service("foo-baz", "k8s", nil) + require.NoError(r, err) + require.Len(r, svc, 1) + require.Equal(r, "2.2.2.2", svc[0].ServiceAddress) + }) + + exitChan := runCommandAsynchronously(&cmd, []string{ + "-addresses", "127.0.0.1", + "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), + "-purge-k8s-services-from-node=k8s-sync", + }) + stopCommand(t, &cmd, exitChan) + + retry.Run(t, func(r *retry.R) { + serviceList, _, err := consulClient.Catalog().NodeServiceList("k8s-sync", &api.QueryOptions{AllowStale: false}) + require.NoError(r, err) + require.Len(r, serviceList.Services, 0) + }) +} + +// Test services could be de-registered from Consul with filter. +func TestRemoveAllK8SServicesFromConsulWithFilter(t *testing.T) { + t.Parallel() + + k8s, testClient := completeSetup(t) + consulClient := testClient.APIClient + + // Create a mock reader to simulate user input + input := "y\n" + reader, writer, err := os.Pipe() + require.NoError(t, err) + oldStdin := os.Stdin + os.Stdin = reader + defer func() { os.Stdin = oldStdin }() + + // Write the simulated user input to the mock reader + go func() { + defer writer.Close() + _, err := writer.WriteString(input) + require.NoError(t, err) + }() + + // Run the command. + ui := cli.NewMockUi() + cmd := Command{ + UI: ui, + clientset: k8s, + logger: hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }), + flagAllowK8sNamespacesList: []string{"*"}, + connMgr: testClient.Watcher, + } + + // create two services in k8s + _, err = k8s.CoreV1().Services("bar").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) + require.NoError(t, err) + _, err = k8s.CoreV1().Services("baz").Create(context.Background(), lbService("foo", "2.2.2.2"), metav1.CreateOptions{}) + require.NoError(t, err) + _, err = k8s.CoreV1().Services("bat").Create(context.Background(), lbService("foo", "3.3.3.3"), metav1.CreateOptions{}) + require.NoError(t, err) + + longRunningChan := runCommandAsynchronously(&cmd, []string{ + "-addresses", "127.0.0.1", + "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), + "-consul-write-interval", "100ms", + "-add-k8s-namespace-suffix", + }) + defer stopCommand(t, &cmd, longRunningChan) + + // check that the name of the service is namespaced + retry.Run(t, func(r *retry.R) { + svc, _, err := consulClient.Catalog().Service("foo-bar", "k8s", nil) + require.NoError(r, err) + require.Len(r, svc, 1) + require.Equal(r, "1.1.1.1", svc[0].ServiceAddress) + svc, _, err = consulClient.Catalog().Service("foo-baz", "k8s", nil) + require.NoError(r, err) + require.Len(r, svc, 1) + require.Equal(r, "2.2.2.2", svc[0].ServiceAddress) + svc, _, err = consulClient.Catalog().Service("foo-bat", "k8s", nil) + require.NoError(r, err) + require.Len(r, svc, 1) + require.Equal(r, "3.3.3.3", svc[0].ServiceAddress) + }) + + exitChan := runCommandAsynchronously(&cmd, []string{ + "-addresses", "127.0.0.1", + "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), + "-purge-k8s-services-from-node=k8s-sync", + "-filter=baz in ID", + }) + stopCommand(t, &cmd, exitChan) + + retry.Run(t, func(r *retry.R) { + serviceList, _, err := consulClient.Catalog().NodeServiceList("k8s-sync", &api.QueryOptions{AllowStale: false}) + require.NoError(r, err) + require.Len(r, serviceList.Services, 2) + }) +} + // Set up test consul agent and fake kubernetes cluster client. func completeSetup(t *testing.T) (*fake.Clientset, *test.TestServerClient) { k8s := fake.NewSimpleClientset()