Skip to content

Commit

Permalink
Add refreshes and retries to server-acl-init job
Browse files Browse the repository at this point in the history
save
  • Loading branch information
curtbushko committed Nov 9, 2023
1 parent a3d1715 commit ea67e71
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .changelog/3137.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
control-plane: fixes an issue with the server-acl-init job where the job would fail on upgrades due to consul server ip address changes.
```
1 change: 1 addition & 0 deletions control-plane/consul/retryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package consul
1 change: 1 addition & 0 deletions control-plane/consul/retryable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package consul
97 changes: 82 additions & 15 deletions control-plane/subcommand/server-acl-init/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ type Command struct {
// ctx is cancelled when the command timeout is reached.
ctx context.Context
retryDuration time.Duration
// timeoutDuration is how long the the command will try reaching the API. During upgrades the IP address of the
// consul servers can change which causes timeout errors. We recover by refreshing the IPs.
timeoutDuration time.Duration

// log
log hclog.Logger
Expand Down Expand Up @@ -233,6 +236,10 @@ func (c *Command) init() {
if c.retryDuration == 0 {
c.retryDuration = 1 * time.Second
}

if c.timeoutDuration == 0 {
c.timeoutDuration = 180 * time.Second
}
}

func (c *Command) Synopsis() string { return synopsis }
Expand Down Expand Up @@ -302,18 +309,6 @@ func (c *Command) Run(args []string) int {
}
}

var ipAddrs []net.IPAddr
if err := backoff.Retry(func() error {
ipAddrs, err = netaddrs.IPAddrs(c.ctx, c.consulFlags.Addresses, c.log)
if err != nil {
c.log.Error("Error resolving IP Address", "err", err)
return err
}
return nil
}, exponentialBackoffWithMaxInterval()); err != nil {
c.UI.Error(err.Error())
}

if err := c.configureSecretsBackend(); err != nil {
c.log.Error(err.Error())
return 1
Expand All @@ -328,8 +323,22 @@ func (c *Command) Run(args []string) int {
c.log.Info("ACL replication is enabled so skipping Consul server ACL bootstrapping")
bootstrapToken = aclReplicationToken
} else {
bootstrapToken, err = c.bootstrapServers(ipAddrs, c.backend)
if err != nil {
// During upgrades, there is a rare case where a consul-server statefulset may be rotated out while the
// bootstrap tokens are being updated. Catch this case, refresh the server ip addresses and try again.
if err := backoff.Retry(func() error {
ipAddrs, err := c.serverIPAddresses()
if err != nil {
c.log.Error(err.Error())
return err
}

bootstrapToken, err = c.bootstrapServers(ipAddrs, c.backend)
if err != nil {
c.log.Error(err.Error())
return err
}
return nil
}, exponentialBackoffWithMaxInterval()); err != nil {
c.log.Error(err.Error())
return 1
}
Expand Down Expand Up @@ -866,8 +875,10 @@ func (c *Command) configureSecretsBackend() error {
}

// untilSucceeds runs op until it returns a nil error.
// If c.timeoutDuration is reached it will exit so that the command can be retried with updated server settings
// If c.cmdTimeout is cancelled it will exit.
func (c *Command) untilSucceeds(opName string, op func() error) error {
timeoutCh := time.After(c.timeoutDuration)
for {
err := op()
if err == nil {
Expand All @@ -881,6 +892,8 @@ func (c *Command) untilSucceeds(opName string, op func() error) error {
select {
case <-time.After(c.retryDuration):
continue
case <-timeoutCh:
return errors.New("reached api timeout")
case <-c.ctx.Done():
return errors.New("reached command timeout")
}
Expand All @@ -897,8 +910,14 @@ func (c *Command) withPrefix(resource string) string {
// consulDatacenterList returns the current datacenter name and the primary datacenter using the
// /agent/self API endpoint.
func (c *Command) consulDatacenterList(client *api.Client) (string, string, error) {
// Refresh the consulClient as the server IPs maybe have changed during an upgrade.
client, err := c.refreshClient(client)
if err != nil {
return "", "", err
}

var agentCfg map[string]map[string]interface{}
err := c.untilSucceeds("calling /agent/self to get datacenter",
err = c.untilSucceeds("calling /agent/self to get datacenter",
func() error {
var opErr error
agentCfg, opErr = client.Agent().Self()
Expand Down Expand Up @@ -1032,6 +1051,25 @@ func (c *Command) quitVaultAgent() {
}
}

// serverIPAddresses attempts to refresh the server IPs using netaddrs methods. These 'raw' IPs are used
// when boostrapping ACLs and before consul-server-connection-manager runs.
func (c *Command) serverIPAddresses() ([]net.IPAddr, error) {
var ipAddrs []net.IPAddr
var err error
if err = backoff.Retry(func() error {
ipAddrs, err = netaddrs.IPAddrs(c.ctx, c.consulFlags.Addresses, c.log)
if err != nil {
c.log.Error("Error resolving IP Address", "err", err)
return err
}
c.log.Info("Refreshing server IP addresses", "addresses", ipAddrs)
return nil
}, exponentialBackoffWithMaxInterval()); err != nil {
return nil, err
}
return ipAddrs, nil
}

const (
consulDefaultNamespace = "default"
consulDefaultPartition = "default"
Expand All @@ -1048,3 +1086,32 @@ Usage: consul-k8s-control-plane server-acl-init [options]
`
)

