Skip to content

Commit

Permalink
Revert "Add a timeout for Closes in begin.Server (networkservicemesh#…
Browse files Browse the repository at this point in the history
…1650)"

This reverts commit 3016313.

Signed-off-by: NikitaSkrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik committed Oct 15, 2024
1 parent adcdbfe commit 6522b19
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 164 deletions.
14 changes: 2 additions & 12 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

Expand Down Expand Up @@ -160,16 +158,14 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
closeTimeout time.Duration
afterCloseFunc func()
server networkservice.NetworkServiceServer
}

func newEventFactoryServer(ctx context.Context, closeTimeout time.Duration, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
closeTimeout: closeTimeout,
}
f.updateContext(ctx)

Expand Down Expand Up @@ -235,13 +231,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
default:
ctx, cancel := f.ctxFunc()
defer cancel()

c := clock.FromContext(ctx)
closeCtx, cancel := c.WithTimeout(context.Background(), f.closeTimeout)
defer cancel()

closeCtx = extend.WithValuesFromContext(closeCtx, ctx)
_, err := f.server.Close(closeCtx, f.request.GetConnection())
_, err := f.server.Close(ctx, f.request.GetConnection())
f.afterCloseFunc()
ch <- err
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2024 Cisco and/or its affiliates.
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -142,15 +142,14 @@ func TestContextTimeout_Server(t *testing.T) {
clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

ctx, cancel = clockMock.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
ctx, cancel = context.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
defer cancel()

closeTimeout := time.Minute
eventFactoryServ := &eventFactoryServer{}
server := chain.NewNetworkServiceServer(
begin.NewServer(begin.WithCloseTimeout(closeTimeout)),
begin.NewServer(),
eventFactoryServ,
&delayedNSEServer{t: t, closeTimeout: closeTimeout, clock: clockMock},
&delayedNSEServer{t: t, clock: clockMock},
)

// Do Request
Expand Down Expand Up @@ -231,7 +230,6 @@ type delayedNSEServer struct {
t *testing.T
clock *clockmock.Mock
initialTimeout time.Duration
closeTimeout time.Duration
}

func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand Down Expand Up @@ -260,7 +258,7 @@ func (d *delayedNSEServer) Close(ctx context.Context, conn *networkservice.Conne
deadline, _ := ctx.Deadline()
clockTime := clock.FromContext(ctx)

require.Equal(d.t, d.closeTimeout, clockTime.Until(deadline))
require.Equal(d.t, d.initialTimeout, clockTime.Until(deadline))

return next.Server(ctx).Close(ctx, conn)
}
15 changes: 3 additions & 12 deletions pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,13 +18,11 @@ package begin

import (
"context"
"time"
)

type option struct {
cancelCtx context.Context
reselect bool
closeTimeout time.Duration
cancelCtx context.Context
reselect bool
}

// Option - event option
Expand All @@ -43,10 +41,3 @@ func WithReselect() Option {
o.reselect = true
}
}

// WithCloseTimeout - set a custom timeout for a context in begin.Close
func WithCloseTimeout(timeout time.Duration) Option {
return func(o *option) {
o.closeTimeout = timeout
}
}
62 changes: 13 additions & 49 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,45 +18,27 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/genericsync"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type beginServer struct {
genericsync.Map[string, *eventFactoryServer]
closeTimeout time.Duration
}

// NewServer - creates a new begin chain element
func NewServer(opts ...Option) networkservice.NetworkServiceServer {
o := &option{
cancelCtx: context.Background(),
reselect: false,
closeTimeout: time.Minute,
}

for _, opt := range opts {
opt(o)
}

return &beginServer{
closeTimeout: o.closeTimeout,
}
func NewServer() networkservice.NetworkServiceServer {
return &beginServer{}
}

func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
var conn *networkservice.Connection
var err error

func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
// No connection.ID, no service
if request.GetConnection().GetId() == "" {
return nil, errors.New("request.EventFactory.Id must not be zero valued")
Expand All @@ -68,14 +50,12 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryServer(
ctx,
b.closeTimeout,
func() {
b.Delete(request.GetRequestConnection().GetId())
},
),
)
select {
case <-eventFactoryServer.executor.AsyncExec(func() {
<-eventFactoryServer.executor.AsyncExec(func() {
currentEventFactoryServer, _ := b.Load(request.GetConnection().GetId())
if currentEventFactoryServer != eventFactoryServer {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer")
Expand Down Expand Up @@ -113,49 +93,33 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo

eventFactoryServer.returnedConnection = conn.Clone()
eventFactoryServer.updateContext(ctx)
}):
case <-ctx.Done():
return nil, ctx.Err()
}

})
return conn, err
}

func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
var err error
connID := conn.GetId()
func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (emp *emptypb.Empty, err error) {
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally
if fromContext(ctx) != nil {
return next.Server(ctx).Close(ctx, conn)
}
eventFactoryServer, ok := b.Load(connID)
eventFactoryServer, ok := b.Load(conn.GetId())
if !ok {
// If we don't have a connection to Close, just let it be
return &emptypb.Empty{}, nil
}

select {
case <-eventFactoryServer.executor.AsyncExec(func() {
<-eventFactoryServer.executor.AsyncExec(func() {
if eventFactoryServer.state != established || eventFactoryServer.request == nil {
return
}
currentServerClient, _ := b.Load(connID)
currentServerClient, _ := b.Load(conn.GetId())
if currentServerClient != eventFactoryServer {
return
}
closeCtx, cancel := context.WithTimeout(context.Background(), b.closeTimeout)
defer cancel()

// Always close with the last valid EventFactory we got
conn = eventFactoryServer.request.Connection
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
closeCtx = extend.WithValuesFromContext(closeCtx, withEventFactoryCtx)
_, err = next.Server(closeCtx).Close(closeCtx, conn)
emp, err = next.Server(withEventFactoryCtx).Close(withEventFactoryCtx, conn)
eventFactoryServer.afterCloseFunc()
}):
return &emptypb.Empty{}, err
case <-ctx.Done():
b.Delete(connID)
return nil, ctx.Err()
}
})
return &emptypb.Empty{}, err
}
84 changes: 0 additions & 84 deletions pkg/networkservice/common/begin/server_test.go

This file was deleted.

0 comments on commit 6522b19

Please sign in to comment.