Minor cleanup to instance group management
Nick Sardo committed Oct 9, 2017
1 parent ef0e824 commit 3d57e7a
Showing 7 changed files with 77 additions and 61 deletions.
2 changes: 1 addition & 1 deletion backends/backends.go
@@ -223,7 +223,7 @@ func (b *Backends) Ensure(svcPorts []ServicePort, igs []*compute.InstanceGroup)
 		ports = append(ports, p.Port)
 	}
 	var err error
-	igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports)
+	igs, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports)
 	if err != nil {
 		return err
 	}

2 changes: 1 addition & 1 deletion controller/cluster_manager.go
@@ -172,7 +172,7 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.Se
 	for _, p := range servicePorts {
 		ports = append(ports, p.Port)
 	}
-	igs, _, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
+	igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
 	return igs, err
 }

2 changes: 1 addition & 1 deletion controller/utils_test.go
@@ -71,7 +71,7 @@ func TestInstancesAddedToZones(t *testing.T) {

 	// Create 2 igs, one per zone.
 	testIG := "test-ig"
-	lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, []int64{int64(3001)})
+	lbc.CloudClusterManager.instancePool.EnsureInstanceGroupsAndPorts(testIG, []int64{int64(3001)})

 	// node pool syncs kube-nodes, this will add them to both igs.
 	lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"})

118 changes: 67 additions & 51 deletions instances/instances.go
@@ -48,7 +48,10 @@ type Instances struct {
 // - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
 //   members of the cloud InstanceGroup.
 func NewNodePool(cloud InstanceGroups) NodePool {
-	return &Instances{cloud, storage.NewInMemoryPool(), nil}
+	return &Instances{
+		cloud:       cloud,
+		snapshotter: storage.NewInMemoryPool(),
+	}
 }

 // Init initializes the instance pool. The given zoneLister is used to list
@@ -58,72 +61,85 @@ func (i *Instances) Init(zl zoneLister) {
 	i.zoneLister = zl
 }

-// AddInstanceGroup creates or gets an instance group if it doesn't exist
+// EnsureInstanceGroupsAndPorts creates or gets an instance group if it doesn't exist
 // and adds the given ports to it. Returns a list of one instance group per zone,
 // all of which have the exact same named ports.
-func (i *Instances) AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) {
-	igs := []*compute.InstanceGroup{}
-
-	var namedPorts []*compute.NamedPort
-	for _, port := range ports {
-		namedPorts = append(namedPorts, utils.GetNamedPort(port))
-	}
-
+func (i *Instances) EnsureInstanceGroupsAndPorts(name string, ports []int64) (igs []*compute.InstanceGroup, err error) {
 	zones, err := i.ListZones()
 	if err != nil {
-		return igs, namedPorts, err
+		return nil, err
 	}

 	defer i.snapshotter.Add(name, struct{}{})
 	for _, zone := range zones {
-		ig, err := i.Get(name, zone)
-		if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
-			glog.Errorf("Failed to get instance group %v/%v, err: %v", zone, name, err)
-			return nil, nil, err
-		}
-
-		if ig == nil {
-			glog.Infof("Creating instance group %v in zone %v", name, zone)
-			if err = i.cloud.CreateInstanceGroup(&compute.InstanceGroup{Name: name}, zone); err != nil {
-				// Error may come back with StatusConflict meaning the instance group was created by another controller
-				// possibly the Service Controller for internal load balancers.
-				if utils.IsHTTPErrorCode(err, http.StatusConflict) {
-					glog.Warningf("Failed to create instance group %v/%v due to conflict status, but continuing sync. err: %v", zone, name, err)
-				} else {
-					glog.Errorf("Failed to create instance group %v/%v, err: %v", zone, name, err)
-					return nil, nil, err
-				}
-			}
-			ig, err = i.cloud.GetInstanceGroup(name, zone)
-			if err != nil {
-				glog.Errorf("Failed to get instance group %v/%v after ensuring existence, err: %v", zone, name, err)
-				return nil, nil, err
-			}
-		} else {
-			glog.V(3).Infof("Instance group %v already exists in zone %v", name, zone)
-		}
-
-		existingPorts := map[int64]bool{}
-		for _, np := range ig.NamedPorts {
-			existingPorts[np.Port] = true
-		}
-		var newPorts []*compute.NamedPort
-		for _, np := range namedPorts {
-			if existingPorts[np.Port] {
-				glog.V(5).Infof("Instance group %v already has named port %+v", ig.Name, np)
-				continue
-			}
-			newPorts = append(newPorts, np)
-		}
-		if len(newPorts) > 0 {
-			glog.V(3).Infof("Instance group %v/%v does not have ports %+v, adding them now.", zone, name, newPorts)
-			if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, newPorts...)); err != nil {
-				return nil, nil, err
-			}
+		ig, err := i.ensureInstanceGroupAndPorts(name, zone, ports)
+		if err != nil {
+			return nil, err
 		}
+
 		igs = append(igs, ig)
 	}
-	return igs, namedPorts, nil
+	return igs, nil
+}
+
+func (i *Instances) ensureInstanceGroupAndPorts(name, zone string, ports []int64) (*compute.InstanceGroup, error) {
+	ig, err := i.Get(name, zone)
+	if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
+		glog.Errorf("Failed to get instance group %v/%v, err: %v", zone, name, err)
+		return nil, err
+	}
+
+	if ig == nil {
+		glog.V(3).Infof("Creating instance group %v/%v.", zone, name)
+		if err = i.cloud.CreateInstanceGroup(&compute.InstanceGroup{Name: name}, zone); err != nil {
+			// Error may come back with StatusConflict meaning the instance group was created by another controller
+			// possibly the Service Controller for internal load balancers.
+			if utils.IsHTTPErrorCode(err, http.StatusConflict) {
+				glog.Warningf("Failed to create instance group %v/%v due to conflict status, but continuing sync. err: %v", zone, name, err)
+			} else {
+				glog.Errorf("Failed to create instance group %v/%v, err: %v", zone, name, err)
+				return nil, err
+			}
+		}
+		ig, err = i.cloud.GetInstanceGroup(name, zone)
+		if err != nil {
+			glog.Errorf("Failed to get instance group %v/%v after ensuring existence, err: %v", zone, name, err)
+			return nil, err
+		}
+	} else {
+		glog.V(5).Infof("Instance group %v/%v already exists.", zone, name)
+	}

+	// Build map of existing ports
+	existingPorts := map[int64]bool{}
+	for _, np := range ig.NamedPorts {
+		existingPorts[np.Port] = true
+	}
+
+	// Determine which ports need to be added
+	var newPorts []int64
+	for _, p := range ports {
+		if existingPorts[p] {
+			glog.V(5).Infof("Instance group %v/%v already has named port %v", zone, ig.Name, p)
+			continue
+		}
+		newPorts = append(newPorts, p)
+	}
+
+	// Build slice of NamedPorts for adding
+	var newNamedPorts []*compute.NamedPort
+	for _, v := range newPorts {
+		newNamedPorts = append(newNamedPorts, utils.GetNamedPort(v))
+	}
+
+	if len(newNamedPorts) > 0 {
+		glog.V(3).Infof("Instance group %v/%v does not have ports %+v, adding them now.", zone, name, newPorts)
+		if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, newNamedPorts...)); err != nil {
+			return nil, err
+		}
+	}
+
+	return ig, nil
 }

 // DeleteInstanceGroup deletes the given IG by name, from all zones.

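The heart of this cleanup is an idempotent get-or-create ("ensure") loop: fetch the group, create it on 404, tolerate a 409 Conflict from a concurrent controller, then add only the missing named ports. A minimal, self-contained Go sketch of that pattern follows, with a toy in-memory map standing in for the GCE compute API (every name below is illustrative, not the repo's):

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// toyCloud stands in for the compute API: group name -> set of named ports.
type toyCloud map[string]map[int64]bool

func (c toyCloud) get(name string) (map[int64]bool, error) {
	ig, ok := c[name]
	if !ok {
		return nil, errNotFound
	}
	return ig, nil
}

func (c toyCloud) create(name string) { c[name] = map[int64]bool{} }

// ensure is idempotent: calling it twice with the same arguments is a no-op
// the second time, which is what lets a controller re-sync safely.
func ensure(c toyCloud, name string, ports []int64) error {
	ig, err := c.get(name)
	if errors.Is(err, errNotFound) {
		c.create(name) // a real client would also tolerate 409 Conflict here
		ig, _ = c.get(name)
	} else if err != nil {
		return err
	}
	for _, p := range ports {
		if !ig[p] { // add only the ports that are missing
			ig[p] = true
		}
	}
	return nil
}

func main() {
	cloud := toyCloud{}
	for i := 0; i < 2; i++ { // the second pass changes nothing
		if err := ensure(cloud, "k8s-ig", []int64{80, 443}); err != nil {
			panic(err)
		}
	}
	fmt.Println(cloud) // map[k8s-ig:map[80:true 443:true]]
}

Extracting the per-zone body into ensureInstanceGroupAndPorts, as the commit does, keeps this retry-safe logic in one place instead of inlining it in the zone loop.
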
8 changes: 4 additions & 4 deletions instances/instances_test.go
@@ -34,7 +34,7 @@ func TestNodePoolSync(t *testing.T) {
 	f := NewFakeInstanceGroups(sets.NewString(
 		[]string{"n1", "n2"}...))
 	pool := newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})

 	// KubeNodes: n1
 	// GCENodes: n1, n2
@@ -53,7 +53,7 @@

 	f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...))
 	pool = newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})

 	f.calls = []int{}
 	kubeNodes = sets.NewString([]string{"n1", "n2"}...)
@@ -69,7 +69,7 @@

 	f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...))
 	pool = newNodePool(f, defaultZone)
-	pool.AddInstanceGroup("test", []int64{80})
+	pool.EnsureInstanceGroupsAndPorts("test", []int64{80})

 	f.calls = []int{}
 	kubeNodes = sets.NewString([]string{"n1", "n2"}...)
@@ -112,7 +112,7 @@ func TestSetNamedPorts(t *testing.T) {
 		// TODO: Add tests to remove named ports when we support that.
 	}
 	for _, test := range testCases {
-		igs, _, err := pool.AddInstanceGroup("ig", test.activePorts)
+		igs, err := pool.EnsureInstanceGroupsAndPorts("ig", test.activePorts)
 		if err != nil {
 			t.Fatalf("unexpected error in setting ports %v to instance group: %s", test.activePorts, err)
 		}

2 changes: 1 addition & 1 deletion instances/interfaces.go
@@ -32,7 +32,7 @@ type NodePool interface {
 	Init(zl zoneLister)

 	// The following 2 methods operate on instance groups.
-	AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error)
+	EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*compute.InstanceGroup, error)
 	DeleteInstanceGroup(name string) error

 	// TODO: Refactor for modularity

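For context, this is the contract callers code against after the rename. A hedged, self-contained sketch with a stub implementation follows; InstanceGroup is a simplified stand-in for compute.InstanceGroup, the interface is trimmed to the two methods shown above, and stubPool is invented for illustration:

package main

import "fmt"

// InstanceGroup is a simplified stand-in for compute.InstanceGroup.
type InstanceGroup struct{ Name string }

// NodePool mirrors the renamed interface method from this commit; the
// rest of the real interface is omitted for brevity.
type NodePool interface {
	EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*InstanceGroup, error)
	DeleteInstanceGroup(name string) error
}

// stubPool is an invented single-zone implementation.
type stubPool struct{}

func (stubPool) EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*InstanceGroup, error) {
	return []*InstanceGroup{{Name: name}}, nil // one group per zone
}

func (stubPool) DeleteInstanceGroup(name string) error { return nil }

func main() {
	var pool NodePool = stubPool{}
	// Callers now receive only the groups; the named ports are no longer
	// returned, since the old callers discarded them anyway (igs, _, err).
	igs, err := pool.EnsureInstanceGroupsAndPorts("k8s-ig--cluster1", []int64{80})
	fmt.Println(igs[0].Name, err)
}
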
4 changes: 2 additions & 2 deletions instances/utils.go
@@ -8,6 +8,6 @@ import (

 // Helper method to create instance groups.
 // This method exists to ensure that we are using the same logic at all places.
-func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) {
-	return nodePool.AddInstanceGroup(namer.IGName(), ports)
+func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, ports []int64) ([]*compute.InstanceGroup, error) {
+	return nodePool.EnsureInstanceGroupsAndPorts(namer.IGName(), ports)
 }
