diff --git a/cmd/whereabouts.go b/cmd/whereabouts.go index d8814a60f..f8d2565e5 100644 --- a/cmd/whereabouts.go +++ b/cmd/whereabouts.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "strings" @@ -81,6 +82,10 @@ func cmdDel(args *skel.CmdArgs) error { _, err = storage.IPManagement(types.Deallocate, *ipamConf, args.ContainerID, getPodRef(args.Args)) if err != nil { logging.Verbosef("WARNING: Problem deallocating IP: %s", err) + // OK to return a context deadline error: it makes kubelet/CNI retry the deallocation. + if err == context.DeadlineExceeded || strings.Contains(err.Error(), "context deadline exceeded") { + return err + } // return fmt.Errorf("Error deallocating IP: %s", err) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 625ec7998..ea9878b88 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -132,6 +132,23 @@ func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, er n.IPAM.Datastore = types.DatastoreETCD } + if n.IPAM.AllocateRequestTimeout == 0 { + // default to 10s + n.IPAM.AllocateRequestTimeout = 10 + } + + if n.IPAM.DeAllocateRequestTimeout == 0 { + // default to 10s + n.IPAM.DeAllocateRequestTimeout = 10 + } + + if !strings.EqualFold(n.IPAM.BackOffRetryScheme, "exponential") { + if n.IPAM.BackoffLinearStep == 0 { + // set backoff step to 500 ms + n.IPAM.BackoffLinearStep = 500 + } + } + var err error storageError := "You have not configured the storage engine (looks like you're using an invalid `%s` parameter in your config)" switch n.IPAM.Datastore { diff --git a/pkg/storage/kubernetes.go b/pkg/storage/kubernetes.go index a2ccd01b2..13147aad5 100644 --- a/pkg/storage/kubernetes.go +++ b/pkg/storage/kubernetes.go @@ -2,7 +2,6 @@ package storage import ( "context" - "encoding/json" "fmt" "net" "strconv" @@ -12,7 +11,6 @@ import ( whereaboutsv1alpha1 "github.com/dougbtv/whereabouts/pkg/api/v1alpha1" "github.com/dougbtv/whereabouts/pkg/logging" whereaboutstypes
"github.com/dougbtv/whereabouts/pkg/types" - jsonpatch "gomodules.xyz/jsonpatch/v2" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -165,49 +163,11 @@ func (p *KubernetesIPPool) Allocations() []whereaboutstypes.IPReservation { // Update sets the pool allocated IP list to the given IP reservations func (p *KubernetesIPPool) Update(ctx context.Context, reservations []whereaboutstypes.IPReservation) error { - // marshal the current pool to serve as the base for the patch creation - orig := p.pool.DeepCopy() - origBytes, err := json.Marshal(orig) - if err != nil { - return err - } - - // update the pool before marshalling once again + // update the pool with new ip allocations p.pool.Spec.Allocations = toAllocationMap(reservations, p.firstIP) - modBytes, err := json.Marshal(p.pool) - if err != nil { - return err - } - - // create the patch - patch, err := jsonpatch.CreatePatch(origBytes, modBytes) - if err != nil { - return err - } - - // add additional tests to the patch - ops := []jsonpatch.Operation{ - // ensure patch is applied to appropriate resource version only - {Operation: "test", Path: "/metadata/resourceVersion", Value: orig.ObjectMeta.ResourceVersion}, - } - for _, o := range patch { - // safeguard add ops -- "add" will update existing paths, this "test" ensures the path is empty - if o.Operation == "add" { - var m map[string]interface{} - ops = append(ops, jsonpatch.Operation{Operation: "test", Path: o.Path, Value: m}) - } - } - ops = append(ops, patch...) 
- patchData, err := json.Marshal(ops) - if err != nil { - return err - } - - // apply the patch - err = p.client.Patch(ctx, orig, client.ConstantPatch(types.JSONPatchType, patchData)) + err := p.client.Update(ctx, p.pool) if err != nil { - if errors.IsInvalid(err) { - // expect "invalid" errors if any of the jsonpatch "test" Operations fail + if errors.IsConflict(err) { return &temporaryError{err} } return err diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 7a549132b..208daf864 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -2,8 +2,11 @@ package storage import ( "context" + "crypto/rand" "fmt" + "math/big" "net" + "strings" "time" "github.com/dougbtv/whereabouts/pkg/allocate" @@ -12,9 +15,6 @@ import ( ) var ( - // RequestTimeout defines how long the context timesout in - RequestTimeout = 10 * time.Second - // DatastoreRetries defines how many retries are attempted when updating the Pool DatastoreRetries = 100 ) @@ -60,8 +60,16 @@ func IPManagement(mode int, ipamConf types.IPAMConfig, containerID string, podRe } defer ipam.Close() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() + var ctx context.Context + var cancel context.CancelFunc + switch mode { + case types.Allocate: + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(ipamConf.AllocateRequestTimeout)*time.Second) + defer cancel() + case types.Deallocate: + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(ipamConf.DeAllocateRequestTimeout)*time.Second) + defer cancel() + } // Check our connectivity first if err := ipam.Status(ctx); err != nil { @@ -69,12 +77,14 @@ func IPManagement(mode int, ipamConf types.IPAMConfig, containerID string, podRe return newip, err } + var step int // handle the ip add/del until successful RETRYLOOP: - for j := 0; j < DatastoreRetries; j++ { + for j := 1; j < DatastoreRetries+1; j++ { select { case <-ctx.Done(): - return newip, nil + // return last available 
newip and context.DeadlineExceeded error + return newip, context.DeadlineExceeded default: // retry the IPAM loop if the context has not been cancelled } @@ -83,6 +93,13 @@ RETRYLOOP: if err != nil { logging.Errorf("IPAM error reading pool allocations (attempt: %d): %v", j, err) if e, ok := err.(temporary); ok && e.Temporary() { + interval, _ := rand.Int(rand.Reader, big.NewInt(1000)) + if strings.EqualFold(ipamConf.BackOffRetryScheme, "exponential") { + time.Sleep(time.Duration(int(interval.Int64())*(2^j)) * time.Millisecond) + } else { + time.Sleep(time.Duration(int(interval.Int64())+step) * time.Millisecond) + step += ipamConf.BackoffLinearStep + } continue } return newip, err @@ -107,8 +124,16 @@ RETRYLOOP: err = pool.Update(ctx, updatedreservelist) if err != nil { - logging.Errorf("IPAM error updating pool (attempt: %d): %v", j, err) + logging.Errorf("IPAM error updating pool %s (attempt: %d): %v", ipamConf.Range, j, err) if e, ok := err.(temporary); ok && e.Temporary() { + logging.Errorf("IPAM error is temporary for pool %s: %v, retrying", ipamConf.Range, err) + interval, _ := rand.Int(rand.Reader, big.NewInt(1000)) + if strings.EqualFold(ipamConf.BackOffRetryScheme, "exponential") { + time.Sleep(time.Duration(int(interval.Int64())*(2^j)) * time.Millisecond) + } else { + time.Sleep(time.Duration(int(interval.Int64())+step) * time.Millisecond) + step += ipamConf.BackoffLinearStep + } continue } break RETRYLOOP diff --git a/pkg/types/types.go b/pkg/types/types.go index 563f735ab..a2ba9c845 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -22,28 +22,32 @@ type Net struct { // IPAMConfig describes the expected json configuration for this plugin type IPAMConfig struct { - Name string - Type string `json:"type"` - Routes []*cnitypes.Route `json:"routes"` - Datastore string `json:"datastore"` - Addresses []Address `json:"addresses,omitempty"` - OmitRanges []string `json:"exclude,omitempty"` - DNS cnitypes.DNS `json:"dns"` - Range string `json:"range"` - 
RangeStart net.IP `json:"range_start,omitempty"` - RangeEnd net.IP `json:"range_end,omitempty"` - GatewayStr string `json:"gateway"` - EtcdHost string `json:"etcd_host,omitempty"` - EtcdUsername string `json:"etcd_username,omitempty"` - EtcdPassword string `json:"etcd_password,omitempty"` - EtcdKeyFile string `json:"etcd_key_file,omitempty"` - EtcdCertFile string `json:"etcd_cert_file,omitempty"` - EtcdCACertFile string `json:"etcd_ca_cert_file,omitempty"` - LogFile string `json:"log_file"` - LogLevel string `json:"log_level"` - Gateway net.IP - Kubernetes KubernetesConfig `json:"kubernetes,omitempty"` - ConfigurationPath string `json:"configuration_path"` + Name string + Type string `json:"type"` + Routes []*cnitypes.Route `json:"routes"` + Datastore string `json:"datastore"` + Addresses []Address `json:"addresses,omitempty"` + OmitRanges []string `json:"exclude,omitempty"` + DNS cnitypes.DNS `json:"dns"` + Range string `json:"range"` + RangeStart net.IP `json:"range_start,omitempty"` + RangeEnd net.IP `json:"range_end,omitempty"` + GatewayStr string `json:"gateway"` + EtcdHost string `json:"etcd_host,omitempty"` + EtcdUsername string `json:"etcd_username,omitempty"` + EtcdPassword string `json:"etcd_password,omitempty"` + EtcdKeyFile string `json:"etcd_key_file,omitempty"` + EtcdCertFile string `json:"etcd_cert_file,omitempty"` + EtcdCACertFile string `json:"etcd_ca_cert_file,omitempty"` + LogFile string `json:"log_file"` + LogLevel string `json:"log_level"` + Gateway net.IP + Kubernetes KubernetesConfig `json:"kubernetes,omitempty"` + ConfigurationPath string `json:"configuration_path"` + AllocateRequestTimeout int `json:"allocate_request_timeout"` + DeAllocateRequestTimeout int `json:"deallocate_request_timeout"` + BackOffRetryScheme string `json:"backoff_scheme"` + BackoffLinearStep int `json:"linear_step"` } // IPAMEnvArgs are the environment vars we expect