diff --git a/pkg/agent/apiserver/apiserver.go b/pkg/agent/apiserver/apiserver.go index 4b767d3c345..75152d68109 100644 --- a/pkg/agent/apiserver/apiserver.go +++ b/pkg/agent/apiserver/apiserver.go @@ -18,6 +18,7 @@ import ( "fmt" "io/ioutil" "net" + "net/http" "os" "path" @@ -28,6 +29,7 @@ import ( k8sversion "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" "github.com/vmware-tanzu/antrea/pkg/agent/apiserver/handlers/addressgroup" @@ -93,7 +95,7 @@ func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.Agent // New creates an APIServer for running in antrea agent. func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, bindPort int, enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16) (*agentAPIServer, error) { - cfg, err := newConfig(bindPort, enableMetrics, kubeconfig) + cfg, err := newConfig(npq, bindPort, enableMetrics, kubeconfig) if err != nil { return nil, err } @@ -110,7 +112,7 @@ func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier return &agentAPIServer{GenericAPIServer: s}, nil } -func newConfig(bindPort int, enableMetrics bool, kubeconfig string) (*genericapiserver.CompletedConfig, error) { +func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindPort int, enableMetrics bool, kubeconfig string) (*genericapiserver.CompletedConfig, error) { secureServing := genericoptions.NewSecureServingOptions().WithLoopback() authentication := genericoptions.NewDelegatingAuthenticationOptions() authorization := genericoptions.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz") @@ -155,6 +157,14 @@ func newConfig(bindPort int, enableMetrics bool, kubeconfig string) (*genericapi GitCommit: antreaversion.GetGitSHA(), } serverConfig.EnableMetrics = enableMetrics + // Add readiness probe to check the status of watchers. + check := healthz.NamedCheck("watcher", func(_ *http.Request) error { + if npq.GetControllerConnectionStatus() { + return nil + } + return fmt.Errorf("some watchers may not be connected") + }) + serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, check) completedServerCfg := serverConfig.Complete(nil) return &completedServerCfg, nil diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 38f18085248..877ce3f2f8f 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -17,6 +17,7 @@ package networkpolicy import ( "context" "fmt" + "reflect" "sync" "time" @@ -44,6 +45,8 @@ const ( defaultWorkers = 4 ) +var emptyWatch = watch.NewEmptyWatch() + // Controller is responsible for watching Antrea AddressGroups, AppliedToGroups, // and NetworkPolicies, feeding them to ruleCache, getting dirty rules from // ruleCache, invoking reconciler to reconcile them. @@ -584,6 +587,12 @@ func (w *watcher) watch() { klog.Warningf("Failed to start watch for %s: %v", w.objectType, err) return } + // Watch method doesn't return error but "emptyWatch" in case of some partial data errors, + // e.g. timeout error. Make sure that watcher is not empty and log warning otherwise. + if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) { + klog.Warningf("Failed to start watch for %s. Please ensure antrea service is reachable for the agent", w.objectType) + return + } klog.Infof("Started watch for %s", w.objectType) w.setConnected(true)