Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a timeout for Closes in begin.Server #1650

Merged
14 changes: 12 additions & 2 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

Expand Down Expand Up @@ -158,14 +160,16 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
closeTimeout time.Duration
afterCloseFunc func()
server networkservice.NetworkServiceServer
}

func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, closeTimeout time.Duration, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
closeTimeout: closeTimeout,
}
f.updateContext(ctx)

Expand Down Expand Up @@ -231,7 +235,13 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
default:
ctx, cancel := f.ctxFunc()
defer cancel()
_, err := f.server.Close(ctx, f.request.GetConnection())

c := clock.FromContext(ctx)
closeCtx, cancel := c.WithTimeout(context.Background(), f.closeTimeout)
defer cancel()

closeCtx = extend.WithValuesFromContext(closeCtx, ctx)
_, err := f.server.Close(closeCtx, f.request.GetConnection())
f.afterCloseFunc()
ch <- err
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -142,14 +142,15 @@ func TestContextTimeout_Server(t *testing.T) {
clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

ctx, cancel = context.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
ctx, cancel = clockMock.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
defer cancel()

closeTimeout := time.Minute
eventFactoryServ := &eventFactoryServer{}
server := chain.NewNetworkServiceServer(
begin.NewServer(),
begin.NewServer(begin.WithCloseTimeout(closeTimeout)),
eventFactoryServ,
&delayedNSEServer{t: t, clock: clockMock},
&delayedNSEServer{t: t, closeTimeout: closeTimeout, clock: clockMock},
)

// Do Request
Expand Down Expand Up @@ -230,6 +231,7 @@ type delayedNSEServer struct {
t *testing.T
clock *clockmock.Mock
initialTimeout time.Duration
closeTimeout time.Duration
}

func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand Down Expand Up @@ -258,7 +260,7 @@ func (d *delayedNSEServer) Close(ctx context.Context, conn *networkservice.Conne
deadline, _ := ctx.Deadline()
clockTime := clock.FromContext(ctx)

require.Equal(d.t, d.initialTimeout, clockTime.Until(deadline))
require.Equal(d.t, d.closeTimeout, clockTime.Until(deadline))

return next.Server(ctx).Close(ctx, conn)
}
15 changes: 12 additions & 3 deletions pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,11 +18,13 @@ package begin

import (
"context"
"time"
)

type option struct {
cancelCtx context.Context
reselect bool
cancelCtx context.Context
reselect bool
closeTimeout time.Duration
}

// Option - event option
Expand All @@ -41,3 +43,10 @@ func WithReselect() Option {
o.reselect = true
}
}

// WithCloseTimeout - set a custom timeout for a context in begin.Close
func WithCloseTimeout(timeout time.Duration) Option {
return func(o *option) {
o.closeTimeout = timeout
}
}
63 changes: 51 additions & 12 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,27 +18,49 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/genericsync"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

const (
closeTimeout = time.Minute
)

NikitaSkrynnik marked this conversation as resolved.
Show resolved Hide resolved
type beginServer struct {
genericsync.Map[string, *eventFactoryServer]
closeTimeout time.Duration
}

// NewServer - creates a new begin chain element
func NewServer() networkservice.NetworkServiceServer {
return &beginServer{}
func NewServer(opts ...Option) networkservice.NetworkServiceServer {
o := &option{
cancelCtx: context.Background(),
reselect: false,
closeTimeout: closeTimeout,
}
NikitaSkrynnik marked this conversation as resolved.
Show resolved Hide resolved

for _, opt := range opts {
opt(o)
}

return &beginServer{
closeTimeout: o.closeTimeout,
}
}

func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
var conn *networkservice.Connection
var err error

// No connection.ID, no service
if request.GetConnection().GetId() == "" {
return nil, errors.New("request.EventFactory.Id must not be zero valued")
Expand All @@ -50,12 +72,14 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryServer(
ctx,
b.closeTimeout,
func() {
b.Delete(request.GetRequestConnection().GetId())
},
),
)
<-eventFactoryServer.executor.AsyncExec(func() {
select {
case <-eventFactoryServer.executor.AsyncExec(func() {
currentEventFactoryServer, _ := b.Load(request.GetConnection().GetId())
if currentEventFactoryServer != eventFactoryServer {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer")
Expand Down Expand Up @@ -93,33 +117,48 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo

eventFactoryServer.returnedConnection = conn.Clone()
eventFactoryServer.updateContext(ctx)
})
}):
case <-ctx.Done():
return nil, ctx.Err()
}

return conn, err
}

func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (emp *emptypb.Empty, err error) {
connID := conn.GetId()
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally
if fromContext(ctx) != nil {
return next.Server(ctx).Close(ctx, conn)
}
eventFactoryServer, ok := b.Load(conn.GetId())
eventFactoryServer, ok := b.Load(connID)
if !ok {
// If we don't have a connection to Close, just let it be
return &emptypb.Empty{}, nil
}
<-eventFactoryServer.executor.AsyncExec(func() {

select {
case <-eventFactoryServer.executor.AsyncExec(func() {
if eventFactoryServer.state != established || eventFactoryServer.request == nil {
return
}
currentServerClient, _ := b.Load(conn.GetId())
currentServerClient, _ := b.Load(connID)
if currentServerClient != eventFactoryServer {
return
}
closeCtx, cancel := context.WithTimeout(context.Background(), b.closeTimeout)
defer cancel()

// Always close with the last valid EventFactory we got
conn = eventFactoryServer.request.Connection
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
emp, err = next.Server(withEventFactoryCtx).Close(withEventFactoryCtx, conn)
closeCtx = extend.WithValuesFromContext(closeCtx, withEventFactoryCtx)
emp, err = next.Server(closeCtx).Close(closeCtx, conn)
eventFactoryServer.afterCloseFunc()
})
return &emptypb.Empty{}, err
}):
return &emptypb.Empty{}, err
case <-ctx.Done():
b.Delete(connID)
return nil, ctx.Err()
}
}
Loading