diff --git a/internal/imports/imports_linux.go b/internal/imports/imports_linux.go index 026827c..cffd27e 100644 --- a/internal/imports/imports_linux.go +++ b/internal/imports/imports_linux.go @@ -25,7 +25,6 @@ import ( _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/upstreamrefresh" _ "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext" _ "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" diff --git a/main.go b/main.go index 2e38ddf..6ea6117 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,7 @@ // Copyright (c) 2020-2022 Doc.ai and/or its affiliates. // Copyright (c) 2021-2022 Nordix and/or its affiliates. // -// Copyright (c) 2022-2023 Cisco and/or its affiliates. +// Copyright (c) 2022-2024 Cisco and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -26,6 +26,7 @@ import ( "context" "crypto/tls" "fmt" + "net/url" "os" "os/signal" "syscall" @@ -57,7 +58,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" "github.com/networkservicemesh/sdk/pkg/networkservice/common/upstreamrefresh" "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" @@ -208,8 +208,6 @@ func main() { client.WithDialOptions(dialOptions...), ) - nsmClient = retry.NewClient(nsmClient, retry.WithTryTimeout(c.RequestTimeout)) - // ******************************************************************************** // Configure signal handling context // ******************************************************************************** @@ -242,13 +240,10 @@ func main() { // ******************************************************************************** for i := 0; i < len(c.NetworkServices); i++ { // Update network services configs - u := (*nsurl.NSURL)(&c.NetworkServices[i]) - id := fmt.Sprintf("%s-%d", c.Name, i) - var monitoredConnections map[string]*networkservice.Connection - monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, c.RequestTimeout) - defer cancelMonitor() + var monitoredConnections genericsync.Map[string, *networkservice.Connection] + monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, c.RequestTimeout) stream, err := monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{ PathSegments: []*networkservice.PathSegment{ { @@ -257,53 +252,93 @@ func main() { }, }) if err != nil { - logger.Fatal("error from monitorConnectionClient ", err.Error()) + logger.Fatalf("error from monitorConnectionClient: %v", 
err.Error()) } + // Recv initial event event, err := stream.Recv() if err != nil { - logger.Errorf("error from monitorConnection stream ", err.Error()) - } else { - monitoredConnections = event.Connections + logger.Errorf("error from monitorConnection stream: %v ", err.Error()) } - cancelMonitor() - - // Construct a request - request := &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: id, - NetworkService: u.NetworkService(), - Labels: u.Labels(), - }, - MechanismPreferences: []*networkservice.Mechanism{ - u.Mechanism(), - }, + for k, conn := range event.Connections { + monitoredConnections.Store(k, conn) } - for _, conn := range monitoredConnections { - path := conn.GetPath() - if path.Index == 1 && path.PathSegments[0].Id == id && conn.Mechanism.Type == u.Mechanism().Type { - request.Connection = conn - request.Connection.Path.Index = 0 - request.Connection.Id = id - break + go func() { + for { + event, err := stream.Recv() + if err != nil { + break + } + for k, conn := range event.Connections { + monitoredConnections.Store(k, conn) + } } - } + }() - resp, err := nsmClient.Request(ctx, request) - if err != nil { - logger.Fatalf("failed connect to NSMgr: %v", err.Error()) - } + for { + // Construct a request + request := constructRequest(ctx, c, id, &c.NetworkServices[i], &monitoredConnections) - defer func() { - closeCtx, cancelClose := context.WithTimeout(ctx, c.RequestTimeout) - defer cancelClose() - _, _ = nsmClient.Close(closeCtx, resp) - }() + resp, err := nsmClient.Request(ctx, request) + if err != nil { + logger.Errorf("failed connect to NSMgr: %v", err.Error()) + continue + } - logger.Infof("successfully connected to %v. Response: %v", u.NetworkService(), resp) + defer func() { + closeCtx, cancelClose := context.WithTimeout(ctx, c.RequestTimeout) + defer cancelClose() + _, _ = nsmClient.Close(closeCtx, resp) + }() + + logger.Infof("successfully connected to %v. 
Response: %v", resp.NetworkService, resp)
+			break
+		}
+		cancelMonitor()
 	}
 
 	// Wait for cancel event to terminate
 	<-signalCtx.Done()
 }
+
+// constructRequest builds the NetworkServiceRequest for the given network service URL.
+//
+// The request starts from a fresh connection described by the URL: its network
+// service name, labels and preferred mechanism. If monitoredConnections holds a
+// connection whose path index is 1, whose first path segment has the ID
+// connectionID and whose mechanism type equals the URL's mechanism type, a copy of
+// that connection is used instead, with its path index reset to 0 and its ID set
+// to connectionID.
+//
+// If the chosen connection is in State_DOWN and either liveness checking is
+// disabled in c or kernelheal.KernelLivenessCheck reports failure, the mechanism,
+// endpoint name and context are cleared and the state is set to
+// State_RESELECT_REQUESTED.
+func constructRequest(ctx context.Context, c *config.Config, connectionID string, networkService *url.URL, monitoredConnections *genericsync.Map[string, *networkservice.Connection]) *networkservice.NetworkServiceRequest {
+	u := (*nsurl.NSURL)(networkService)
+
+	request := &networkservice.NetworkServiceRequest{
+		Connection: &networkservice.Connection{
+			Id:             connectionID,
+			NetworkService: u.NetworkService(),
+			Labels:         u.Labels(),
+		},
+		MechanismPreferences: []*networkservice.Mechanism{
+			u.Mechanism(),
+		},
+	}
+
+	// Looking for a match in the connections received from monitoring
+	monitoredConnections.Range(func(_ string, conn *networkservice.Connection) bool {
+		path := conn.GetPath()
+		// Skip entries that lack the fields the match test reads, instead of
+		// panicking on a nil path, an empty segment list or a nil mechanism.
+		if path == nil || len(path.PathSegments) == 0 || conn.Mechanism == nil {
+			return true
+		}
+		if path.Index != 1 || path.PathSegments[0].Id != connectionID || conn.Mechanism.Type != u.Mechanism().Type {
+			return true
+		}
+
+		// Work on a copy: the stored entry must keep its original path index and
+		// mechanism so it still matches if this function is called again for a retry.
+		restored := conn.Clone()
+		restored.Path.Index = 0
+		restored.Id = connectionID
+		request.Connection = restored
+		return false
+	})
+
+	conn := request.GetConnection()
+	if conn.State != networkservice.State_DOWN {
+		return request
+	}
+
+	if c.LivenessCheckEnabled {
+		// The timeout context is only needed when a liveness check actually runs.
+		lCheckCtx, lCheckCtxCancel := context.WithTimeout(ctx, c.LivenessCheckTimeout)
+		defer lCheckCtxCancel()
+		if kernelheal.KernelLivenessCheck(lCheckCtx, conn) {
+			return request
+		}
+	}
+
+	// We cannot Close this because the connection was not established through this chain.
+	// We can only reselect an endpoint
+	log.FromContext(ctx).Infof("NetworkServiceEndpoint %v is unavailable. Reconnection...", conn.NetworkServiceEndpointName)
+	conn.Mechanism = nil
+	conn.NetworkServiceEndpointName = ""
+	conn.Context = nil
+	conn.State = networkservice.State_RESELECT_REQUESTED
+
+	return request
+}