Skip to content

Commit

Permalink
Fix rollback invocation after CmdAdd failure in CNI server
Browse files Browse the repository at this point in the history
* When performing configuration rollback after an error in CmdAdd, we do
  not invoke CmdDel directly. Instead, we invoke an internal version of
  it which does not log a "Received CmdDel request" message (the message
  is confusing otherwise as it implies that we received a new CNI DEL
  command from the container runtime), and which does not process the
  network config again (as it was already processed at the beginning of
  CmdAdd). By not processing the config a second time, we ensure that
  there are no duplicate CIDRs in the IPAMConfig.
* Migrate klog calls in server.go to use structured logging.
* Improve unit tests for the CNI server to validate this fix.

Fixes #5547

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas committed Oct 5, 2023
1 parent bbec9d0 commit 904a6b5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 55 deletions.
6 changes: 6 additions & 0 deletions pkg/agent/cniserver/ipam/ipam_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func RegisterIPAMDriver(ipamType string, ipamDriver IPAMDriver) {
ipamDrivers[ipamType] = append(ipamDrivers[ipamType], ipamDriver)
}

// ResetIPAMDrivers removes every IPAM driver registered for ipamType from the
// package-level driver registry. It is a no-op when no driver is registered
// for ipamType.
func ResetIPAMDrivers(ipamType string) {
	// delete is a no-op on a nil map, so no nil guard is needed here.
	delete(ipamDrivers, ipamType)
}

func argsFromEnv(cniArgs *cnipb.CniCmdArgs) *invoke.Args {
return &invoke.Args{
ContainerID: cniArgs.ContainerId,
Expand Down
89 changes: 46 additions & 43 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *CNIServer) loadNetworkConfig(request *cnipb.CniCmdRequest) (*CNIConfig,
cniConfig.MTU = s.networkConfig.InterfaceMTU
}
cniConfig.CniCmdArgs = request.CniArgs
klog.V(3).Infof("Load network configurations: %v", cniConfig)
klog.V(3).InfoS("Loaded network configuration", "conf", cniConfig)
return &cniConfig, nil
}

