Skip to content

Commit

Permalink
Introducing the begin chain element
Browse files Browse the repository at this point in the history
Please read pkg/networkservice/common/begin/doc.go
for extensive documentation.

Signed-off-by: Ed Warnicke <[email protected]>
  • Loading branch information
edwarnicke committed Aug 26, 2021
1 parent b137008 commit 8202fc4
Show file tree
Hide file tree
Showing 26 changed files with 1,410 additions and 388 deletions.
11 changes: 6 additions & 5 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/connect"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
Expand Down Expand Up @@ -106,9 +107,9 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne

*rv = chain.NewNetworkServiceClient(
updatepath.NewClient(opts.name),
serialize.NewClient(),
refresh.NewClient(ctx),
begin.NewClient(),
metadata.NewClient(),
refresh.NewClient(ctx),
adapters.NewServerToClient(
chain.NewNetworkServiceServer(
heal.NewServer(ctx,
Expand Down Expand Up @@ -149,9 +150,9 @@ func NewClientFactory(clientOpts ...Option) connect.ClientFactory {
append(
append([]networkservice.NetworkServiceClient{
updatepath.NewClient(opts.name),
serialize.NewClient(),
refresh.NewClient(ctx),
begin.NewClient(),
metadata.NewClient(),
refresh.NewClient(ctx),
// TODO: move back to the end of the chain when `begin` chain element will be ready
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
}, opts.additionalFunctionality...),
Expand Down
13 changes: 6 additions & 7 deletions pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (

"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"

"github.com/google/uuid"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
Expand Down Expand Up @@ -104,14 +106,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
opts.name,
append([]networkservice.NetworkServiceServer{
updatepath.NewServer(opts.name),
serialize.NewServer(),
begin.NewServer(),
updatetoken.NewServer(tokenGenerator),
opts.authorizeServer,
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
timeout.NewServer(ctx),
metadata.NewServer(),
timeout.NewServer(ctx),
monitor.NewServer(ctx, &rv.MonitorConnectionServer),
}, opts.additionalFunctionality...)...)
return rv
Expand Down
112 changes: 112 additions & 0 deletions pkg/networkservice/common/begin/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package begin

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type beginClient struct {
clientMap
}

// NewClient - creates a new begin chain element
func NewClient() networkservice.NetworkServiceClient {
return &beginClient{}
}

func (b *beginClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) {
// No connection.ID, no service
if request.GetConnection().GetId() == "" {
return nil, errors.New("request.EventFactory.Id must not be zero valued")
}
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally
if fromContext(ctx) != nil {
return next.Client(ctx).Request(ctx, request, opts...)
}
eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryClient(
ctx,
func() {
b.Delete(request.GetRequestConnection().GetId())
},
opts...,
),
)
<-eventFactoryClient.executor.AsyncExec(func() {
// If the eventFactory has changed, usually because the connection has been Closed and re-established
// go back to the beginning and try again.
currentConnClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryClient)
if currentConnClient != eventFactoryClient {
conn, err = b.Request(ctx, request)
return
}

ctx = withEventFactory(ctx, eventFactoryClient)
conn, err = next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
if eventFactoryClient.state != established {
eventFactoryClient.state = closed
b.Delete(request.GetConnection().GetId())
}
return
}
eventFactoryClient.request = request.Clone()
eventFactoryClient.request.Connection = conn.Clone()
eventFactoryClient.opts = opts
eventFactoryClient.state = established
})
return conn, err
}

func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (emp *emptypb.Empty, err error) {
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally
if fromContext(ctx) != nil {
return next.Client(ctx).Close(ctx, conn, opts...)
}
eventFactoryClient, ok := b.Load(conn.GetId())
if !ok {
// If we don't have a connection to Close, just let it be
return
}
<-eventFactoryClient.executor.AsyncExec(func() {
// If the connection is not established, don't do anything
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil {
return
}

// If this isn't the connection we started with, do nothing
currentConnClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryClient)
if currentConnClient != eventFactoryClient {
return
}
// Always close with the last valid Connection we got
conn = eventFactoryClient.request.Connection
ctx = withEventFactory(ctx, eventFactoryClient)
emp, err = next.Client(ctx).Close(ctx, conn, opts...)
// afterClose() is used to cleanup things like the entry in the Map for EventFactories
eventFactoryClient.afterClose()
})
return emp, err
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 119 additions & 0 deletions pkg/networkservice/common/begin/close_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package begin_test

import (
"context"
"sync"
"testing"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

const (
mark = "mark"
)

func TestCloseClient(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
client := chain.NewNetworkServiceClient(
begin.NewClient(),
&markClient{t: t},
)
id := "1"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := client.Request(ctx, testRequest(id))
assert.NotNil(t, t, conn)
assert.NoError(t, err)
assert.Equal(t, conn.GetContext().GetExtraContext()[mark], mark)
conn = conn.Clone()
delete(conn.GetContext().GetExtraContext(), mark)
assert.Zero(t, conn.GetContext().GetExtraContext()[mark])
_, err = client.Close(ctx, conn)
assert.NoError(t, err)
}

type markClient struct {
t *testing.T
}

func (m *markClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
if request.GetConnection().GetContext() == nil {
request.GetConnection().Context = &networkservice.ConnectionContext{}
}
if request.GetConnection().GetContext().GetExtraContext() == nil {
request.GetConnection().GetContext().ExtraContext = make(map[string]string)
}
request.GetConnection().GetContext().GetExtraContext()[mark] = mark
return next.Client(ctx).Request(ctx, request, opts...)
}

func (m *markClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
assert.NotNil(m.t, conn.GetContext().GetExtraContext())
assert.Equal(m.t, mark, conn.GetContext().GetExtraContext()[mark])
return next.Client(ctx).Close(ctx, conn, opts...)
}

var _ networkservice.NetworkServiceClient = &markClient{}

func TestDoubleCloseClient(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
client := chain.NewNetworkServiceClient(
begin.NewClient(),
&doubleCloseClient{t: t},
)
id := "1"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := client.Request(ctx, testRequest(id))
assert.NotNil(t, t, conn)
assert.NoError(t, err)
conn = conn.Clone()
_, err = client.Close(ctx, conn)
assert.NoError(t, err)
_, err = client.Close(ctx, conn)
assert.NoError(t, err)
}

type doubleCloseClient struct {
t *testing.T
sync.Once
}

func (s *doubleCloseClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
return next.Client(ctx).Request(ctx, request, opts...)
}

func (s *doubleCloseClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
count := 1
s.Do(func() {
count++
})
assert.Equal(s.t, 2, count, "Close has been called more than once")
return next.Client(ctx).Close(ctx, conn, opts...)
}

var _ networkservice.NetworkServiceClient = &doubleCloseClient{}
Loading

0 comments on commit 8202fc4

Please sign in to comment.