diff --git a/.changelog/3137.txt b/.changelog/3137.txt new file mode 100644 index 0000000000..8fd8e0876b --- /dev/null +++ b/.changelog/3137.txt @@ -0,0 +1,3 @@ +```release-note:bug +control-plane: fixes an issue with the server-acl-init job where the job would fail on upgrades due to consul server ip address changes. +``` diff --git a/control-plane/consul/dynamic.go b/control-plane/consul/dynamic.go new file mode 100644 index 0000000000..36201701dd --- /dev/null +++ b/control-plane/consul/dynamic.go @@ -0,0 +1,65 @@ +package consul + +import ( + "time" + + capi "github.com/hashicorp/consul/api" +) + +type DynamicClient struct { + ConsulClient *capi.Client + Config *Config + watcher ServerConnectionManager +} + +func NewDynamicClientFromConnMgr(config *Config, watcher ServerConnectionManager) (*DynamicClient, error) { + client, err := NewClientFromConnMgr(config, watcher) + if err != nil { + return nil, err + } + return &DynamicClient{ + ConsulClient: client, + Config: config, + watcher: watcher, + }, nil +} + +func (d *DynamicClient) RefreshClient() error { + var err error + var client *capi.Client + // If the watcher is not set then we did not create the client using NewDynamicClientFromConnMgr and are using it in + // testing + // TODO: Use watcher in testing ;) + if d.watcher == nil { + return nil + } + client, err = NewClientFromConnMgr(d.Config, d.watcher) + if err != nil { + return err + } + d.ConsulClient = client + return nil +} + +func NewDynamicClientWithTimeout(config *capi.Config, consulAPITimeout time.Duration) (*DynamicClient, error) { + client, err := NewClient(config, consulAPITimeout) + if err != nil { + return nil, err + } + return &DynamicClient{ + ConsulClient: client, + Config: &Config{ + APIClientConfig: config, + }, + }, nil +} + +func NewDynamicClient(config *capi.Config) (*DynamicClient, error) { + // defaultTimeout is taken from flags.go.. 
+ defaultTimeout := 5 * time.Second + client, err := NewDynamicClientWithTimeout(config, defaultTimeout) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/control-plane/consul/dynamic_test.go b/control-plane/consul/dynamic_test.go new file mode 100644 index 0000000000..8159b82d66 --- /dev/null +++ b/control-plane/consul/dynamic_test.go @@ -0,0 +1,73 @@ +package consul + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/hashicorp/consul-server-connection-manager/discovery" + capi "github.com/hashicorp/consul/api" + "github.com/stretchr/testify/require" +) + +func TestRefreshDynamicClient(t *testing.T) { + // Create a server + consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "\"leader\"") + })) + defer consulServer.Close() + + serverURL, err := url.Parse(consulServer.URL) + require.NoError(t, err) + serverIP := net.ParseIP(serverURL.Hostname()) + + port, err := strconv.Atoi(serverURL.Port()) + require.NoError(t, err) + + connMgr := &MockServerConnectionManager{} + + // Use a bad IP so that the client call fails + badState := discovery.State{ + Address: discovery.Addr{ + TCPAddr: net.TCPAddr{ + IP: net.ParseIP("126.0.0.1"), + Port: port, + }, + }, + } + + goodState := discovery.State{ + Address: discovery.Addr{ + TCPAddr: net.TCPAddr{ + IP: serverIP, + Port: port, + }, + }, + } + + // testify/mock has a weird behaviour when returning function calls. You cannot update On("State") to return + // something different but instead need to load up the returns. 
Here we are simulating a bad consul server manager + // state and then a good one + connMgr.On("State").Return(badState, nil).Once() + connMgr.On("State").Return(goodState, nil).Once() + + cfg := capi.DefaultConfig() + client, err := NewDynamicClientFromConnMgr(&Config{APIClientConfig: cfg, HTTPPort: port, GRPCPort: port}, connMgr) + require.NoError(t, err) + + // Make a request to the bad ip of the server + _, err = client.ConsulClient.Status().Leader() + require.Error(t, err) + require.Contains(t, err.Error(), "connection refused") + + // Refresh the client and make a call to the server now that consul-server-connection-manager state is good + err = client.RefreshClient() + require.NoError(t, err) + leader, err := client.ConsulClient.Status().Leader() + require.NoError(t, err) + require.Equal(t, "leader", leader) +} diff --git a/control-plane/subcommand/server-acl-init/anonymous_token.go b/control-plane/subcommand/server-acl-init/anonymous_token.go index 2f7ad6f513..b050558968 100644 --- a/control-plane/subcommand/server-acl-init/anonymous_token.go +++ b/control-plane/subcommand/server-acl-init/anonymous_token.go @@ -4,6 +4,7 @@ package serveraclinit import ( + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul/api" ) @@ -14,8 +15,8 @@ const ( // configureAnonymousPolicy sets up policies and tokens so that Consul DNS and // cross-datacenter Consul connect calls will work. 
-func (c *Command) configureAnonymousPolicy(consulClient *api.Client) error { - exists, err := checkIfAnonymousTokenPolicyExists(consulClient) +func (c *Command) configureAnonymousPolicy(client *consul.DynamicClient) error { + exists, err := checkIfAnonymousTokenPolicyExists(client) if err != nil { c.log.Error("Error checking if anonymous token policy exists", "err", err) return err @@ -40,7 +41,7 @@ func (c *Command) configureAnonymousPolicy(consulClient *api.Client) error { err = c.untilSucceeds("creating anonymous token policy - PUT /v1/acl/policy", func() error { - return c.createOrUpdateACLPolicy(anonPolicy, consulClient) + return c.createOrUpdateACLPolicy(anonPolicy, client) }) if err != nil { return err @@ -55,13 +56,17 @@ func (c *Command) configureAnonymousPolicy(consulClient *api.Client) error { // Update anonymous token to include this policy return c.untilSucceeds("updating anonymous token with policy", func() error { - _, _, err := consulClient.ACL().TokenUpdate(&aToken, &api.WriteOptions{}) + err := client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + _, _, err = client.ConsulClient.ACL().TokenUpdate(&aToken, &api.WriteOptions{}) return err }) } -func checkIfAnonymousTokenPolicyExists(consulClient *api.Client) (bool, error) { - token, _, err := consulClient.ACL().TokenRead(anonymousTokenAccessorID, nil) +func checkIfAnonymousTokenPolicyExists(client *consul.DynamicClient) (bool, error) { + token, _, err := client.ConsulClient.ACL().TokenRead(anonymousTokenAccessorID, nil) if err != nil { return false, err } diff --git a/control-plane/subcommand/server-acl-init/anonymous_token_test.go b/control-plane/subcommand/server-acl-init/anonymous_token_test.go index 8ed58c045f..41689a88c7 100644 --- a/control-plane/subcommand/server-acl-init/anonymous_token_test.go +++ b/control-plane/subcommand/server-acl-init/anonymous_token_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + 
"github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul/api" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" @@ -39,16 +40,16 @@ func Test_configureAnonymousPolicy(t *testing.T) { require.Equal(t, 0, responseCode, ui.ErrorWriter.String()) bootToken := getBootToken(t, k8s, resourcePrefix, ns) - consul, err := api.NewClient(&api.Config{ + client, err := consul.NewDynamicClient(&api.Config{ Address: consulHTTPAddr, Token: bootToken, }) require.NoError(t, err) - err = cmd.configureAnonymousPolicy(consul) + err = cmd.configureAnonymousPolicy(client) require.NoError(t, err) - policy, _, err := consul.ACL().PolicyReadByName(anonymousTokenPolicyName, nil) + policy, _, err := client.ConsulClient.ACL().PolicyReadByName(anonymousTokenPolicyName, nil) require.NoError(t, err) testPolicy := api.ACLPolicy{ @@ -57,13 +58,13 @@ func Test_configureAnonymousPolicy(t *testing.T) { Description: "Anonymous token Policy", Rules: `acl = "read"`, } - readOnlyPolicy, _, err := consul.ACL().PolicyUpdate(&testPolicy, &api.WriteOptions{}) + readOnlyPolicy, _, err := client.ConsulClient.ACL().PolicyUpdate(&testPolicy, &api.WriteOptions{}) require.NoError(t, err) - err = cmd.configureAnonymousPolicy(consul) + err = cmd.configureAnonymousPolicy(client) require.NoError(t, err) - actualPolicy, _, err := consul.ACL().PolicyReadByName(anonymousTokenPolicyName, nil) + actualPolicy, _, err := client.ConsulClient.ACL().PolicyReadByName(anonymousTokenPolicyName, nil) require.NoError(t, err) // assert policy is still same. 
diff --git a/control-plane/subcommand/server-acl-init/command.go b/control-plane/subcommand/server-acl-init/command.go index e1bd3c2356..9ceb7f8a11 100644 --- a/control-plane/subcommand/server-acl-init/command.go +++ b/control-plane/subcommand/server-acl-init/command.go @@ -115,6 +115,9 @@ type Command struct { ctx context.Context retryDuration time.Duration + // the amount of time to contact the Consul API before timing out + apiTimeoutDuration time.Duration + // log log hclog.Logger @@ -233,6 +236,12 @@ func (c *Command) init() { if c.retryDuration == 0 { c.retryDuration = 1 * time.Second } + + // Most of the API calls are in an infinite loop until the command cancels. This timeout + // allows us to refresh the server IPs so that calls will succeed. + if c.apiTimeoutDuration == 0 { + c.apiTimeoutDuration = 2 * time.Minute + } } func (c *Command) Synopsis() string { return synopsis } @@ -302,18 +311,6 @@ func (c *Command) Run(args []string) int { } } - var ipAddrs []net.IPAddr - if err := backoff.Retry(func() error { - ipAddrs, err = netaddrs.IPAddrs(c.ctx, c.consulFlags.Addresses, c.log) - if err != nil { - c.log.Error("Error resolving IP Address", "err", err) - return err - } - return nil - }, exponentialBackoffWithMaxInterval()); err != nil { - c.UI.Error(err.Error()) - } - if err := c.configureSecretsBackend(); err != nil { c.log.Error(err.Error()) return 1 @@ -328,8 +325,22 @@ func (c *Command) Run(args []string) int { c.log.Info("ACL replication is enabled so skipping Consul server ACL bootstrapping") bootstrapToken = aclReplicationToken } else { - bootstrapToken, err = c.bootstrapServers(ipAddrs, c.backend) - if err != nil { + // During upgrades, there is a rare case where a consul-server statefulset may be rotated out while the + // bootstrap tokens are being updated. Catch this case, refresh the server ip addresses and try again. 
+ if err := backoff.Retry(func() error { + ipAddrs, err := c.serverIPAddresses() + if err != nil { + c.log.Error(err.Error()) + return err + } + + bootstrapToken, err = c.bootstrapServers(ipAddrs, c.backend) + if err != nil { + c.log.Error(err.Error()) + return err + } + return nil + }, exponentialBackoffWithMaxInterval()); err != nil { c.log.Error(err.Error()) return 1 } @@ -363,12 +374,12 @@ func (c *Command) Run(args []string) int { return 1 } - consulClient, err := consul.NewClientFromConnMgrState(c.consulFlags.ConsulClientConfig(), c.state) + dynamicClient, err := consul.NewDynamicClientFromConnMgr(c.consulFlags.ConsulClientConfig(), watcher) if err != nil { c.log.Error(fmt.Sprintf("Error creating Consul client for addr %q: %s", c.state.Address, err)) return 1 } - consulDC, primaryDC, err := c.consulDatacenterList(consulClient) + consulDC, primaryDC, err := c.consulDatacenterList(dynamicClient) if err != nil { c.log.Error("Error getting datacenter name", "err", err) return 1 @@ -379,9 +390,9 @@ func (c *Command) Run(args []string) int { if c.consulFlags.Partition == consulDefaultPartition && primary { // Partition token is local because only the Primary datacenter can have Admin Partitions. 
if c.flagPartitionTokenFile != "" { - err = c.createACLWithSecretID("partitions", partitionRules, consulDC, primary, consulClient, partitionToken, true) + err = c.createACLWithSecretID("partitions", partitionRules, consulDC, primary, dynamicClient, partitionToken, true) } else { - err = c.createLocalACL("partitions", partitionRules, consulDC, primary, consulClient) + err = c.createLocalACL("partitions", partitionRules, consulDC, primary, dynamicClient) } if err != nil { c.log.Error(err.Error()) @@ -408,7 +419,7 @@ func (c *Command) Run(args []string) int { } err = c.untilSucceeds(fmt.Sprintf("creating %s policy", policyTmpl.Name), func() error { - return c.createOrUpdateACLPolicy(policyTmpl, consulClient) + return c.createOrUpdateACLPolicy(policyTmpl, dynamicClient) }) if err != nil { c.log.Error("Error creating or updating the cross namespace policy", "err", err) @@ -425,7 +436,7 @@ func (c *Command) Run(args []string) int { Name: consulDefaultNamespace, ACLs: &aclConfig, } - _, _, err = consulClient.Namespaces().Update(&consulNamespace, &api.WriteOptions{}) + _, _, err = dynamicClient.ConsulClient.Namespaces().Update(&consulNamespace, &api.WriteOptions{}) if err != nil { if strings.Contains(strings.ToLower(err.Error()), "unexpected response code: 404") { // If this returns a 404 it's most likely because they're not running @@ -441,7 +452,7 @@ func (c *Command) Run(args []string) int { // Create the component auth method, this is the auth method that Consul components will use // to issue an `ACL().Login()` against at startup, for local tokens. 
localComponentAuthMethodName := c.withPrefix("k8s-component-auth-method") - err = c.configureLocalComponentAuthMethod(consulClient, localComponentAuthMethodName) + err = c.configureLocalComponentAuthMethod(dynamicClient, localComponentAuthMethodName) if err != nil { c.log.Error(err.Error()) return 1 @@ -449,7 +460,7 @@ func (c *Command) Run(args []string) int { globalComponentAuthMethodName := fmt.Sprintf("%s-%s", localComponentAuthMethodName, consulDC) if !primary && c.flagAuthMethodHost != "" { - err = c.configureGlobalComponentAuthMethod(consulClient, globalComponentAuthMethodName, primaryDC) + err = c.configureGlobalComponentAuthMethod(dynamicClient, globalComponentAuthMethodName, primaryDC) if err != nil { c.log.Error(err.Error()) return 1 @@ -464,7 +475,7 @@ func (c *Command) Run(args []string) int { } serviceAccountName := c.withPrefix("client") - err = c.createACLPolicyRoleAndBindingRule("client", agentRules, consulDC, primaryDC, false, primary, localComponentAuthMethodName, serviceAccountName, consulClient) + err = c.createACLPolicyRoleAndBindingRule("client", agentRules, consulDC, primaryDC, false, primary, localComponentAuthMethodName, serviceAccountName, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -480,7 +491,7 @@ func (c *Command) Run(args []string) int { if c.consulFlags.Partition != "" { anonTokenConfig.APIClientConfig.Partition = consulDefaultPartition } - anonTokenClient, err := consul.NewClientFromConnMgrState(anonTokenConfig, c.state) + anonTokenClient, err := consul.NewDynamicClientFromConnMgr(anonTokenConfig, watcher) if err != nil { c.log.Error(err.Error()) return 1 @@ -511,9 +522,9 @@ func (c *Command) Run(args []string) int { if !primary { componentAuthMethodName = globalComponentAuthMethodName } - err = c.createACLPolicyRoleAndBindingRule("sync-catalog", syncRules, consulDC, primaryDC, globalPolicy, primary, componentAuthMethodName, serviceAccountName, consulClient) + err = 
c.createACLPolicyRoleAndBindingRule("sync-catalog", syncRules, consulDC, primaryDC, globalPolicy, primary, componentAuthMethodName, serviceAccountName, dynamicClient) } else { - err = c.createACLPolicyRoleAndBindingRule("sync-catalog", syncRules, consulDC, primaryDC, localPolicy, primary, componentAuthMethodName, serviceAccountName, consulClient) + err = c.createACLPolicyRoleAndBindingRule("sync-catalog", syncRules, consulDC, primaryDC, localPolicy, primary, componentAuthMethodName, serviceAccountName, dynamicClient) } if err != nil { c.log.Error(err.Error()) @@ -523,7 +534,7 @@ func (c *Command) Run(args []string) int { if c.flagConnectInject { connectAuthMethodName := c.withPrefix("k8s-auth-method") - err := c.configureConnectInjectAuthMethod(consulClient, connectAuthMethodName) + err := c.configureConnectInjectAuthMethod(dynamicClient, connectAuthMethodName) if err != nil { c.log.Error(err.Error()) return 1 @@ -545,7 +556,7 @@ func (c *Command) Run(args []string) int { if !primary { componentAuthMethodName = globalComponentAuthMethodName } - err = c.createACLPolicyRoleAndBindingRule("connect-inject", injectRules, consulDC, primaryDC, globalPolicy, primary, componentAuthMethodName, serviceAccountName, consulClient) + err = c.createACLPolicyRoleAndBindingRule("connect-inject", injectRules, consulDC, primaryDC, globalPolicy, primary, componentAuthMethodName, serviceAccountName, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -555,9 +566,9 @@ func (c *Command) Run(args []string) int { if c.flagCreateEntLicenseToken { var err error if c.consulFlags.Partition != "" { - err = c.createLocalACL("enterprise-license", entPartitionLicenseRules, consulDC, primary, consulClient) + err = c.createLocalACL("enterprise-license", entPartitionLicenseRules, consulDC, primary, dynamicClient) } else { - err = c.createLocalACL("enterprise-license", entLicenseRules, consulDC, primary, consulClient) + err = c.createLocalACL("enterprise-license", entLicenseRules, 
consulDC, primary, dynamicClient) } if err != nil { c.log.Error(err.Error()) @@ -567,7 +578,7 @@ func (c *Command) Run(args []string) int { if c.flagSnapshotAgent { serviceAccountName := c.withPrefix("server") - if err := c.createACLPolicyRoleAndBindingRule("snapshot-agent", snapshotAgentRules, consulDC, primaryDC, localPolicy, primary, localComponentAuthMethodName, serviceAccountName, consulClient); err != nil { + if err := c.createACLPolicyRoleAndBindingRule("snapshot-agent", snapshotAgentRules, consulDC, primaryDC, localPolicy, primary, localComponentAuthMethodName, serviceAccountName, dynamicClient); err != nil { c.log.Error(err.Error()) return 1 } @@ -588,7 +599,7 @@ func (c *Command) Run(args []string) int { if !primary { authMethodName = globalComponentAuthMethodName } - err = c.createACLPolicyRoleAndBindingRule("api-gateway-controller", rules, consulDC, primaryDC, globalPolicy, primary, authMethodName, serviceAccountName, consulClient) + err = c.createACLPolicyRoleAndBindingRule("api-gateway-controller", rules, consulDC, primaryDC, globalPolicy, primary, authMethodName, serviceAccountName, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -609,7 +620,7 @@ func (c *Command) Run(args []string) int { if !primary { authMethodName = globalComponentAuthMethodName } - err = c.createACLPolicyRoleAndBindingRule("mesh-gateway", rules, consulDC, primaryDC, globalPolicy, primary, authMethodName, serviceAccountName, consulClient) + err = c.createACLPolicyRoleAndBindingRule("mesh-gateway", rules, consulDC, primaryDC, globalPolicy, primary, authMethodName, serviceAccountName, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -626,7 +637,7 @@ func (c *Command) Run(args []string) int { PrimaryDC: primaryDC, Primary: primary, } - err := c.configureGateway(params, consulClient) + err := c.configureGateway(params, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -643,7 +654,7 @@ func (c *Command) Run(args []string) int { 
PrimaryDC: primaryDC, Primary: primary, } - err := c.configureGateway(params, consulClient) + err := c.configureGateway(params, dynamicClient) if err != nil { c.log.Error(err.Error()) return 1 @@ -659,9 +670,9 @@ func (c *Command) Run(args []string) int { // Policy must be global because it replicates from the primary DC // and so the primary DC needs to be able to accept the token. if aclReplicationToken != "" { - err = c.createACLWithSecretID(common.ACLReplicationTokenName, rules, consulDC, primary, consulClient, aclReplicationToken, false) + err = c.createACLWithSecretID(common.ACLReplicationTokenName, rules, consulDC, primary, dynamicClient, aclReplicationToken, false) } else { - err = c.createGlobalACL(common.ACLReplicationTokenName, rules, consulDC, primary, consulClient) + err = c.createGlobalACL(common.ACLReplicationTokenName, rules, consulDC, primary, dynamicClient) } if err != nil { c.log.Error(err.Error()) @@ -685,7 +696,7 @@ func exponentialBackoffWithMaxInterval() *backoff.ExponentialBackOff { // configureGlobalComponentAuthMethod sets up an AuthMethod in the primary datacenter, // that the Consul components will use to issue global ACL tokens with. -func (c *Command) configureGlobalComponentAuthMethod(consulClient *api.Client, authMethodName, primaryDC string) error { +func (c *Command) configureGlobalComponentAuthMethod(client *consul.DynamicClient, authMethodName, primaryDC string) error { // Create the auth method template. This requires calls to the kubernetes environment. 
authMethod, err := c.createAuthMethodTmpl(authMethodName, false) if err != nil { @@ -693,29 +704,33 @@ func (c *Command) configureGlobalComponentAuthMethod(consulClient *api.Client, a } authMethod.TokenLocality = "global" writeOptions := &api.WriteOptions{Datacenter: primaryDC} - return c.createAuthMethod(consulClient, &authMethod, writeOptions) + return c.createAuthMethod(client, &authMethod, writeOptions) } // configureLocalComponentAuthMethod sets up an AuthMethod in the same datacenter, // that the Consul components will use to issue local ACL tokens with. -func (c *Command) configureLocalComponentAuthMethod(consulClient *api.Client, authMethodName string) error { +func (c *Command) configureLocalComponentAuthMethod(client *consul.DynamicClient, authMethodName string) error { // Create the auth method template. This requires calls to the kubernetes environment. authMethod, err := c.createAuthMethodTmpl(authMethodName, false) if err != nil { return err } - return c.createAuthMethod(consulClient, &authMethod, &api.WriteOptions{}) + return c.createAuthMethod(client, &authMethod, &api.WriteOptions{}) } // createAuthMethod creates the desired Authmethod. -func (c *Command) createAuthMethod(consulClient *api.Client, authMethod *api.ACLAuthMethod, writeOptions *api.WriteOptions) error { +func (c *Command) createAuthMethod(client *consul.DynamicClient, authMethod *api.ACLAuthMethod, writeOptions *api.WriteOptions) error { return c.untilSucceeds(fmt.Sprintf("creating auth method %s", authMethod.Name), func() error { var err error + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } // `AuthMethodCreate` will also be able to update an existing // AuthMethod based on the name provided. This means that any // configuration changes will correctly update the AuthMethod. 
- _, _, err = consulClient.ACL().AuthMethodCreate(authMethod, writeOptions) + _, _, err = client.ConsulClient.ACL().AuthMethodCreate(authMethod, writeOptions) return err }) } @@ -740,7 +755,7 @@ type ConfigureGatewayParams struct { Primary bool } -func (c *Command) configureGateway(gatewayParams ConfigureGatewayParams, consulClient *api.Client) error { +func (c *Command) configureGateway(gatewayParams ConfigureGatewayParams, client *consul.DynamicClient) error { // Each gateway needs to be configured // separately because users may need to attach different policies // to each gateway role depending on what services it represents. @@ -795,7 +810,7 @@ func (c *Command) configureGateway(gatewayParams ConfigureGatewayParams, consulC serviceAccountName := c.withPrefix(name) err = c.createACLPolicyRoleAndBindingRule(name, rules, gatewayParams.ConsulDC, gatewayParams.PrimaryDC, localPolicy, - gatewayParams.Primary, gatewayParams.AuthMethodName, serviceAccountName, consulClient) + gatewayParams.Primary, gatewayParams.AuthMethodName, serviceAccountName, client) if err != nil { c.log.Error(err.Error()) return err @@ -866,8 +881,10 @@ func (c *Command) configureSecretsBackend() error { } // untilSucceeds runs op until it returns a nil error. +// If c.apiTimeoutDuration is reached it will exit so that the command can be retried with updated server settings. // If c.cmdTimeout is cancelled it will exit.
func (c *Command) untilSucceeds(opName string, op func() error) error { + timeoutCh := time.After(c.apiTimeoutDuration) for { err := op() if err == nil { @@ -881,6 +898,8 @@ func (c *Command) untilSucceeds(opName string, op func() error) error { select { case <-time.After(c.retryDuration): continue + case <-timeoutCh: + return errors.New("reached api timeout") case <-c.ctx.Done(): return errors.New("reached command timeout") } @@ -896,12 +915,16 @@ func (c *Command) withPrefix(resource string) string { // consulDatacenterList returns the current datacenter name and the primary datacenter using the // /agent/self API endpoint. -func (c *Command) consulDatacenterList(client *api.Client) (string, string, error) { +func (c *Command) consulDatacenterList(client *consul.DynamicClient) (string, string, error) { var agentCfg map[string]map[string]interface{} err := c.untilSucceeds("calling /agent/self to get datacenter", func() error { var opErr error - agentCfg, opErr = client.Agent().Self() + opErr = client.RefreshClient() + if opErr != nil { + c.log.Error("could not refresh client", opErr) + } + agentCfg, opErr = client.ConsulClient.Agent().Self() return opErr }) if err != nil { @@ -1032,6 +1055,25 @@ func (c *Command) quitVaultAgent() { } } +// serverIPAddresses attempts to refresh the server IPs using netaddrs methods. These 'raw' IPs are used +// when bootstrapping ACLs and before consul-server-connection-manager runs.
+func (c *Command) serverIPAddresses() ([]net.IPAddr, error) { + var ipAddrs []net.IPAddr + var err error + if err = backoff.Retry(func() error { + ipAddrs, err = netaddrs.IPAddrs(c.ctx, c.consulFlags.Addresses, c.log) + if err != nil { + c.log.Error("Error resolving IP Address", "err", err) + return err + } + c.log.Info("Refreshing server IP addresses", "addresses", ipAddrs) + return nil + }, exponentialBackoffWithMaxInterval()); err != nil { + return nil, err + } + return ipAddrs, nil +} + const ( consulDefaultNamespace = "default" consulDefaultPartition = "default" diff --git a/control-plane/subcommand/server-acl-init/command_test.go b/control-plane/subcommand/server-acl-init/command_test.go index 928a50e285..af15429508 100644 --- a/control-plane/subcommand/server-acl-init/command_test.go +++ b/control-plane/subcommand/server-acl-init/command_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/cert" "github.com/hashicorp/consul-k8s/control-plane/helper/test" "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" @@ -1389,14 +1390,14 @@ func TestConsulDatacenterList(t *testing.T) { })) defer consulServer.Close() - consulClient, err := api.NewClient(&api.Config{Address: consulServer.URL}) + client, err := consul.NewDynamicClient(&api.Config{Address: consulServer.URL}) require.NoError(t, err) command := Command{ log: hclog.New(hclog.DefaultOptions), ctx: context.Background(), } - actDC, actPrimaryDC, err := command.consulDatacenterList(consulClient) + actDC, actPrimaryDC, err := command.consulDatacenterList(client) if c.expErr != "" { require.EqualError(t, err, c.expErr) } else { diff --git a/control-plane/subcommand/server-acl-init/connect_inject.go b/control-plane/subcommand/server-acl-init/connect_inject.go index f853144a2c..0e373d2ea5 100644 --- 
a/control-plane/subcommand/server-acl-init/connect_inject.go +++ b/control-plane/subcommand/server-acl-init/connect_inject.go @@ -10,6 +10,7 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/namespaces" ) @@ -21,7 +22,7 @@ const defaultKubernetesHost = "https://kubernetes.default.svc" // configureConnectInject sets up auth methods so that connect injection will // work. -func (c *Command) configureConnectInjectAuthMethod(consulClient *api.Client, authMethodName string) error { +func (c *Command) configureConnectInjectAuthMethod(client *consul.DynamicClient, authMethodName string) error { // Create the auth method template. This requires calls to the // kubernetes environment. @@ -47,7 +48,11 @@ func (c *Command) configureConnectInjectAuthMethod(consulClient *api.Client, aut err = c.untilSucceeds(fmt.Sprintf("checking or creating namespace %s", c.flagConsulInjectDestinationNamespace), func() error { - _, err := namespaces.EnsureExists(consulClient, c.flagConsulInjectDestinationNamespace, "cross-namespace-policy") + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + _, err := namespaces.EnsureExists(client.ConsulClient, c.flagConsulInjectDestinationNamespace, "cross-namespace-policy") return err }) if err != nil { @@ -59,10 +64,14 @@ func (c *Command) configureConnectInjectAuthMethod(consulClient *api.Client, aut err = c.untilSucceeds(fmt.Sprintf("creating auth method %s", authMethodTmpl.Name), func() error { var err error + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } // `AuthMethodCreate` will also be able to update an existing // AuthMethod based on the name provided. This means that any // configuration changes will correctly update the AuthMethod. 
- _, _, err = consulClient.ACL().AuthMethodCreate(&authMethodTmpl, &writeOptions) + _, _, err = client.ConsulClient.ACL().AuthMethodCreate(&authMethodTmpl, &writeOptions) return err }) if err != nil { @@ -92,7 +101,7 @@ func (c *Command) configureConnectInjectAuthMethod(consulClient *api.Client, aut } } - return c.createConnectBindingRule(consulClient, authMethodName, &abr) + return c.createConnectBindingRule(client, authMethodName, &abr) } // createAuthMethodTmpl sets up the auth method template based on the connect-injector's service account diff --git a/control-plane/subcommand/server-acl-init/create_or_update.go b/control-plane/subcommand/server-acl-init/create_or_update.go index 8099e6e9c6..254fba4964 100644 --- a/control-plane/subcommand/server-acl-init/create_or_update.go +++ b/control-plane/subcommand/server-acl-init/create_or_update.go @@ -11,13 +11,14 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" ) // createACLPolicyRoleAndBindingRule will create the ACL Policy for the component // then create a set of ACLRole and ACLBindingRule which tie the component's serviceaccount // to the authMethod, allowing the serviceaccount to later be allowed to issue a Consul Login. -func (c *Command) createACLPolicyRoleAndBindingRule(componentName, rules, dc, primaryDC string, global, primary bool, authMethodName, serviceAccountName string, client *api.Client) error { +func (c *Command) createACLPolicyRoleAndBindingRule(componentName, rules, dc, primaryDC string, global, primary bool, authMethodName, serviceAccountName string, client *consul.DynamicClient) error { // Create policy with the given rules. 
policyName := fmt.Sprintf("%s-policy", componentName) if c.flagFederation && !primary { @@ -55,7 +56,7 @@ func (c *Command) createACLPolicyRoleAndBindingRule(componentName, rules, dc, pr } // addRoleAndBindingRule adds an ACLRole and ACLBindingRule which reference the authMethod. -func (c *Command) addRoleAndBindingRule(client *api.Client, componentName, serviceAccountName, authMethodName string, policies []*api.ACLRolePolicyLink, global, primary bool, primaryDC, dc string) error { +func (c *Command) addRoleAndBindingRule(client *consul.DynamicClient, componentName, serviceAccountName, authMethodName string, policies []*api.ACLRolePolicyLink, global, primary bool, primaryDC, dc string) error { // This is the ACLRole which will allow the component which uses the serviceaccount // to be able to do a consul login. aclRoleName := c.withPrefix(fmt.Sprintf("%s-acl-role", componentName)) @@ -92,24 +93,28 @@ func (c *Command) addRoleAndBindingRule(client *api.Client, componentName, servi // updateOrCreateACLRole will query to see if existing role is in place and update them // or create them if they do not yet exist. 
-func (c *Command) updateOrCreateACLRole(client *api.Client, role *api.ACLRole) error { +func (c *Command) updateOrCreateACLRole(client *consul.DynamicClient, role *api.ACLRole) error { err := c.untilSucceeds(fmt.Sprintf("update or create acl role for %s", role.Name), func() error { var err error - aclRole, _, err := client.ACL().RoleReadByName(role.Name, &api.QueryOptions{}) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + aclRole, _, err := client.ConsulClient.ACL().RoleReadByName(role.Name, &api.QueryOptions{}) if err != nil { c.log.Error("unable to read ACL Roles", err) return err } if aclRole != nil { - _, _, err := client.ACL().RoleUpdate(aclRole, &api.WriteOptions{}) + _, _, err := client.ConsulClient.ACL().RoleUpdate(aclRole, &api.WriteOptions{}) if err != nil { c.log.Error("unable to update role", err) return err } return nil } - _, _, err = client.ACL().RoleCreate(role, &api.WriteOptions{}) + _, _, err = client.ConsulClient.ACL().RoleCreate(role, &api.WriteOptions{}) if err != nil { c.log.Error("unable to create role", err) return err @@ -121,7 +126,7 @@ func (c *Command) updateOrCreateACLRole(client *api.Client, role *api.ACLRole) e // createConnectBindingRule will query to see if existing binding rules are in place and update them // or create them if they do not yet exist. -func (c *Command) createConnectBindingRule(client *api.Client, authMethodName string, abr *api.ACLBindingRule) error { +func (c *Command) createConnectBindingRule(client *consul.DynamicClient, authMethodName string, abr *api.ACLBindingRule) error { // Binding rule list api call query options. 
queryOptions := api.QueryOptions{} @@ -136,12 +141,16 @@ func (c *Command) createConnectBindingRule(client *api.Client, authMethodName st return c.createOrUpdateBindingRule(client, authMethodName, abr, &queryOptions, nil) } -func (c *Command) createOrUpdateBindingRule(client *api.Client, authMethodName string, abr *api.ACLBindingRule, queryOptions *api.QueryOptions, writeOptions *api.WriteOptions) error { +func (c *Command) createOrUpdateBindingRule(client *consul.DynamicClient, authMethodName string, abr *api.ACLBindingRule, queryOptions *api.QueryOptions, writeOptions *api.WriteOptions) error { var existingRules []*api.ACLBindingRule err := c.untilSucceeds(fmt.Sprintf("listing binding rules for auth method %s", authMethodName), func() error { var err error - existingRules, _, err = client.ACL().BindingRuleList(authMethodName, queryOptions) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + existingRules, _, err = client.ConsulClient.ACL().BindingRuleList(authMethodName, queryOptions) return err }) if err != nil { @@ -170,13 +179,21 @@ func (c *Command) createOrUpdateBindingRule(client *api.Client, authMethodName s c.log.Info("unable to find a matching ACL binding rule to update. 
creating ACL binding rule.") err = c.untilSucceeds(fmt.Sprintf("creating acl binding rule for %s", authMethodName), func() error { - _, _, err := client.ACL().BindingRuleCreate(abr, writeOptions) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + _, _, err := client.ConsulClient.ACL().BindingRuleCreate(abr, writeOptions) return err }) } else { err = c.untilSucceeds(fmt.Sprintf("updating acl binding rule for %s", authMethodName), func() error { - _, _, err := client.ACL().BindingRuleUpdate(abr, writeOptions) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + _, _, err := client.ConsulClient.ACL().BindingRuleUpdate(abr, writeOptions) return err }) } @@ -184,7 +201,11 @@ func (c *Command) createOrUpdateBindingRule(client *api.Client, authMethodName s // Otherwise create the binding rule err = c.untilSucceeds(fmt.Sprintf("creating acl binding rule for %s", authMethodName), func() error { - _, _, err := client.ACL().BindingRuleCreate(abr, writeOptions) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + _, _, err := client.ConsulClient.ACL().BindingRuleCreate(abr, writeOptions) return err }) } @@ -193,19 +214,19 @@ func (c *Command) createOrUpdateBindingRule(client *api.Client, authMethodName s // createLocalACL creates a policy and acl token for this dc (datacenter), i.e. // the policy is only valid for this datacenter and the token is a local token. -func (c *Command) createLocalACL(name, rules, dc string, isPrimary bool, consulClient *api.Client) error { +func (c *Command) createLocalACL(name, rules, dc string, isPrimary bool, consulClient *consul.DynamicClient) error { return c.createACL(name, rules, true, dc, isPrimary, consulClient, "") } // createGlobalACL creates a global policy and acl token. The policy is valid // for all datacenters and the token is global. 
dc must be passed because the // policy name may have the datacenter name appended. -func (c *Command) createGlobalACL(name, rules, dc string, isPrimary bool, consulClient *api.Client) error { +func (c *Command) createGlobalACL(name, rules, dc string, isPrimary bool, consulClient *consul.DynamicClient) error { return c.createACL(name, rules, false, dc, isPrimary, consulClient, "") } // createACLWithSecretID creates a global policy and acl token with provided secret ID. -func (c *Command) createACLWithSecretID(name, rules, dc string, isPrimary bool, consulClient *api.Client, secretID string, local bool) error { +func (c *Command) createACLWithSecretID(name, rules, dc string, isPrimary bool, consulClient *consul.DynamicClient, secretID string, local bool) error { return c.createACL(name, rules, local, dc, isPrimary, consulClient, secretID) } @@ -215,7 +236,7 @@ func (c *Command) createACLWithSecretID(name, rules, dc string, isPrimary bool, // When secretID is provided, we will use that value for the created token and // will skip writing it to a Kubernetes secret (because in this case we assume that // this value already exists in some secrets storage). -func (c *Command) createACL(name, rules string, localToken bool, dc string, isPrimary bool, consulClient *api.Client, secretID string) error { +func (c *Command) createACL(name, rules string, localToken bool, dc string, isPrimary bool, client *consul.DynamicClient, secretID string) error { // Create policy with the given rules. 
policyName := fmt.Sprintf("%s-token", name) if c.flagFederation && !isPrimary { @@ -235,7 +256,7 @@ func (c *Command) createACL(name, rules string, localToken bool, dc string, isPr } err := c.untilSucceeds(fmt.Sprintf("creating %s policy", policyTmpl.Name), func() error { - return c.createOrUpdateACLPolicy(policyTmpl, consulClient) + return c.createOrUpdateACLPolicy(policyTmpl, client) }) if err != nil { return err @@ -263,7 +284,7 @@ func (c *Command) createACL(name, rules string, localToken bool, dc string, isPr } else { // If secretID is provided, we check if the token with secretID already exists in Consul // and exit if it does. Otherwise, set the secretID to the provided value. - _, _, err = consulClient.ACL().TokenReadSelf(&api.QueryOptions{Token: secretID}) + _, _, err = client.ConsulClient.ACL().TokenReadSelf(&api.QueryOptions{Token: secretID}) if err == nil { c.log.Info("ACL replication token already exists; skipping creation") return nil @@ -275,7 +296,11 @@ func (c *Command) createACL(name, rules string, localToken bool, dc string, isPr var token string err = c.untilSucceeds(fmt.Sprintf("creating token for policy %s", policyTmpl.Name), func() error { - createdToken, _, err := consulClient.ACL().TokenCreate(&tokenTmpl, &api.WriteOptions{}) + err = client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + createdToken, _, err := client.ConsulClient.ACL().TokenCreate(&tokenTmpl, &api.WriteOptions{}) if err == nil { token = createdToken.SecretID } @@ -305,9 +330,14 @@ func (c *Command) createACL(name, rules string, localToken bool, dc string, isPr return nil } -func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, consulClient *api.Client) error { +func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, client *consul.DynamicClient) error { + err := client.RefreshClient() + if err != nil { + c.log.Error("could not refresh client", err) + } + // Attempt to create the ACL policy. 
- _, _, err := consulClient.ACL().PolicyCreate(&policy, &api.WriteOptions{}) + _, _, err = client.ConsulClient.ACL().PolicyCreate(&policy, &api.WriteOptions{}) // With the introduction of Consul namespaces, if someone upgrades into a // Consul version with namespace support or changes any of their namespace @@ -320,7 +350,7 @@ func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, consulClient *ap // The policy ID is required in any PolicyUpdate call, so first we need to // get the existing policy to extract its ID. - existingPolicies, _, err := consulClient.ACL().PolicyList(&api.QueryOptions{}) + existingPolicies, _, err := client.ConsulClient.ACL().PolicyList(&api.QueryOptions{}) if err != nil { return err } @@ -345,7 +375,7 @@ func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, consulClient *ap } // Update the policy now that we've found its ID - _, _, err = consulClient.ACL().PolicyUpdate(&policy, &api.WriteOptions{}) + _, _, err = client.ConsulClient.ACL().PolicyUpdate(&policy, &api.WriteOptions{}) return err } return err diff --git a/control-plane/subcommand/server-acl-init/create_or_update_test.go b/control-plane/subcommand/server-acl-init/create_or_update_test.go index 96d3945617..7fd7dc29b3 100644 --- a/control-plane/subcommand/server-acl-init/create_or_update_test.go +++ b/control-plane/subcommand/server-acl-init/create_or_update_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/go-hclog" @@ -41,7 +42,7 @@ func TestCreateOrUpdateACLPolicy_ErrorsIfDescriptionDoesNotMatch(t *testing.T) { svr.WaitForLeader(t) // Get a Consul client. 
- consul, err := api.NewClient(&api.Config{ + client, err := consul.NewDynamicClient(&api.Config{ Address: svr.HTTPAddr, Token: bootToken, }) @@ -50,7 +51,7 @@ func TestCreateOrUpdateACLPolicy_ErrorsIfDescriptionDoesNotMatch(t *testing.T) { // Create the policy manually. policyDescription := "not the expected description" policyName := "policy-name" - policy, _, err := consul.ACL().PolicyCreate(&api.ACLPolicy{ + policy, _, err := client.ConsulClient.ACL().PolicyCreate(&api.ACLPolicy{ Name: policyName, Description: policyDescription, }, nil) @@ -60,14 +61,14 @@ func TestCreateOrUpdateACLPolicy_ErrorsIfDescriptionDoesNotMatch(t *testing.T) { err = cmd.createOrUpdateACLPolicy(api.ACLPolicy{ Name: policyName, Description: "expected description", - }, consul) + }, client) require.EqualError(err, "policy found with name \"policy-name\" but not with expected description \"expected description\";"+ " if this policy was created manually it must be renamed to something else because this name is reserved by consul-k8s", ) // Check that the policy wasn't modified. - rereadPolicy, _, err := consul.ACL().PolicyRead(policy.ID, nil) + rereadPolicy, _, err := client.ConsulClient.ACL().PolicyRead(policy.ID, nil) require.NoError(err) require.Equal(policyDescription, rereadPolicy.Description) } @@ -93,14 +94,14 @@ func TestCreateOrUpdateACLPolicy(t *testing.T) { svr.WaitForLeader(t) // Get a Consul client. 
- consul, err := api.NewClient(&api.Config{ + client, err := consul.NewDynamicClient(&api.Config{ Address: svr.HTTPAddr, Token: bootToken, }) // Make sure the ACL system is bootstrapped first require.Eventually(func() bool { - _, _, err := consul.ACL().PolicyList(nil) + _, _, err := client.ConsulClient.ACL().PolicyList(nil) return err == nil }, 5*time.Second, 500*time.Millisecond) @@ -136,9 +137,9 @@ func TestCreateOrUpdateACLPolicy(t *testing.T) { Name: tt.PolicyName, Description: tt.PolicyDescription, Rules: tt.Rules, - }, consul) + }, client) require.Nil(err) - policy, _, err := consul.ACL().PolicyReadByName(tt.PolicyName, nil) + policy, _, err := client.ConsulClient.ACL().PolicyReadByName(tt.PolicyName, nil) require.Nil(err) require.Equal(tt.Rules, policy.Rules) require.Equal(tt.PolicyName, policy.Name) diff --git a/control-plane/subcommand/server-acl-init/servers.go b/control-plane/subcommand/server-acl-init/servers.go index c530f648e5..9665ec8f48 100644 --- a/control-plane/subcommand/server-acl-init/servers.go +++ b/control-plane/subcommand/server-acl-init/servers.go @@ -120,17 +120,17 @@ func (c *Command) setServerTokens(serverAddresses []net.IPAddr, bootstrapToken s clientConfig := c.consulFlags.ConsulClientConfig().APIClientConfig clientConfig.Address = fmt.Sprintf("%s:%d", serverAddresses[0].IP.String(), c.consulFlags.HTTPPort) clientConfig.Token = bootstrapToken - serverClient, err := consul.NewClient(clientConfig, + client, err := consul.NewDynamicClientWithTimeout(clientConfig, c.consulFlags.APITimeout) if err != nil { return err } - agentPolicy, err := c.setServerPolicy(serverClient) + agentPolicy, err := c.setServerPolicy(client) if err != nil { return err } - existingTokens, _, err := serverClient.ACL().TokenList(nil) + existingTokens, _, err := client.ConsulClient.ACL().TokenList(nil) if err != nil { return err } @@ -198,7 +198,7 @@ func (c *Command) setServerTokens(serverAddresses []net.IPAddr, bootstrapToken s return nil } -func (c *Command) 
setServerPolicy(consulClient *api.Client) (api.ACLPolicy, error) { +func (c *Command) setServerPolicy(client *consul.DynamicClient) (api.ACLPolicy, error) { agentRules, err := c.agentRules() if err != nil { c.log.Error("Error templating server agent rules", "err", err) @@ -213,7 +213,7 @@ func (c *Command) setServerPolicy(consulClient *api.Client) (api.ACLPolicy, erro } err = c.untilSucceeds("creating agent policy - PUT /v1/acl/policy", func() error { - return c.createOrUpdateACLPolicy(agentPolicy, consulClient) + return c.createOrUpdateACLPolicy(agentPolicy, client) }) if err != nil { return api.ACLPolicy{}, err