-
Notifications
You must be signed in to change notification settings - Fork 301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
K8s-NEG Integration #48
Conversation
/assign bowei |
3b78b21
to
c092e64
Compare
47ea8c6
to
bdefa81
Compare
cloud provider change was merged: #54 |
65bc226
to
8dfad3f
Compare
cmd/glbc/main.go
Outdated
// Start loadbalancer controller | ||
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager) | ||
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is negEnabled passed to NewLoadBalancerController
when it's accessible in the ctx?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ControllerContext does not have a flag for negEnabled. It has a flag for enabling endpoint informer.
cmd/glbc/main.go
Outdated
@@ -301,6 +302,13 @@ func main() { | |||
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName()) | |||
} | |||
clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc}) | |||
|
|||
// Start NEG controller | |||
if cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we determine this value once at the start of the file instead of recalling this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The HealthCheck changes divert from the existing design pattern and introduces complexity into consumer packages. If we needed to add support for HTTP2HealthChecks soon, it would be more difficult with this code.
pkg/backends/backends.go
Outdated
Protocol utils.AppProtocol | ||
SvcName types.NamespacedName | ||
SvcPort intstr.IntOrString | ||
Port int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you comment that this is NodePort. Also, add TODO for changing the name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
pkg/backends/backends.go
Outdated
b := &computealpha.Backend{ | ||
Group: neg.SelfLink, | ||
BalancingMode: string(Rate), | ||
MaxRate: maxRPS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Believe we should use MaxRatePerEndpoint
to be consistent with MaxRatePerInstance
pkg/backends/backends.go
Outdated
|
||
negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort) | ||
|
||
negs := []*computealpha.NetworkEndpointGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: var negs []*computealpha.NetworkEndpointGroup
negs = append(negs, neg) | ||
} | ||
|
||
backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have a note somewhere saying the services must still be type NodePort? May be ideal to have a neg-design.md
file that documents behavior, caveats, and TODOs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a NEG to beta doc that included this.
pkg/backends/backends.go
Outdated
for _, backend := range backendService.Backends { | ||
found := false | ||
for _, negBackend := range targetBackends { | ||
// Warnning: Group link includes the api version. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sp
pkg/backends/backends.go
Outdated
needToUpdate := len(backendService.Backends) == 0 | ||
for _, backend := range backendService.Backends { | ||
found := false | ||
for _, negBackend := range targetBackends { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear what exactly you want from this logic. Do you want set equality? Right now, you're only testing whether the targetBackends exist in the current set - not whether there are extraneous backends in the current set.
Could we use sets.NewString()
and build two separate sets and test for equality? The code would be smaller, fewer indents, & easier to grok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
pkg/backends/backends.go
Outdated
hc.Description = description | ||
hc.CheckIntervalSec = checkInterval | ||
hc.TimeoutSec = timeout | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a large problem with how the alpha health check is incorporated, and this exemplifies why. The point of the HealthCheck
struct is to present all parameters at the root level so other packages do not need to have this conditional mess.
pkg/healthchecks/healthchecks.go
Outdated
@@ -174,6 +266,7 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck { | |||
type HealthCheck struct { | |||
compute.HTTPHealthCheck | |||
compute.HealthCheck | |||
AlphaHealthCheck *computealpha.HealthCheck |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than just adding this as a field. We should change the struct to:
type HealthCheck struct {
computealpha.HTTPHealthCheck
computealpha.HealthCheck
We know that computealpha.HealthCheck and computealpha.HTTPHealthCheck are supersets of their compute.
variants, so let's use them to store everything. It just means that when we retrieve an existing health check or output a health check struct, we need two more conversion functions. Just because we're using the alpha structs here does not mean we need to always use the alpha-API. We're just using them as structs which are compatible with both alpha and GA structs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM. I was afraid that it will mess up with existing logic.
e84e8ca
to
223829d
Compare
Fixed the comments. Ready for another round. |
c1244b9
to
28be7e4
Compare
fixed comments and rebased. PTAL |
5461544
to
7a0f4cb
Compare
"k8s1-0123456789abcdef-0123456789012-0123456789-0123456-71877a60", | ||
}, | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unit test trimFieldsEvenly to make sure we don't generate labels that are too long or crash if there is not enough space.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
networkendpointgroup/interfaces.go
Outdated
} | ||
|
||
// Syncer is an interface to interact with syncer | ||
type Syncer interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document the methods
networkendpointgroup/interfaces.go
Outdated
|
||
// SyncerManager is an interface for controllers to manage Syncers | ||
type SyncerManager interface { | ||
EnsureSyncer(namespace, name string, targetPorts sets.String) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document the methods
networkendpointgroup/manager.go
Outdated
"k8s.io/client-go/tools/record" | ||
) | ||
|
||
// syncerManager exposes a few interfaces to manage syncer and ensures thread safety. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this actually contain all the active sync goroutines and manage their lifecycle? Update the docstring comment, right now I find the description to be not very useful.
networkendpointgroup/manager.go
Outdated
mu sync.Mutex | ||
// svcPortMap is the canonical indicator for whether a service needs NEG | ||
// key is service namespace/name, value is the list of target port that requires NEG | ||
svcPortMap map[string]sets.String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not
type serviceKey struct {
ns string
name string
}
type targetPorts sets.String
servicePortMap map[serviceKey]targetPorts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the serviceKey (namespace/name) is consistent with the key used in apimachinery
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the serviceKey struct. But type targetPorts sets.String
does not work. I am relying on methods like sets.String.Difference
. This function is expecting another sets.Sting
type arg.
leaving the value as sets.String is cleaner.
networkendpointgroup/manager.go
Outdated
// key is service namespace/name, value is the list of target port that requires NEG | ||
svcPortMap map[string]sets.String | ||
// syncerMap stores the NEG syncer | ||
// key is service namespace/name/targetPort. Value is the corresponding syncer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type struct negKey {
namespace string
name string
targetPort int
}
networkendpointgroup/manager.go
Outdated
} | ||
} | ||
|
||
errList := []error{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this to right before it is used.
networkendpointgroup/manager.go
Outdated
} | ||
|
||
errList := []error{} | ||
// Start syncer for added ports |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Ensure a syncer is running for each port that is being added.
networkendpointgroup/manager.go
Outdated
} | ||
|
||
// StopSyncer stops all syncers for the input service. | ||
func (manager *syncerManager) StopSyncer(namespace, name string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably better name: StopServiceSyncers
networkendpointgroup/manager.go
Outdated
key := serviceKeyFunc(namespace, name) | ||
if ports, ok := manager.svcPortMap[key]; ok { | ||
for _, port := range ports.List() { | ||
syncer, ok := manager.syncerMap[encodeSyncerKey(namespace, name, port)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] if syncer, ok := ...; ok {
networkendpointgroup/manager.go
Outdated
func (manager *syncerManager) GC() error { | ||
glog.V(2).Infof("Start NEG garbage collection.") | ||
defer glog.V(2).Infof("NEG garbage collection finished.") | ||
// Garbage collect syncer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete this comment
networkendpointgroup/controller.go
Outdated
resyncPeriod time.Duration, | ||
) (*Controller, error) { | ||
// init event recorder | ||
eventBroadcaster := record.NewBroadcaster() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this reuse the one created at the top level in Ingress
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, need to move event recorder initialization in main. I think it would better to change this in a follow up PR. Added a TODO
networkendpointgroup/controller.go
Outdated
}, stopCh) | ||
|
||
glog.V(2).Infof("Starting network endpoint group controller") | ||
defer c.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to have explicit order.
defer func() {
...
}
networkendpointgroup/controller.go
Outdated
if err != nil { | ||
return err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer not to have multipath dep on conditional
if ! exists {
c.manager.StopSyncer(...)
}
networkendpointgroup/controller.go
Outdated
func (c *Controller) garbageCollection() { | ||
if err := c.manager.GC(); err != nil { | ||
glog.Errorf("NEG controller garbage collection failed: %v", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generate an event
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Events need to be associated with a API object.
I am not sure which one to associate this with.
networkendpointgroup/controller.go
Outdated
return | ||
} | ||
|
||
glog.Warningf("Dropping service %q out of the queue: %v", key, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generate an event
} | ||
|
||
func TestGatherSerivceTargetPortUsedByIngress(t *testing.T) { | ||
ings := []extensions.Ingress{*getTestIngress(), *getTestIngress()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have a few more test cases here?
pkg/healthchecks/healthchecks.go
Outdated
DefaultUnhealthyThreshold = 10 | ||
// DefaultTimeout defines the timeout of each probe | ||
// DefaultNEGUnhealthyThreshold defines the threshold of failure probes that declare a network endpoint "unhealthy" | ||
DefaultNEGUnhealthyThreshold = 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to state this is derived how?
// This is to preserve the existing health check setting as much as possible. | ||
// WARNING: if a service backend is converted from IG mode to NEG mode, | ||
// the existing health check setting will be preserve, although it may not suit the customer needs. | ||
func mergeHealthcheckForNEG(oldHC, newHC *HealthCheck) *HealthCheck { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this actually do anything other than a straight copy of all the fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the approach @nicksardo and I discussed. It mainly copies the fields and manipulate port and portSpecification field to make it compatible. So that any manual changes are preserved while switching between IG and NEG mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
hc := healthChecks.New(8000, utils.ProtocolHTTP, true) | ||
_, err := healthChecks.Sync(hc) | ||
if err != nil { | ||
t.Fatalf("unexpected error: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got %v, want nil
|
||
ret, err := healthChecks.Get(8000, true) | ||
if err != nil { | ||
t.Fatalf("unexpected error: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got %v, want nil
t.Fatalf("unexpected error: %v", err) | ||
} | ||
if ret.Port != 0 { | ||
t.Errorf("Expect port to not be specified, but got %d.", ret.Port) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got ret.Port == %d, want 0
t.Errorf("Expect port to not be specified, but got %d.", ret.Port) | ||
} | ||
if ret.PortSpecification != UseServingPortSpecification { | ||
t.Errorf("Expect port specification to be %q, but got %q.", UseServingPortSpecification, ret.PortSpecification) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got ret.PortSpecification = %q, want %q
0c50fb8
to
c7781e5
Compare
Fixed the commemnts. PTAL |
38169db
to
ae5fe51
Compare
pkg/backends/backends.go
Outdated
// For NEG, the api version for health check will be alpha. | ||
// Hence, it will cause the health check links to be always different | ||
// TODO (mixia): compare health check link directly once NEG is GA | ||
splited := strings.Split(existingHCLink, "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this its own helper func
pkg/controller/utils.go
Outdated
@@ -358,6 +363,42 @@ IngressLoop: | |||
return | |||
} | |||
|
|||
func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int { | |||
// if targetPort is integer, no need to translate to endpoint ports | |||
i, _ := strconv.Atoi(targetPort) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems more correct to do:
if i, err := strconv.Atoi(targetPort); err == nil {
return []int{i}
}
Rather than relying on the special value of 0
pkg/controller/utils.go
Outdated
ret := []int{} | ||
|
||
if !exists { | ||
glog.Errorf("Endpoint object %v/%v does not exists.", namespace, name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] exist
no s
pkg/controller/utils.go
Outdated
}, | ||
}, | ||
) | ||
ret := []int{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this down to the for
loop
pkg/controller/utils.go
Outdated
} | ||
if err != nil { | ||
glog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err) | ||
return ret |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return []int{}
pkg/controller/utils.go
Outdated
|
||
if !exists { | ||
glog.Errorf("Endpoint object %v/%v does not exists.", namespace, name) | ||
return ret |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return []int{}
Start() error | ||
// Start stops the syncer | ||
Stop() | ||
// Sync signals the syncer to sync NEG |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
State if this is blocking or async
// Start starts the syncer | ||
Start() error | ||
// Start stops the syncer | ||
Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state if this waits for syncer to stop
type negSyncerManager interface { | ||
// EnsureSyncer ensures corresponding syncers are started and stops any unnecessary syncer | ||
EnsureSyncer(namespace, name string, targetPorts sets.String) error | ||
// StopSyncer stops all syncers related to the service |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state whether this waits for actually stop
// StopSyncer stops all syncers related to the service | ||
StopSyncer(namespace, name string) | ||
// Sync signals all syncers related to the service to sync | ||
Sync(namespace, name string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state if this is async or blocking
pkg/networkendpointgroup/manager.go
Outdated
} | ||
|
||
// EnsureSyncer starts and stops syncers based on the input service ports. | ||
func (manager *syncerManager) EnsureSyncer(namespace, name string, targetPorts sets.String) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EnsureSyncers
Fixed the comments. PTAL |
Rebased. Let us merge it today. |
h00t! |
End-to-end K8s-NEG integration