Skip to content

Commit

Permalink
Merge pull request #12983 from zetaab/feature/drainos
Browse files Browse the repository at this point in the history
Drain OpenStack loadbalancers
  • Loading branch information
k8s-ci-robot authored Jan 3, 2022
2 parents a69f6bb + 117b98d commit af7e6a4
Show file tree
Hide file tree
Showing 19 changed files with 268 additions and 17 deletions.
4 changes: 4 additions & 0 deletions cloudmock/gce/mock_gce_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (c *MockGCECloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
// return recreateCloudInstance(c, i)
}

// DeregisterInstance is a no-op for the mock GCE cloud; it always returns nil.
func (c *MockGCECloud) DeregisterInstance(i *cloudinstances.CloudInstance) error {
return nil
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *MockGCECloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,10 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error
return fmt.Errorf("error excluding node from load balancer: %v", err)
}

if err := c.Cloud.DeregisterInstance(u); err != nil {
return fmt.Errorf("error deregistering instance %q, node %q: %v", u.ID, u.Node.Name, err)
}

if err := drain.RunNodeDrain(helper, u.Node.Name); err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubeconfig/create_kubecfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (f fakeStatusCloud) DetachInstance(instance *cloudinstances.CloudInstance)
panic("not implemented")
}

// DeregisterInstance is not implemented by this fake; calling it panics.
func (f fakeStatusCloud) DeregisterInstance(instance *cloudinstances.CloudInstance) error {
panic("not implemented")
}

// GetCloudGroups is not implemented by this fake; calling it panics.
func (f fakeStatusCloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
panic("not implemented")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/model/openstackmodel/servergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (b *ServerGroupModelBuilder) Build(c *fi.ModelBuilderContext) error {
InterfaceName: fi.String(ifName),
ProtocolPort: fi.Int(443),
Lifecycle: b.Lifecycle,
Weight: fi.Int(1),
}

c.AddTask(associateTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ ServerGroup:
Name: cluster-master-a
Policies:
- anti-affinity
Weight: 1
---
ID: null
InterfaceName: cluster
Expand Down Expand Up @@ -802,6 +803,7 @@ ServerGroup:
Name: cluster-master-b
Policies:
- anti-affinity
Weight: 1
---
ID: null
InterfaceName: cluster
Expand Down Expand Up @@ -835,6 +837,7 @@ ServerGroup:
Name: cluster-master-c
Policies:
- anti-affinity
Weight: 1
---
AdditionalSecurityGroups: null
ID: null
Expand Down
3 changes: 3 additions & 0 deletions upup/pkg/fi/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Cloud interface {
// DeleteInstance deletes a cloud instance.
DeleteInstance(instance *cloudinstances.CloudInstance) error

// DeregisterInstance drains a cloud instance and loadbalancers.
DeregisterInstance(instance *cloudinstances.CloudInstance) error

// DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances.
DeleteGroup(group *cloudinstances.CloudInstanceGroup) error

Expand Down
16 changes: 10 additions & 6 deletions upup/pkg/fi/cloudup/awsup/aws_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,18 +541,22 @@ func (c *awsCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance)
return deleteInstance(c, i)
}

func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstance) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstance: %v", i)
}

// DeregisterInstance deregisters the instance by delegating to
// deregisterInstance. Instances whose instance group is managed by Karpenter
// are skipped and reported as success. Any failure from deregisterInstance is
// returned with load-balancer context prepended.
func (c *awsCloudImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
	// Karpenter-managed groups are not deregistered here.
	if i.CloudInstanceGroup.InstanceGroup.Spec.Manager == kops.InstanceManagerKarpenter {
		return nil
	}

	if err := deregisterInstance(c, i); err != nil {
		return fmt.Errorf("failed to deregister instance from loadBalancer before terminating: %v", err)
	}
	return nil
}

func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstance) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstance: %v", i)
}

