
Make service mirror controller per target cluster #4710

Merged: 15 commits, Jul 23, 2020
cli/cmd/install.go: 2 changes (1 addition, 1 deletion)
@@ -438,7 +438,7 @@ func (options *installOptions) validateAndBuild(stage string, flags *pflag.FlagS

if options.enableEndpointSlices {
if err = validateEndpointSlicesFeature(); err != nil {
- return nil, nil, fmt.Errorf("--enableEndpointSlice=true not supported:%s", err)
+ return nil, nil, fmt.Errorf("--enableEndpointSlice=true not supported: %s", err)
}
}

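Side note on the fix above (general Go convention, not something this PR changes): a message and its underlying error are conventionally separated by ": ", and fmt.Errorf can also wrap with %w when callers need errors.Is/As. A hypothetical variant:

	// Wrapping instead of formatting (not what the PR does):
	return nil, nil, fmt.Errorf("--enableEndpointSlice=true not supported: %w", err)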
cli/cmd/multicluster.go: 10 changes (9 additions, 1 deletion)
@@ -542,6 +542,14 @@ func newLinkCommand() *cobra.Command {
for _, ingress := range gateway.Status.LoadBalancer.Ingress {
gatewayAddresses = append(gatewayAddresses, ingress.IP)
}
+ if len(gatewayAddresses) == 0 {
+ 	return fmt.Errorf("Gateway %s.%s has no ingress addresses", gateway.Name, gateway.Namespace)
+ }

+ gatewayIdentity, ok := gateway.Annotations[k8s.GatewayIdentity]
+ if !ok || gatewayIdentity == "" {
+ 	return fmt.Errorf("Gateway %s.%s has no %s annotation", gateway.Name, gateway.Namespace, k8s.GatewayIdentity)
+ }

probeSpec, err := mc.ExtractProbeSpec(gateway)
if err != nil {
@@ -562,7 +570,7 @@ func newLinkCommand() *cobra.Command {
ClusterCredentialsSecret: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
GatewayAddress: strings.Join(gatewayAddresses, ","),
GatewayPort: gatewayPort,
- GatewayIdentity: gateway.Annotations[k8s.GatewayIdentity],
+ GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
}

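For illustration only (not part of the diff): a gateway Service shaped to pass both new checks, with at least one ingress IP and a non-empty identity annotation. The name, namespace, identity value, and IP are invented placeholders; types are corev1/metav1 from k8s.io.

	gateway := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "linkerd-gateway",
			Namespace: "linkerd-multicluster",
			Annotations: map[string]string{
				// The value below is a made-up example identity.
				k8s.GatewayIdentity: "linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local",
			},
		},
		Status: corev1.ServiceStatus{
			LoadBalancer: corev1.LoadBalancerStatus{
				Ingress: []corev1.LoadBalancerIngress{{IP: "203.0.113.10"}},
			},
		},
	}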
controller/api/destination/watcher/endpoints_watcher.go: 2 changes (0 additions, 2 deletions)
@@ -129,7 +129,6 @@ var endpointsVecs = newEndpointsMetricsVecs()
// NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
// k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
// watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
- //TODO: Allow EndpointSlice resources to be used once opt-in functionality is supported.
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) *EndpointsWatcher {
ew := &EndpointsWatcher{
publishers: make(map[ServiceID]*servicePublisher),
@@ -161,7 +160,6 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlic
UpdateFunc: ew.updateEndpointSlice,
})
} else {
- // ew.log.Debugf("Cluster does not have EndpointSlice access:%v", err)
ew.log.Debugf("Watching Endpoints resources")
k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpoints,
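A hedged usage sketch of the constructor above, with the flag wiring assumed from the destination main.go changes below:

	// enableEndpointSlices comes from the global config, validated at startup.
	ew := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)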
controller/cmd/destination/main.go: 15 changes (10 additions, 5 deletions)
@@ -67,7 +67,17 @@ func Main(args []string) {
}
}

+ // We need a separate client to check for EndpointSlice access in the k8s cluster:
+ // when slices are enabled and registered, k8sAPI is initialized with the 'ES' resource.
+ k8Client, err := pkgK8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
+ if err != nil {
+ 	log.Fatalf("Failed to initialize K8s API Client: %s", err)
+ }
enableEndpointSlices := global.GetEndpointSliceEnabled()
+ err = pkgK8s.EndpointSliceAccess(k8Client)
+ if enableEndpointSlices && err != nil {
+ 	log.Fatalf("Failed to start with EndpointSlices enabled: %s", err)
+ }

var k8sAPI *k8s.API
if enableEndpointSlices {
@@ -85,11 +95,6 @@ func Main(args []string) {
log.Fatalf("Failed to initialize K8s API: %s", err)
}

- err = consts.EndpointSliceAccess(k8sAPI.Client)
- if enableEndpointSlices && err != nil {
- 	log.Fatalf("Failed to start with EndpointSlices enabled: %s", err)
- }

server := destination.NewServer(
*addr,
*controllerNamespace,
controller/cmd/service-mirror/cluster_watcher.go: 44 changes (19 additions, 25 deletions)
@@ -259,10 +259,7 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupOrphanedServices() error {
// created. This piece of code is responsible for doing just that. It takes care of
// services, endpoints and namespaces (if needed)
func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources() error {
- matchLabels := map[string]string{
- 	consts.MirroredResourceLabel: "true",
- 	consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
- }
+ matchLabels := rcsw.getMirroredServiceLabels()

services, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
if err != nil {
@@ -315,20 +312,14 @@ func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources() error {
// Deletes a locally mirrored service as it is not present on the remote cluster anymore
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ev *RemoteServiceDeleted) error {
localServiceName := rcsw.mirroredResourceName(ev.Name)
rcsw.log.Infof("Deleting mirrored service %s/%s and its corresponding Endpoints", ev.Namespace, localServiceName)
rcsw.log.Infof("Deleting mirrored service %s/%s", ev.Namespace, localServiceName)
var errors []error
if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil {
if !kerrors.IsNotFound(err) {
errors = append(errors, fmt.Errorf("could not delete Service: %s/%s: %s", ev.Namespace, localServiceName, err))
}
}

- if err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.Namespace).Delete(localServiceName, &metav1.DeleteOptions{}); err != nil {
- 	if !kerrors.IsNotFound(err) {
- 		errors = append(errors, fmt.Errorf("could not delete Endpoints: %s/%s: %s", ev.Namespace, localServiceName, err))
- 	}
- }

if len(errors) > 0 {
return RetryableError{errors}
}
@@ -350,11 +341,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ev *RemoteSe
},
}

- if rcsw.link.GatewayIdentity != "" {
- 	copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
- } else {
- 	delete(copiedEndpoints.Annotations, consts.RemoteGatewayIdentity)
+ if copiedEndpoints.Annotations == nil {
+ 	copiedEndpoints.Annotations = make(map[string]string)
}
+ copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity

if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(copiedEndpoints.Namespace).Update(copiedEndpoints); err != nil {
return RetryableError{[]error{err}}
@@ -646,6 +636,11 @@ func (rcsw *RemoteClusterServiceWatcher) Start() error {
)
go rcsw.processEvents()

+ // We need to issue a RepairEndpoints immediately to populate the gateway
+ // mirror endpoints.
+ ev := RepairEndpoints{}
+ rcsw.eventsQueue.Add(&ev)
Contributor comment (on lines +639 to +642): This fixes the issue of 503s that I was seeing 👍
go func() {
ticker := time.NewTicker(rcsw.repairPeriod)
for {
@@ -746,11 +741,10 @@ func (rcsw *RemoteClusterServiceWatcher) repairEndpoints() {
},
}

- if rcsw.link.GatewayIdentity != "" {
- 	updatedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
- } else {
- 	delete(updatedEndpoints.Annotations, consts.RemoteGatewayIdentity)
+ if updatedEndpoints.Annotations == nil {
+ 	updatedEndpoints.Annotations = make(map[string]string)
}
+ updatedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity

_, err = rcsw.localAPIClient.Client.CoreV1().Services(updatedService.Namespace).Update(updatedService)
if err != nil {
@@ -777,12 +771,12 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateEndpoints(ep *corev1.Endp
} else {
return err
}
- } else {
- 	// Exists so we should update it.
- 	_, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(ep.Namespace).Update(ep)
- 	if err != nil {
- 		return err
- 	}
}
+ // Exists so we should update it.
+ _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(ep.Namespace).Update(ep)
+ if err != nil {
+ 	return err
+ }

return nil
}
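The cleanup hunk above calls getMirroredServiceLabels, which this diff does not show. A plausible sketch, inferred from the inline map it replaces (hypothetical; the real helper lives elsewhere in cluster_watcher.go):

	func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels() map[string]string {
		// Inferred from the removed inline map in cleanupMirroredResources.
		return map[string]string{
			consts.MirroredResourceLabel:  "true",
			consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
		}
	}

Note the repeated pattern in handleRemoteServiceUpdated and repairEndpoints: the gateway identity annotation is now written unconditionally, behind a nil-map guard, since assigning into a nil Go map panics.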
@@ -51,15 +51,8 @@ func (tc *mirroringTestCase) run(t *testing.T) {
}

if tc.expectedLocalEndpoints == nil {
- // ensure the are no local endpoints
- endpoints, err := localAPI.Client.CoreV1().Endpoints(corev1.NamespaceAll).List(metav1.ListOptions{})
- if err != nil {
- 	t.Fatal(err)
- }
- if len(endpoints.Items) > 0 {
- 	t.Fatalf("Was expecting no local endpoints but instead found %d", len(endpoints.Items))
- }
+ // In a real Kubernetes cluster, deleting the service is sufficient
+ // to delete the endpoints.
} else {
for _, expected := range tc.expectedLocalEndpoints {
actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(expected.Name, metav1.GetOptions{})
pkg/healthcheck/healthcheck_multicluster.go: 1 change (0 additions, 1 deletion)
@@ -109,7 +109,6 @@ func (hc *HealthChecker) multiClusterCategory() []category {
{
description: "all gateway mirrors are healthy",
hintAnchor: "l5d-multicluster-gateways-endpoints",
- warning: true,
check: func(ctx context.Context) error {
if hc.Options.MultiCluster {
return hc.checkIfGatewayMirrorsHaveEndpoints(ctx)
pkg/k8s/authz.go: 15 changes (6 additions, 9 deletions)
@@ -109,14 +109,12 @@ func ServiceProfilesAccess(k8sClient kubernetes.Interface) error {

// EndpointSliceAccess verifies whether the K8s cluster has
// access to EndpointSlice resources.
- //TODO: Uncomment function and change return type once EndpointSlices
- // are supported and made opt-in through install flag
func EndpointSliceAccess(k8sClient kubernetes.Interface) error {
- // gv := discovery.SchemeGroupVersion.String()
- // res, err := k8sClient.Discovery().ServerResourcesForGroupVersion(gv)
- // if err != nil {
- // 	return err
- // }
+ gv := discovery.SchemeGroupVersion.String()
+ res, err := k8sClient.Discovery().ServerResourcesForGroupVersion(gv)
+ if err != nil {
+ 	return err
+ }

if res.GroupVersion == gv {
for _, apiRes := range res.APIResources {
@@ -126,8 +124,7 @@ func EndpointSliceAccess(k8sClient kubernetes.Interface) error {
}
}

- // return errors.New("EndpointSlice resource not found")
- return errors.New("EndpointSlice not supported")
+ return errors.New("EndpointSlice resource not found")
}

func checkEndpointSlicesExist(k8sClient kubernetes.Interface) error {
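Usage sketch of the restored check, mirroring how controller/cmd/destination/main.go gates startup in this PR (variable names assumed from that hunk):

	k8Client, err := pkgK8s.NewAPI(*kubeConfigPath, "", "", []string{}, 0)
	if err != nil {
		log.Fatalf("Failed to initialize K8s API Client: %s", err)
	}
	if enableEndpointSlices {
		if err := pkgK8s.EndpointSliceAccess(k8Client); err != nil {
			log.Fatalf("Failed to start with EndpointSlices enabled: %s", err)
		}
	}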