// refreshClient refreshes the IP for the consulClient. This is used in cases where the servers statefulsets
// have changed and the IP addresses have changed. The consulClient is dependant on consul-server-connection-manager
// having run.
func (c *Command) refreshClient(client *api.Client) (*api.Client, error) {
// In tests, consulFlags are not set
// if Address.IP is emtpy then the consul-server-connection-manager hasn't run yet. We are using direct IPs.
if c.consulFlags != nil && c.state.Address.IP != nil {
c.log.Info("Refreshing server connection", "address", c.state.Address.IP.String())

// (copied from above to provide context for this use case)
// When the default partition is in a VM, the anonymous policy does not allow cross-partition
// DNS lookups. The anonymous policy in the default partition needs to be updated in order to
// support this use-case. Creating a separate anonymous token client that updates the anonymous
// policy and token in the default partition ensures this works.
config := c.consulFlags.ConsulClientConfig()
if c.consulFlags.Partition != "" {
config.APIClientConfig.Partition = consulDefaultPartition
}

consulClient, err := consul.NewClientFromConnMgrState(config, c.state)
if err != nil {
c.log.Error(err.Error())
return nil, err
}
return consulClient, nil
}
return client, nil
}
14 changes: 13 additions & 1 deletion control-plane/subcommand/server-acl-init/create_or_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ func (c *Command) addRoleAndBindingRule(client *api.Client, componentName, servi
// updateOrCreateACLRole will query to see if existing role is in place and update them
// or create them if they do not yet exist.
func (c *Command) updateOrCreateACLRole(client *api.Client, role *api.ACLRole) error {
err := c.untilSucceeds(fmt.Sprintf("update or create acl role for %s", role.Name),
// Refresh the consulClient as the server IPs maybe have changed during an upgrade.
client, err := c.refreshClient(client)
if err != nil {
return err
}

err = c.untilSucceeds(fmt.Sprintf("update or create acl role for %s", role.Name),
func() error {
var err error
aclRole, _, err := client.ACL().RoleReadByName(role.Name, &api.QueryOptions{})
Expand Down Expand Up @@ -306,6 +312,12 @@ func (c *Command) createACL(name, rules string, localToken bool, dc string, isPr
}

func (c *Command) createOrUpdateACLPolicy(policy api.ACLPolicy, consulClient *api.Client) error {
// Refresh the consulClient as the server IPs maybe have changed during an upgrade.
//consulClient, err := c.refreshClient(consulClient)
//if err != nil {
// return err
//}

// Attempt to create the ACL policy.
_, _, err := consulClient.ACL().PolicyCreate(&policy, &api.WriteOptions{})

Expand Down

0 comments on commit ea67e71

Please sign in to comment.