request := &ec2.TerminateInstancesInput{
InstanceIds: []*string{aws.String(id)},
Expand Down
4 changes: 4 additions & 0 deletions upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (c *MockAWSCloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return deleteInstance(c, i)
}

// DeregisterInstance is a no-op for the mock AWS cloud; it always returns nil.
func (c *MockAWSCloud) DeregisterInstance(i *cloudinstances.CloudInstance) error {
return nil
}

// DetachInstance delegates to detachInstance, passing the mock as the cloud,
// and returns its result unchanged.
func (c *MockAWSCloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return detachInstance(c, i)
}
Expand Down
7 changes: 7 additions & 0 deletions upup/pkg/fi/cloudup/azure/azure_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/Azure/go-autorest/autorest/azure/auth"
"k8s.io/klog/v2"
"k8s.io/kops/dnsprovider/pkg/dnsprovider"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
Expand Down Expand Up @@ -144,6 +145,12 @@ func (c *azureCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanc
return errors.New("DeleteInstance not implemented on azureCloud")
}

// DeregisterInstance is not implemented for Azure. It only logs that fact at
// verbosity level 8 and returns nil, so callers treat it as a success.
func (c *azureCloudImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("Azure DeregisterInstance not implemented")
return nil
}

// DeleteGroup is not implemented for Azure and always returns an error.
func (c *azureCloudImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return errors.New("DeleteGroup not implemented on azureCloud")
}
Expand Down
4 changes: 4 additions & 0 deletions upup/pkg/fi/cloudup/azuretasks/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (c *MockAzureCloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
return errors.New("DeleteInstance not implemented on azureCloud")
}

// DeregisterInstance is a no-op for the mock Azure cloud; it always returns nil.
func (c *MockAzureCloud) DeregisterInstance(i *cloudinstances.CloudInstance) error {
return nil
}

// DeleteGroup deletes the group.
func (c *MockAzureCloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return errors.New("DeleteGroup not implemented on azureCloud")
Expand Down
6 changes: 6 additions & 0 deletions upup/pkg/fi/cloudup/do/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ func (c *doCloudImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time")
}

// DeregisterInstance is not implemented for DigitalOcean. It only logs that
// fact at verbosity level 8 and returns nil, so callers treat it as a success.
func (c *doCloudImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("DO DeregisterInstance not implemented")
return nil
}

func (c *doCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error {
dropletID, err := strconv.Atoi(i.ID)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions upup/pkg/fi/cloudup/do/mock_do_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (c *doCloudMockImplementation) DeleteInstance(instance *cloudinstances.Clou
return errors.New("not tested")
}

// DeregisterInstance is a no-op for the mock DigitalOcean cloud; it always
// returns nil.
func (c *doCloudMockImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
return nil
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *doCloudMockImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
return fmt.Errorf("digital ocean cloud provider does not support surging")
Expand Down
5 changes: 5 additions & 0 deletions upup/pkg/fi/cloudup/gce/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (c *gceCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance)
return recreateCloudInstance(c, i)
}

// DeregisterInstance is not implemented for GCE. It only logs that fact at
// verbosity level 8 and returns nil, so callers treat it as a success.
func (c *gceCloudImplementation) DeregisterInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("GCE DeregisterInstance not implemented")
return nil
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *gceCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("gce cloud provider DetachInstance not implemented yet")
Expand Down
1 change: 1 addition & 0 deletions upup/pkg/fi/cloudup/openstack/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions upup/pkg/fi/cloudup/openstack/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,11 @@ type OpenstackCloud interface {
// ListDNSRecordsets will list the DNS recordsets for the given zone id
ListDNSRecordsets(zoneID string, opt recordsets.ListOptsBuilder) ([]recordsets.RecordSet, error)
GetLB(loadbalancerID string) (*loadbalancers.LoadBalancer, error)
GetLBStats(loadbalancerID string) (*loadbalancers.Stats, error)
CreateLB(opt loadbalancers.CreateOptsBuilder) (*loadbalancers.LoadBalancer, error)
ListLBs(opt loadbalancers.ListOptsBuilder) ([]loadbalancers.LoadBalancer, error)
UpdateMemberInPool(poolID string, memberID string, opts v2pools.UpdateMemberOptsBuilder) (*v2pools.Member, error)
ListPoolMembers(poolID string, opts v2pools.ListMembersOpts) ([]v2pools.Member, error)

// DeleteLB will delete loadbalancer
DeleteLB(lbID string, opt loadbalancers.DeleteOpts) error
Expand Down
97 changes: 97 additions & 0 deletions upup/pkg/fi/cloudup/openstack/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package openstack

import (
"context"
"fmt"
"time"

"github.com/gophercloud/gophercloud/openstack/compute/v2/flavors"

"github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
"github.com/gophercloud/gophercloud/openstack/loadbalancer/v2/loadbalancers"
v2pools "github.com/gophercloud/gophercloud/openstack/loadbalancer/v2/pools"
"github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
"github.com/mitchellh/mapstructure"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/cloudinstances"
Expand Down Expand Up @@ -170,6 +174,99 @@ func deleteInstanceWithID(c OpenstackCloud, instanceID string) error {
}
}

// DeregisterInstance drains the instance from load balancers by delegating to
// deregisterInstance with the instance's ID, and returns its result unchanged.
func (c *openstackCloud) DeregisterInstance(i *cloudinstances.CloudInstance) error {
return deregisterInstance(c, i.ID)
}

// deregisterInstance drains the instance identified by instanceID out of the
// load balancers returned by ListLBs. Every load balancer is handed to
// drainSingleLB in its own goroutine; the call returns once all of them have
// finished, reporting the first failure (if any) with added context.
func deregisterInstance(c OpenstackCloud, instanceID string) error {
	server, err := c.GetInstance(instanceID)
	if err != nil {
		return err
	}

	// Load balancers created by Kubernetes name their pool members after the
	// instance, whereas the kOps API load balancer names its members
	// <cluster>-<ig>. The kOps name stays empty unless both tags are present
	// in the instance metadata.
	instanceName := server.Name
	var kopsName string
	if cluster, ok := server.Metadata[TagClusterName]; ok {
		if group, ok := server.Metadata[TagKopsInstanceGroup]; ok {
			kopsName = fmt.Sprintf("%s-%s", cluster, group)
		}
	}

	lbs, err := c.ListLBs(loadbalancers.ListOpts{})
	if err != nil {
		return err
	}

	eg, _ := errgroup.WithContext(context.Background())
	for idx := range lbs {
		// Take a per-iteration copy so each goroutine sees its own load balancer.
		lb := lbs[idx]
		eg.Go(func() error {
			return drainSingleLB(c, lb, instanceName, kopsName)
		})
	}

	if err := eg.Wait(); err != nil {
		return fmt.Errorf("failed to deregister instance from load balancers: %v", err)
	}
	return nil
}

// drainSingleLB drains the instance from a single load balancer.
//
// For every pool of lb it looks for the first member whose name equals
// instanceName, or equals kopsName when kopsName is non-empty, and sets that
// member's weight to 0. If at least one member was updated it sleeps for 20
// seconds and then logs the load balancer's active connection count from
// before and after the drain. Load balancers without a matching member are
// left untouched.
//
// Any error from the cloud API calls is returned as-is; this includes the
// statistics lookups, even though the statistics are only used for logging.
//
// NOTE(review): members are matched by name only, and only the first match in
// each pool is updated. If several members of a pool share the kOps name, the
// updated member may not be the one backed by this instance; confirm against
// how pool members are named.
func drainSingleLB(c OpenstackCloud, lb loadbalancers.LoadBalancer, instanceName string, kopsName string) error {
	// Capture the connection count up front so it can be compared after draining.
	statsBefore, err := c.GetLBStats(lb.ID)
	if err != nil {
		return err
	}

	pools, err := c.ListPools(v2pools.ListOpts{
		LoadbalancerID: lb.ID,
	})
	if err != nil {
		return err
	}

	drained := false
	for _, pool := range pools {
		members, err := c.ListPoolMembers(pool.ID, v2pools.ListMembersOpts{})
		if err != nil {
			return err
		}
		for _, member := range members {
			matchesKopsName := len(kopsName) > 0 && member.Name == kopsName
			if member.Name != instanceName && !matchesKopsName {
				continue
			}

			// https://docs.openstack.org/api-ref/load-balancer/v2/?expanded=update-a-member-detail
			// A member with weight 0 receives no new requests but is allowed
			// to finish its existing connections, which drains it.
			if _, err := c.UpdateMemberInPool(pool.ID, member.ID, v2pools.UpdateMemberOpts{
				Weight: fi.Int(0),
			}); err != nil {
				return err
			}
			drained = true
			break
		}
	}

	if !drained {
		return nil
	}

	// TODO: should we do some kind of loop here and check that connections are really drained?
	time.Sleep(20 * time.Second)

	statsAfter, err := c.GetLBStats(lb.ID)
	if err != nil {
		return err
	}

	klog.Infof("Loadbalancer %s connections before draining %d and after %d", lb.Name, statsBefore.ActiveConnections, statsAfter.ActiveConnections)
	return nil
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *openstackCloud) DetachInstance(i *cloudinstances.CloudInstance) error {
return detachInstance(c, i)
Expand Down
Loading

0 comments on commit af7e6a4

Please sign in to comment.