Skip to content

Commit

Permalink
Add peer cleanup on Requests and Closes from begin (#1636)
Browse files Browse the repository at this point in the history
* Add peer cleanup on Requests and Closes from begin

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix race condition + fix all go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

---------

Signed-off-by: NikitaSkrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik authored May 29, 2024
1 parent 7b51d9c commit c01f9e1
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
Expand Down Expand Up @@ -179,6 +180,7 @@ func (f *eventFactoryServer) updateContext(valueCtx context.Context) {
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := f.initialCtxFunc()
eventCtx = extend.WithValuesFromContext(eventCtx, valueCtx)
eventCtx = peer.NewContext(eventCtx, &peer.Peer{})
return withEventFactory(eventCtx, f), cancel
}
}
Expand Down
86 changes: 85 additions & 1 deletion pkg/networkservice/common/mechanisms/recvfd/server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
// Copyright (c) 2023-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -27,7 +27,9 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"sync"
"testing"
"time"

Expand All @@ -36,6 +38,8 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
"google.golang.org/grpc"
Expand All @@ -47,6 +51,8 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontextonreturn"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/tools/grpcfdutils"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
Expand Down Expand Up @@ -220,3 +226,81 @@ func (s *checkRecvfdTestSuite) TestRecvfdClosesMultipleFiles() {
}, time.Second, time.Millisecond*100)
}
}

func TestRecvfdDoesntWaitForAnyFilesOnRequestsFromBegin(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

t.Cleanup(func() {
cancel()
goleak.VerifyNone(t)
})

eventFactoryCh := make(chan begin.EventFactory, 1)
var once sync.Once
// Create a server
server := chain.NewNetworkServiceServer(
begin.NewServer(),
checkcontextonreturn.NewServer(t, func(t *testing.T, ctx context.Context) {
once.Do(func() {
eventFactoryCh <- begin.FromContext(ctx)
close(eventFactoryCh)
})
}),
recvfd.NewServer(),
injecterror.NewServer(
injecterror.WithError(errors.New("error")),
injecterror.WithRequestErrorTimes(1),
injecterror.WithCloseErrorTimes(1)),
)

tempDir := t.TempDir()
sock, err := os.Create(filepath.Clean(path.Join(tempDir, "test.sock")))
require.NoError(t, err)

serveURL := &url.URL{Scheme: "unix", Path: sock.Name()}
grpcServer := grpc.NewServer(grpc.Creds(grpcfd.TransportCredentials(insecure.NewCredentials())))
networkservice.RegisterNetworkServiceServer(grpcServer, server)
errCh := grpcutils.ListenAndServe(ctx, serveURL, grpcServer)
require.Len(t, errCh, 0)

// Create a client
c := createClient(ctx, serveURL)

// Create a file to send
testFileName := filepath.Clean(path.Join(tempDir, "TestRecvfdDoesntWaitForAnyFilesOnRequestsFromBegin.test"))
f, err := os.Create(testFileName)
require.NoErrorf(t, err, "Failed to create and open a file: %v", err)
err = f.Close()
require.NoErrorf(t, err, "Failed to close file: %v", err)

// Create a request
request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: "id",
Mechanism: &networkservice.Mechanism{
Cls: cls.LOCAL,
Type: kernel.MECHANISM,
Parameters: map[string]string{
common.InodeURL: "file:" + testFileName,
},
},
},
}

// Make the first request from the client to send files
conn, err := c.Request(ctx, request)
require.NoError(t, err)
request.Connection = conn.Clone()

// Make the second request that return an error.
// It should make recvfd close all the files.
_, err = c.Request(ctx, request)
require.Error(t, err)

// Send Close. Recvfd shouldn't freeze trying to read files
// from the client because we send Close from begin.
eventFactory := <-eventFactoryCh
ch := eventFactory.Close()
err = <-ch
require.NoError(t, err)
}
59 changes: 59 additions & 0 deletions pkg/networkservice/utils/checks/checkcontextonreturn/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2020-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package checkcontextonreturn - provides a NetworkServiceClient chain element for checking the state of the context.Context
//
// after the next element in the chain has returned
package checkcontextonreturn

import (
"context"
"testing"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type checkContextOnReturnServer struct {
*testing.T
check func(t *testing.T, ctx context.Context)
}

// NewServer - returns a NetworkServiceServer chain element for checking the state of the context.Context
//
// after the next element in the chain has returned
// t - *testing.T for doing the checks
// check - function for checking the context.Context
func NewServer(t *testing.T, check func(t *testing.T, ctx context.Context)) networkservice.NetworkServiceServer {
return &checkContextOnReturnServer{
T: t,
check: check,
}
}

func (t *checkContextOnReturnServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
t.check(t.T, ctx)
return conn, err
}

func (t *checkContextOnReturnServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
e, err := next.Server(ctx).Close(ctx, conn)
t.check(t.T, ctx)
return e, err
}

0 comments on commit c01f9e1

Please sign in to comment.