Use the private service's hostname & target port
When we set 'ClusterIP: None' on the private service, the activator
can't perform its fallback service probing, which is needed while
in mesh mode.

This change tweaks the fallback probing to use the hostname of the
private service instead of a cluster IP.

One caveat is that with a headless service, Istio/K8s doesn't perform
any port translation from 80 (http) to 8012 (queue-proxy http), so we
perform this lookup ourselves.
dprotaso committed Jul 11, 2024
1 parent 6760aa6 commit a76d104
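
To make the new probe destination concrete, here is a minimal, self-contained
sketch of the lookup this commit introduces (illustrative only, not code from
this commit; the service name "my-revision-private" and the namespace "default"
are invented). Because the private service is headless, probing hostname:80
would rely on a port translation that never happens, so the target port must be
read off the Service spec:

    package main

    import (
        "fmt"
        "net"
        "strconv"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
        "knative.dev/pkg/network"
    )

    func main() {
        // A headless private service: ClusterIP is None, so kube-proxy/Istio
        // perform no translation from port 80 to the queue-proxy port.
        svc := &corev1.Service{
            Spec: corev1.ServiceSpec{
                ClusterIP: corev1.ClusterIPNone,
                Ports: []corev1.ServicePort{{
                    Name:       "http",
                    Port:       80,                   // service port; not translated when headless
                    TargetPort: intstr.FromInt(8012), // queue-proxy HTTP port
                }},
            },
        }

        // Resolve the target port ourselves, as getTargetPort below now does.
        var port int
        for _, p := range svc.Spec.Ports {
            if p.Name == "http" {
                port = p.TargetPort.IntValue()
            }
        }

        // Prints "my-revision-private.default.svc.cluster.local:8012"
        host := network.GetServiceHostname("my-revision-private", "default")
        fmt.Println(net.JoinHostPort(host, strconv.Itoa(port)))
    }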
Showing 6 changed files with 173 additions and 173 deletions.
pkg/activator/net/helpers.go (3 additions & 3 deletions)
@@ -92,13 +92,13 @@ func endpointsToDests
     return ready, notReady
 }
 
-// getServicePort takes a service and a protocol and returns the port number of
+// getTargetPort takes a service and a protocol and returns the port number of
 // the port named for that protocol. If the port is not found then ok is false.
-func getServicePort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
+func getTargetPort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
     wantName := networking.ServicePortName(protocol)
     for _, p := range svc.Spec.Ports {
         if p.Name == wantName {
-            return int(p.Port), true
+            return p.TargetPort.IntValue(), true
         }
     }
 
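A usage sketch of the renamed helper (hypothetical values, assuming the
surrounding package's imports): for a Service mapping port 80 (named "http")
to target port 8012, the helper now returns the port the queue-proxy actually
listens on.

    svc := corev1.Service{
        Spec: corev1.ServiceSpec{
            Ports: []corev1.ServicePort{{
                Name:       "http", // networking.ServicePortName(networking.ProtocolHTTP1)
                Port:       80,
                TargetPort: intstr.FromInt(8012),
            }},
        },
    }
    port, ok := getTargetPort(networking.ProtocolHTTP1, &svc)
    // port == 8012 (getServicePort would have returned 80), ok == true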
pkg/activator/net/helpers_test.go (7 additions & 6 deletions)
@@ -22,6 +22,7 @@ import (
 
     "github.com/google/go-cmp/cmp"
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/sets"
     "knative.dev/networking/pkg/apis/networking"
 )
@@ -153,7 +154,7 @@ func TestEndpointsToDests(t *testing.T) {
     }
 }
 
-func TestGetServicePort(t *testing.T) {
+func TestGetTargetPort(t *testing.T) {
     for _, tc := range []struct {
         name     string
         protocol networking.ProtocolType
@@ -164,17 +165,17 @@ func TestGetServicePort(t *testing.T) {
         name:     "Single port",
         protocol: networking.ProtocolHTTP1,
         ports: []corev1.ServicePort{{
-            Name: "http",
-            Port: 100,
+            Name:       "http",
+            TargetPort: intstr.FromInt(100),
         }},
         expect:   100,
         expectOK: true,
     }, {
         name:     "Missing port",
         protocol: networking.ProtocolHTTP1,
         ports: []corev1.ServicePort{{
-            Name: "invalid",
-            Port: 100,
+            Name:       "invalid",
+            TargetPort: intstr.FromInt(100),
         }},
         expect:   0,
         expectOK: false,
@@ -186,7 +187,7 @@ func TestGetServicePort(t *testing.T) {
             },
         }
 
-        port, ok := getServicePort(tc.protocol, &svc)
+        port, ok := getTargetPort(tc.protocol, &svc)
         if ok != tc.expectOK {
             t.Errorf("Wanted ok %v, got %v", tc.expectOK, ok)
         }
pkg/activator/net/revision_backends.go (32 additions & 31 deletions)
@@ -48,6 +48,7 @@ import (
     "knative.dev/pkg/controller"
     "knative.dev/pkg/logging"
     "knative.dev/pkg/logging/logkey"
+    "knative.dev/pkg/network"
     "knative.dev/pkg/reconciler"
     "knative.dev/serving/pkg/apis/serving"
     revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision"
@@ -62,9 +63,9 @@ import (
 // ClusterIPDest will be set to non empty string and Dests will be nil. Otherwise Dests will be set
 // to a slice of healthy l4 dests for reaching the revision.
 type revisionDestsUpdate struct {
-    Rev           types.NamespacedName
-    ClusterIPDest string
-    Dests         sets.Set[string]
+    Rev            types.NamespacedName
+    PrivateService string
+    Dests          sets.Set[string]
 }
 
 type dests struct {
@@ -91,7 +92,7 @@ const (
     defaultProbeFrequency time.Duration = 200 * time.Millisecond
 )
 
-// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
+// revisionWatcher watches the podIPs/service of a revision. It implements the logic
 // to supply revisionDestsUpdate events on updateCh
 type revisionWatcher struct {
     stopCh <-chan struct{}
@@ -103,8 +104,9 @@ type revisionWatcher struct {
 
     // Stores the list of pods that have been successfully probed.
     healthyPods sets.Set[string]
-    // Stores whether the service ClusterIP has been seen as healthy.
-    clusterIPHealthy bool
+
+    // Stores whether the private k8s service has been seen as healthy.
+    privateServiceHealthy bool
 
     transport http.RoundTripper
     destsCh   chan dests
@@ -200,23 +202,22 @@ func (rw *revisionWatcher) probe
     return match, notMesh, err
 }
 
-func (rw *revisionWatcher) getDest() (string, error) {
-    svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(names.PrivateService(rw.rev.Name))
+func (rw *revisionWatcher) getPrivateServiceDest() (string, error) {
+    svcName := names.PrivateService(rw.rev.Name)
+    svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(svcName)
     if err != nil {
         return "", err
     }
-    if svc.Spec.ClusterIP == "" {
-        return "", fmt.Errorf("private service %s/%s clusterIP is nil, this should never happen", svc.ObjectMeta.Namespace, svc.ObjectMeta.Name)
-    }
 
-    svcPort, ok := getServicePort(rw.protocol, svc)
+    svcHostname := network.GetServiceHostname(svcName, rw.rev.Namespace)
+    svcPort, ok := getTargetPort(rw.protocol, svc)
     if !ok {
         return "", fmt.Errorf("unable to find port in service %s/%s", svc.Namespace, svc.Name)
     }
-    return net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(svcPort)), nil
+    return net.JoinHostPort(svcHostname, strconv.Itoa(svcPort)), nil
 }
 
-func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) {
+func (rw *revisionWatcher) probePrivateService(dest string) (bool, error) {
     ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
     defer cancel()
     match, _, err := rw.probe(ctx, dest)
@@ -296,12 +297,12 @@ func (rw *revisionWatcher) probePodIPs
     return healthy, unchanged, sawNotMesh.Load(), err
 }
 
-func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string]) {
+func (rw *revisionWatcher) sendUpdate(privateService string, dests sets.Set[string]) {
     select {
     case <-rw.stopCh:
         return
     default:
-        rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, ClusterIPDest: clusterIP, Dests: dests}
+        rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, PrivateService: privateService, Dests: dests}
     }
 }
 
@@ -310,9 +311,9 @@ func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string])
 func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
     if len(curDests.ready) == 0 && len(curDests.notReady) == 0 {
         // We must have scaled down.
-        rw.clusterIPHealthy = false
+        rw.privateServiceHealthy = false
         rw.healthyPods = nil
-        rw.logger.Debug("ClusterIP is no longer healthy.")
+        rw.logger.Debug("Private service is no longer healthy.")
         // Send update that we are now inactive (both params invalid).
         rw.sendUpdate("", nil)
         return
@@ -351,7 +352,7 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
         // Note: it's important that this copies (via hs.Union) the healthy pods
         // set before sending the update to avoid concurrent modifications
         // affecting the throttler, which iterates over the set.
-        rw.sendUpdate("" /*clusterIP*/, hs.Union(nil))
+        rw.sendUpdate("", hs.Union(nil))
         return
     }
     // no-op, and we have successfully probed at least one pod.
@@ -380,28 +381,28 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
     // If we failed to probe even a single pod, check the clusterIP.
     // NB: We can't cache the IP address, since user might go rogue
    // and delete the K8s service. We'll fix it, but the cluster IP will be different.
-    dest, err := rw.getDest()
+    dest, err := rw.getPrivateServiceDest()
     if err != nil {
         rw.logger.Errorw("Failed to determine service destination", zap.Error(err))
         return
     }
 
-    // If cluster IP is healthy and we haven't scaled down, short circuit.
-    if rw.clusterIPHealthy {
-        rw.logger.Debugf("ClusterIP %s already probed (ready backends: %d)", dest, len(curDests.ready))
+    // If service hostname is healthy and we haven't scaled down, short circuit.
+    if rw.privateServiceHealthy {
+        rw.logger.Debugf("service hostname %s already probed (ready backends: %d)", dest, len(curDests.ready))
         rw.sendUpdate(dest, curDests.ready)
         return
     }
 
-    // If clusterIP is healthy send this update and we are done.
-    if ok, err := rw.probeClusterIP(dest); err != nil {
-        rw.logger.Errorw("Failed to probe clusterIP "+dest, zap.Error(err))
+    // If service via hostname is healthy send this update and we are done.
+    if ok, err := rw.probePrivateService(dest); err != nil {
+        rw.logger.Errorw("Failed to probe private service: "+dest, zap.Error(err))
     } else if ok {
         // We can reach here only iff pods are not successfully individually probed
-        // but ClusterIP conversely has been successfully probed.
+        // but PrivateService conversely has been successfully probed.
         rw.podsAddressable = false
-        rw.logger.Debugf("ClusterIP is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
-        rw.clusterIPHealthy = true
+        rw.logger.Debugf("Private service is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
+        rw.privateServiceHealthy = true
         rw.healthyPods = nil
         rw.sendUpdate(dest, curDests.ready)
     }
@@ -421,8 +422,8 @@ func (rw *revisionWatcher) run(probeFrequency time.Duration) {
         // then we want to probe on timer.
         rw.logger.Debugw("Revision state", zap.Object("dests", curDests),
             zap.Object("healthy", logging.StringSet(rw.healthyPods)),
-            zap.Bool("clusterIPHealthy", rw.clusterIPHealthy))
-        if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.clusterIPHealthy ||
+            zap.Bool("clusterHealthy", rw.privateServiceHealthy))
+        if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.privateServiceHealthy ||
             curDests.ready.Union(curDests.notReady).Equal(rw.healthyPods)) {
             rw.logger.Debug("Probing on timer")
             tickCh = timer.C
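Stepping back, the fallback order that checkDests implements can be summarized
in a small standalone sketch (simplified and hypothetical, not the actual
activator code; names and addresses are invented): pod IPs are probed first,
and only when none respond does the watcher probe the private service hostname,
which a mesh can still route even when pod IPs aren't directly dialable.

    package main

    import "fmt"

    // fallbackDest is a simplified stand-in for checkDests' decision order.
    // probe stands in for the real prober (rw.probe / probePrivateService).
    func fallbackDest(podDests []string, svcDest string, probe func(string) bool) (dest string, viaService bool, ok bool) {
        for _, d := range podDests {
            if probe(d) {
                // At least one pod is directly addressable; keep using pods.
                return d, false, true
            }
        }
        // No pod reachable directly (e.g. mesh blocks pod IPs): fall back
        // to the service hostname and target port.
        if probe(svcDest) {
            return svcDest, true, true
        }
        return "", false, false
    }

    func main() {
        // Pretend pod IPs are unreachable (mesh mode) but the service responds.
        probe := func(dest string) bool {
            return dest == "rev-private.default.svc.cluster.local:8012"
        }
        dest, viaService, ok := fallbackDest(
            []string{"10.1.0.5:8012"},
            "rev-private.default.svc.cluster.local:8012",
            probe,
        )
        fmt.Println(dest, viaService, ok)
        // Output: rev-private.default.svc.cluster.local:8012 true true
    }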
