Skip to content

Commit

Permalink
Merge pull request #113 from k8snetworkplumbingwg/add-leader-election
Browse files Browse the repository at this point in the history
Add leader election
  • Loading branch information
dougbtv authored Jul 21, 2021
2 parents f1047ee + 5b52948 commit 6cb8c1b
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 121 deletions.
17 changes: 15 additions & 2 deletions cmd/whereabouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"net"
"strings"

"github.com/containernetworking/cni/pkg/skel"
Expand Down Expand Up @@ -39,7 +40,14 @@ func cmdAdd(args *skel.CmdArgs) error {
result.Routes = ipamConf.Routes

logging.Debugf("Beginning IPAM for ContainerID: %v", args.ContainerID)
newip, err := storage.IPManagement(types.Allocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
var newip net.IPNet

switch ipamConf.Datastore {
case types.DatastoreETCD:
newip, err = storage.IPManagementEtcd(types.Allocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
case types.DatastoreKubernetes:
newip, err = storage.IPManagementKubernetes(types.Allocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
}
if err != nil {
logging.Errorf("Error at storage engine: %s", err)
return fmt.Errorf("Error at storage engine: %w", err)
Expand Down Expand Up @@ -78,7 +86,12 @@ func cmdDel(args *skel.CmdArgs) error {
logging.Debugf("DEL - IPAM configuration successfully read: %+v", filterConf(*ipamConf))
logging.Debugf("ContainerID: %v", args.ContainerID)

_, err = storage.IPManagement(types.Deallocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
switch ipamConf.Datastore {
case types.DatastoreETCD:
_, err = storage.IPManagementEtcd(types.Deallocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
case types.DatastoreKubernetes:
_, err = storage.IPManagementKubernetes(types.Deallocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
}
if err != nil {
logging.Verbosef("WARNING: Problem deallocating IP: %s", err)
// return fmt.Errorf("Error deallocating IP: %s", err)
Expand Down
6 changes: 6 additions & 0 deletions doc/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ rules:
- update
- patch
- delete
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, er
if err := cnitypes.LoadArgs(envArgs, &args); err != nil {
return nil, "", fmt.Errorf("LoadArgs - CNI Args Parsing Error: %s", err)
}
n.IPAM.PodName = string(args.K8S_POD_NAME)
n.IPAM.PodNamespace = string(args.K8S_POD_NAMESPACE)

// Once we have our basics, let's look for our (optional) configuration file
confdirs := []string{"/etc/kubernetes/cni/net.d/whereabouts.d/whereabouts.conf", "/etc/cni/net.d/whereabouts.d/whereabouts.conf"}
Expand Down Expand Up @@ -169,6 +171,18 @@ func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, er
return nil, "", err
}

if n.IPAM.LeaderLeaseDuration == 0 {
n.IPAM.LeaderLeaseDuration = types.DefaultLeaderLeaseDuration
}

if n.IPAM.LeaderRenewDeadline == 0 {
n.IPAM.LeaderRenewDeadline = types.DefaultLeaderRenewDeadline
}

if n.IPAM.LeaderRetryPeriod == 0 {
n.IPAM.LeaderRetryPeriod = types.DefaultLeaderRetryPeriod
}

// Copy net name into IPAM so not to drag Net struct around
n.IPAM.Name = n.Name

Expand Down
12 changes: 11 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var _ = Describe("Allocation operations", func() {
Expect(ipamconfig.RangeStart).To(Equal(net.ParseIP("192.168.1.5")))
Expect(ipamconfig.RangeEnd).To(Equal(net.ParseIP("192.168.1.25")))
Expect(ipamconfig.Gateway).To(Equal(net.ParseIP("192.168.10.1")))
Expect(ipamconfig.LeaderLeaseDuration).To(Equal(1500))
Expect(ipamconfig.LeaderRenewDeadline).To(Equal(1000))
Expect(ipamconfig.LeaderRetryPeriod).To(Equal(500))

})

Expand Down Expand Up @@ -71,7 +74,10 @@ var _ = Describe("Allocation operations", func() {
"type": "whereabouts",
"range": "192.168.2.230/24",
"range_start": "192.168.2.223",
"gateway": "192.168.10.1"
"gateway": "192.168.10.1",
"leader_lease_duration": 3000,
"leader_renew_deadline": 2000,
"leader_retry_period": 1000
}
}`

Expand All @@ -86,6 +92,10 @@ var _ = Describe("Allocation operations", func() {
Expect(ipamconfig.Datastore).To(Equal("kubernetes"))
Expect(ipamconfig.Kubernetes.KubeConfigPath).To(Equal("/etc/cni/net.d/whereabouts.d/whereabouts.kubeconfig"))

Expect(ipamconfig.LeaderLeaseDuration).To(Equal(3000))
Expect(ipamconfig.LeaderRenewDeadline).To(Equal(2000))
Expect(ipamconfig.LeaderRetryPeriod).To(Equal(1000))

})

})
89 changes: 89 additions & 0 deletions pkg/storage/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/pkg/transport"
"github.com/dougbtv/whereabouts/pkg/allocate"
"github.com/dougbtv/whereabouts/pkg/logging"
"github.com/dougbtv/whereabouts/pkg/types"
)

Expand Down Expand Up @@ -137,3 +139,90 @@ func (p *ETCDIPPool) Update(ctx context.Context, reservations []types.IPReservat
_, err := p.kv.Put(ctx, fmt.Sprintf("%s/%s", whereaboutsPrefix, p.ipRange), strings.Join(raw, "\n"))
return err
}

// IPManagement manages ip allocation and deallocation from a storage perspective
func IPManagementEtcd(mode int, ipamConf types.IPAMConfig, containerID string, podRef string) (net.IPNet, error) {

logging.Debugf("IPManagement -- mode: %v / host: %v / containerID: %v / podRef: %v", mode, ipamConf.EtcdHost, containerID, podRef)

var newip net.IPNet
// Skip invalid modes
switch mode {
case types.Allocate, types.Deallocate:
default:
return newip, fmt.Errorf("Got an unknown mode passed to IPManagement: %v", mode)
}

var ipam Store
var pool IPPool
var err error
switch ipamConf.Datastore {
case types.DatastoreETCD:
ipam, err = NewETCDIPAM(ipamConf)
case types.DatastoreKubernetes:
ipam, err = NewKubernetesIPAM(containerID, ipamConf)
}
if err != nil {
logging.Errorf("IPAM %s client initialization error: %v", ipamConf.Datastore, err)
return newip, fmt.Errorf("IPAM %s client initialization error: %v", ipamConf.Datastore, err)
}
defer ipam.Close()

ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()

// Check our connectivity first
if err := ipam.Status(ctx); err != nil {
logging.Errorf("IPAM connectivity error: %v", err)
return newip, err
}

// handle the ip add/del until successful
RETRYLOOP:
for j := 0; j < DatastoreRetries; j++ {
select {
case <-ctx.Done():
return newip, nil
default:
// retry the IPAM loop if the context has not been cancelled
}

pool, err = ipam.GetIPPool(ctx, ipamConf.Range)
if err != nil {
logging.Errorf("IPAM error reading pool allocations (attempt: %d): %v", j, err)
if e, ok := err.(temporary); ok && e.Temporary() {
continue
}
return newip, err
}

reservelist := pool.Allocations()
var updatedreservelist []types.IPReservation
switch mode {
case types.Allocate:
newip, updatedreservelist, err = allocate.AssignIP(ipamConf, reservelist, containerID, podRef)
if err != nil {
logging.Errorf("Error assigning IP: %v", err)
return newip, err
}
case types.Deallocate:
updatedreservelist, err = allocate.DeallocateIP(ipamConf.Range, reservelist, containerID)
if err != nil {
logging.Errorf("Error deallocating IP: %v", err)
return newip, err
}
}

err = pool.Update(ctx, updatedreservelist)
if err != nil {
logging.Errorf("IPAM error updating pool (attempt: %d): %v", j, err)
if e, ok := err.(temporary); ok && e.Temporary() {
continue
}
break RETRYLOOP
}
break RETRYLOOP
}

return newip, err
}
Loading

0 comments on commit 6cb8c1b

Please sign in to comment.