Expand All @@ -215,7 +215,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
return nil
}
if !ipam.IsIPAMTypeValid(ipamType) {
klog.Errorf("Unsupported IPAM type %s", ipamType)
klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType)
return s.unsupportedFieldResponse("ipam/type", ipamType)
}
if s.enableBridgingMode {
Expand All @@ -230,7 +230,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
return s.unsupportedFieldResponse("type", cniConfig.Type)
}
if ipamType != ipam.AntreaIPAMType {
klog.Errorf("Unsupported IPAM type %s", ipamType)
klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType)
return s.unsupportedFieldResponse("ipam/type", ipamType)
}
// IPAM for an interface not managed by Antrea CNI.
Expand All @@ -241,14 +241,14 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
func (s *CNIServer) validateRequestMessage(request *cnipb.CniCmdRequest) (*CNIConfig, *cnipb.CniCmdResponse) {
cniConfig, err := s.loadNetworkConfig(request)
if err != nil {
klog.Errorf("Failed to parse network configuration: %v", err)
klog.ErrorS(err, "Failed to parse network configuration")
return nil, s.decodingFailureResponse("network config")
}

cniVersion := cniConfig.CNIVersion
// Check if CNI version in the request is supported
if !s.isCNIVersionSupported(cniVersion) {
klog.Errorf(fmt.Sprintf("Unsupported CNI version [%s], supported CNI versions %s", cniVersion, version.All.SupportedVersions()))
klog.ErrorS(nil, "Unsupported CNI version", "requested", cniVersion, "supported", version.All.SupportedVersions())
return nil, s.incompatibleCniVersionResponse(cniVersion)
}

Expand Down Expand Up @@ -344,19 +344,19 @@ func buildVersionSet() map[string]bool {

func (s *CNIServer) parsePrevResultFromRequest(networkConfig *types.NetworkConfig) (*current.Result, *cnipb.CniCmdResponse) {
if networkConfig.PrevResult == nil && networkConfig.RawPrevResult == nil {
klog.Errorf("Previous network configuration not specified")
klog.ErrorS(nil, "Previous network configuration not specified")
return nil, s.unsupportedFieldResponse("prevResult", "")
}

if err := parsePrevResult(networkConfig); err != nil {
klog.Errorf("Failed to parse previous network configuration")
klog.ErrorS(err, "Failed to parse previous network configuration")
return nil, s.decodingFailureResponse("prevResult")
}
// Convert whatever the result was into the current Result type (for the current CNI
// version)
prevResult, err := current.NewResultFromResult(networkConfig.PrevResult)
if err != nil {
klog.Errorf("Failed to construct prevResult using previous network configuration")
klog.ErrorS(err, "Failed to construct prevResult using previous network configuration")
return nil, s.unsupportedFieldResponse("prevResult", networkConfig.PrevResult)
}
prevResult.CNIVersion = networkConfig.CNIVersion
Expand All @@ -372,7 +372,7 @@ func (s *CNIServer) validatePrevResult(cfgArgs *cnipb.CniCmdArgs, prevResult *cu
// Find interfaces from previous configuration
containerIntf := parseContainerIfaceFromResults(cfgArgs, prevResult)
if containerIntf == nil {
klog.Errorf("Failed to find interface %s of container %s", cfgArgs.Ifname, containerID)
klog.ErrorS(nil, "Failed to find interface of container", "interface", cfgArgs.Ifname, "container", containerID)
return s.invalidNetworkConfigResponse("prevResult does not match network configuration")
}
if err := s.podConfigurator.checkInterfaces(
Expand Down Expand Up @@ -422,7 +422,7 @@ func (s *CNIServer) ipamCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, erro
}

func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdAdd request %v", request)
klog.InfoS("Received CmdAdd request", "request", request)
cniConfig, response := s.validateRequestMessage(request)
if response != nil {
return response, nil
Expand All @@ -439,7 +439,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*

select {
case <-time.After(networkReadyTimeout):
klog.Errorf("Cannot process CmdAdd request for container %v because network is not ready", cniConfig.ContainerId)
klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}
Expand All @@ -453,12 +453,12 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Roll back by deleting the configuration if ADD fails.
if !success {
if isInfraContainer {
klog.Warningf("CmdAdd for container %v failed, and try to rollback", cniConfig.ContainerId)
if _, err := s.CmdDel(ctx, request); err != nil {
klog.Warningf("Failed to rollback after CNI add failure: %v", err)
klog.InfoS("CmdAdd for container failed, trying to rollback", "container", cniConfig.ContainerId)
if _, err := s.cmdDel(ctx, cniConfig); err != nil {
klog.ErrorS(err, "Failed to rollback after CNI add failure", "container", cniConfig.ContainerId)
}
} else {
klog.Warningf("CmdAdd for container %v failed", cniConfig.ContainerId)
klog.InfoS("CmdAdd for container failed", "container", cniConfig.ContainerId)
}
}
}()
Expand Down Expand Up @@ -487,7 +487,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Request IP Address from IPAM driver.
ipamResult, err = ipam.ExecIPAMAdd(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer)
if err != nil {
klog.Errorf("Failed to request IP addresses for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to request IP addresses for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}
}
Expand All @@ -514,13 +514,13 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
isInfraContainer,
s.containerAccess,
); err != nil {
klog.Errorf("Failed to configure interfaces for container %s: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to configure interfaces for container", "container", cniConfig.ContainerId)
return s.configInterfaceFailureResponse(err), nil
}
cniVersion := cniConfig.CNIVersion
cniResult, _ := result.Result.GetAsVersion(cniVersion)

klog.Infof("CmdAdd for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdAdd for container succeeded", "container", cniConfig.ContainerId)
// mark success as true to avoid rollback
success = true

Expand All @@ -534,15 +534,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return resultToResponse(cniResult), nil
}

func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdDel request %v", request)

cniConfig, response := s.validateRequestMessage(request)
if response != nil {
return response, nil
}

func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
infraContainer := cniConfig.getInfraContainer()
s.containerAccess.lockContainer(infraContainer)
defer s.containerAccess.unlockContainer(infraContainer)
Expand All @@ -557,16 +549,16 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
}
// Release IP to IPAM driver
if err := ipam.ExecIPAMDelete(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer); err != nil {
klog.Errorf("Failed to delete IP addresses for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to delete IP addresses for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}
klog.Infof("Deleted IP addresses for container %v", cniConfig.ContainerId)
klog.InfoS("Deleted IP addresses for container", "container", cniConfig.ContainerId)
// Remove host interface and OVS configuration
if err := s.podConfigurator.removeInterfaces(cniConfig.ContainerId); err != nil {
klog.Errorf("Failed to remove interfaces for container %s: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to remove interfaces for container", "container", cniConfig.ContainerId)
return s.configInterfaceFailureResponse(err), nil
}
klog.Infof("CmdDel for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId)
if s.secondaryNetworkEnabled {
podName := string(cniConfig.K8S_POD_NAME)
podNamespace := string(cniConfig.K8S_POD_NAMESPACE)
Expand All @@ -580,9 +572,20 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

// CmdDel is the entry point for a CNI DEL request. It logs the incoming
// request and validates it with validateRequestMessage. If validation yields
// a response, that response is returned with a nil error; otherwise the
// parsed configuration is handed to cmdDel, whose result is returned as is.
func (s *CNIServer) CmdDel(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) {
	klog.InfoS("Received CmdDel request", "request", request)

	parsedConfig, rejection := s.validateRequestMessage(request)
	if rejection == nil {
		// The request is valid: run the actual deletion logic.
		return s.cmdDel(ctx, parsedConfig)
	}
	// Validation produced a response describing the problem; return it as is.
	return rejection, nil
}

func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdCheck request %v", request)
klog.InfoS("Received CmdCheck request", "request", request)

cniConfig, response := s.validateRequestMessage(request)
if response != nil {
Expand All @@ -603,7 +606,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
}

if err := ipam.ExecIPAMCheck(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type); err != nil {
klog.Errorf("Failed to check IPAM configuration for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to check IPAM configuration for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}

Expand All @@ -615,7 +618,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
return response, nil
}
}
klog.Infof("CmdCheck for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdCheck for container succeeded", "container", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand Down Expand Up @@ -676,8 +679,8 @@ func (s *CNIServer) Initialize(
}

func (s *CNIServer) Run(stopCh <-chan struct{}) {
klog.Info("Starting CNI server")
defer klog.Info("Shutting down CNI server")
klog.InfoS("Starting CNI server")
defer klog.InfoS("Shutting down CNI server")

listener, err := util.ListenLocalSocket(s.cniSocket)
if err != nil {
Expand All @@ -686,10 +689,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) {
rpcServer := grpc.NewServer()

cnipb.RegisterCniServer(rpcServer, s)
klog.Info("CNI server is listening ...")
klog.InfoS("CNI server is listening ...")
go func() {
if err := rpcServer.Serve(listener); err != nil {
klog.Errorf("Failed to serve connections: %v", err)
klog.ErrorS(err, "Failed to serve connections")
}
}()
<-stopCh
Expand All @@ -699,10 +702,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) {
// be called prior to Antrea CNI to allocate IP and ports. Antrea takes allocated port
// and hooks it to OVS br-int.
func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
klog.Infof("CNI Chaining: add for container %s", cniConfig.ContainerId)
klog.InfoS("CNI Chaining: add for container", "container", cniConfig.ContainerId)
prevResult, response := s.parsePrevResultFromRequest(cniConfig.NetworkConfig)
if response != nil {
klog.Infof("Failed to parse prev result for container %s", cniConfig.ContainerId)
klog.InfoS("Failed to parse prev result", "container", cniConfig.ContainerId)
return response, nil
}
podName := string(cniConfig.K8S_POD_NAME)
Expand Down Expand Up @@ -737,15 +740,15 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e
}

// interceptDel handles the delete operation for CNI chaining. It logs the
// request and asks the Pod configurator to disconnect the intercepted
// interface identified by the Pod name, Pod namespace and container ID.
//
// The returned response always carries an empty CNI result; the returned error
// is whatever disconnectInterceptedInterface reports.
func (s *CNIServer) interceptDel(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
	// Log the event exactly once, using structured logging.
	klog.InfoS("CNI Chaining: delete for container", "container", cniConfig.ContainerId)
	return &cnipb.CniCmdResponse{CniResult: []byte("")}, s.podConfigurator.disconnectInterceptedInterface(
		string(cniConfig.K8S_POD_NAME),
		string(cniConfig.K8S_POD_NAMESPACE),
		cniConfig.ContainerId)
}

// interceptCheck handles the check operation for CNI chaining. It currently
// only logs the request and returns an empty CNI result with a nil error; no
// verification is performed yet.
func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
	// Log the event exactly once, using structured logging.
	klog.InfoS("CNI Chaining: check for container", "container", cniConfig.ContainerId)
	// TODO: check the host interface setup.
	return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}
Expand All @@ -754,7 +757,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
func (s *CNIServer) reconcile() error {
klog.Infof("Reconciliation for CNI server")
klog.InfoS("Reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
Expand Down
Loading

0 comments on commit 904a6b5

Please sign in to comment.