Skip to content

Commit

Permalink
Check connections received from monitoring
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art committed Jan 11, 2024
1 parent cfc2107 commit 4bfa6cc
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 44 deletions.
1 change: 0 additions & 1 deletion internal/imports/imports_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 78 additions & 43 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
// Copyright (c) 2021-2022 Nordix and/or its affiliates.
//
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
// Copyright (c) 2022-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -26,6 +26,7 @@ import (
"context"
"crypto/tls"
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -57,7 +58,6 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/retry"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/upstreamrefresh"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
Expand Down Expand Up @@ -208,8 +208,6 @@ func main() {
client.WithDialOptions(dialOptions...),
)

nsmClient = retry.NewClient(nsmClient, retry.WithTryTimeout(c.RequestTimeout))

// ********************************************************************************
// Configure signal handling context
// ********************************************************************************
Expand Down Expand Up @@ -242,13 +240,10 @@ func main() {
// ********************************************************************************
for i := 0; i < len(c.NetworkServices); i++ {
// Update network services configs
u := (*nsurl.NSURL)(&c.NetworkServices[i])

id := fmt.Sprintf("%s-%d", c.Name, i)
var monitoredConnections map[string]*networkservice.Connection
monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, c.RequestTimeout)
defer cancelMonitor()

var monitoredConnections genericsync.Map[string, *networkservice.Connection]
monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, c.RequestTimeout)
stream, err := monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Expand All @@ -257,53 +252,93 @@ func main() {
},
})
if err != nil {
logger.Fatal("error from monitorConnectionClient ", err.Error())
logger.Fatalf("error from monitorConnectionClient: %v", err.Error())
}

// Recv initial event
event, err := stream.Recv()
if err != nil {
logger.Errorf("error from monitorConnection stream ", err.Error())
} else {
monitoredConnections = event.Connections
logger.Errorf("error from monitorConnection stream: %v ", err.Error())
}
cancelMonitor()

// Construct a request
request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: id,
NetworkService: u.NetworkService(),
Labels: u.Labels(),
},
MechanismPreferences: []*networkservice.Mechanism{
u.Mechanism(),
},
for k, conn := range event.Connections {
monitoredConnections.Store(k, conn)
}

for _, conn := range monitoredConnections {
path := conn.GetPath()
if path.Index == 1 && path.PathSegments[0].Id == id && conn.Mechanism.Type == u.Mechanism().Type {
request.Connection = conn
request.Connection.Path.Index = 0
request.Connection.Id = id
break
go func() {
for {
event, err := stream.Recv()
if err != nil {
break
}
for k, conn := range event.Connections {
monitoredConnections.Store(k, conn)
}
}
}
}()

resp, err := nsmClient.Request(ctx, request)
if err != nil {
logger.Fatalf("failed connect to NSMgr: %v", err.Error())
}
for {
// Construct a request
request := constructRequest(ctx, c, id, &c.NetworkServices[i], &monitoredConnections)

defer func() {
closeCtx, cancelClose := context.WithTimeout(ctx, c.RequestTimeout)
defer cancelClose()
_, _ = nsmClient.Close(closeCtx, resp)
}()
resp, err := nsmClient.Request(ctx, request)
if err != nil {
logger.Errorf("failed connect to NSMgr: %v", err.Error())
continue
}

logger.Infof("successfully connected to %v. Response: %v", u.NetworkService(), resp)
defer func() {
closeCtx, cancelClose := context.WithTimeout(ctx, c.RequestTimeout)
defer cancelClose()
_, _ = nsmClient.Close(closeCtx, resp)
}()

logger.Infof("successfully connected to %v. Response: %v", resp.NetworkService, resp)
break
}
cancelMonitor()
}

// Wait for cancel event to terminate
<-signalCtx.Done()
}

func constructRequest(ctx context.Context, c *config.Config, connectionID string, networkService *url.URL, monitoredConnections *genericsync.Map[string, *networkservice.Connection]) *networkservice.NetworkServiceRequest {
u := (*nsurl.NSURL)(networkService)

request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: connectionID,
NetworkService: u.NetworkService(),
Labels: u.Labels(),
},
MechanismPreferences: []*networkservice.Mechanism{
u.Mechanism(),
},
}

// Looking for a match in the connections received from monitoring
monitoredConnections.Range(func(key string, conn *networkservice.Connection) bool {
path := conn.GetPath()
if path.Index == 1 && path.PathSegments[0].Id == connectionID && conn.Mechanism.Type == u.Mechanism().Type {
request.Connection = conn
request.Connection.Path.Index = 0
request.Connection.Id = connectionID
return false
}
return true
})

lCheckCtx, lCheckCtxCancel := context.WithTimeout(ctx, c.LivenessCheckTimeout)
defer lCheckCtxCancel()
if request.GetConnection().State == networkservice.State_DOWN &&
(!c.LivenessCheckEnabled || !kernelheal.KernelLivenessCheck(lCheckCtx, request.GetConnection())) {
// We cannot Close this because the connection was not established through this chain.
// We can only reselect an endpoint
log.FromContext(ctx).Infof("NetworkServiceEndpoint %v is unavailable. Reconnection...", request.GetConnection().NetworkServiceEndpointName)
request.GetConnection().Mechanism = nil
request.GetConnection().NetworkServiceEndpointName = ""
request.GetConnection().Context = nil
request.GetConnection().State = networkservice.State_RESELECT_REQUESTED
}
return request
}

0 comments on commit 4bfa6cc

Please sign in to comment.