Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default expiration option for registry #1406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type serverOptions struct {
authorizeNSRegistryClient registryapi.NetworkServiceRegistryClient
authorizeNSERegistryServer registryapi.NetworkServiceEndpointRegistryServer
authorizeNSERegistryClient registryapi.NetworkServiceEndpointRegistryClient
defaultExpiration time.Duration
dialOptions []grpc.DialOption
dialTimeout time.Duration
regURL *url.URL
Expand All @@ -110,6 +111,13 @@ func WithForwarderServiceName(forwarderServiceName string) Option {
}
}

// WithDefaultExpiration sets the default expiration for endpoints
func WithDefaultExpiration(d time.Duration) Option {
return func(o *serverOptions) {
o.defaultExpiration = d
}
}

// WithDialTimeout sets dial timeout for the client
func WithDialTimeout(dialTimeout time.Duration) Option {
return func(o *serverOptions) {
Expand Down Expand Up @@ -213,6 +221,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
authorizeNSRegistryClient: registryauthorize.NewNetworkServiceRegistryClient(registryauthorize.Any()),
authorizeNSERegistryServer: registryauthorize.NewNetworkServiceEndpointRegistryServer(registryauthorize.Any()),
authorizeNSERegistryClient: registryauthorize.NewNetworkServiceEndpointRegistryClient(registryauthorize.Any()),
defaultExpiration: time.Minute,
name: "nsmgr-" + uuid.New().String(),
forwarderServiceName: "forwarder",
}
Expand Down Expand Up @@ -276,7 +285,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
opts.authorizeNSERegistryServer,
begin.NewNetworkServiceEndpointRegistryServer(),
registryclientinfo.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expire.WithDefaultExpiration(opts.defaultExpiration)),
registryrecvfd.NewNetworkServiceEndpointRegistryServer(), // Allow to receive a passed files
registrysendfd.NewNetworkServiceEndpointRegistryServer(),
remoteOrLocalRegistry,
Expand Down
4 changes: 4 additions & 0 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ func Test_FailedRegistryAuthorization(t *testing.T) {
registrySupplier := func(
ctx context.Context,
tokenGenerator token.GeneratorFunc,
expiryDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registry.Registry {
registryName := sandbox.UniqueName("registry-memory")
Expand All @@ -527,6 +528,7 @@ func Test_FailedRegistryAuthorization(t *testing.T) {
ctx,
tokenGeneratorFunc("spiffe://test.com/"+registryName),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDefaultExpiration(expiryDuration),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
authorizeregistry.NewNetworkServiceRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
Expand Down Expand Up @@ -688,12 +690,14 @@ func Test_Expire(t *testing.T) {
registrySupplier := func(
ctx context.Context,
tokenGenerator token.GeneratorFunc,
expiryDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registry.Registry {
return memory.NewServer(
ctx,
tokenGenerator,
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDefaultExpiration(expiryDuration),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
authorizeregistry.NewNetworkServiceRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/common/tokens_expired.rego"))),
Expand Down
12 changes: 11 additions & 1 deletion pkg/registry/chains/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package memory
import (
"context"
"net/url"
"time"

"google.golang.org/grpc"

Expand Down Expand Up @@ -50,6 +51,7 @@ type serverOptions struct {
authorizeNSERegistryServer registry.NetworkServiceEndpointRegistryServer
authorizeNSRegistryClient registry.NetworkServiceRegistryClient
authorizeNSERegistryClient registry.NetworkServiceEndpointRegistryClient
defaultExpiration time.Duration
proxyRegistryURL *url.URL
dialOptions []grpc.DialOption
}
Expand Down Expand Up @@ -97,6 +99,13 @@ func WithAuthorizeNSERegistryClient(authorizeNSERegistryClient registry.NetworkS
}
}

// WithDefaultExpiration sets the default expiration for endpoints
func WithDefaultExpiration(d time.Duration) Option {
return func(o *serverOptions) {
o.defaultExpiration = d
}
}

// WithProxyRegistryURL sets URL to reach the proxy registry
func WithProxyRegistryURL(proxyRegistryURL *url.URL) Option {
return func(o *serverOptions) {
Expand All @@ -118,6 +127,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
authorizeNSERegistryServer: registryauthorize.NewNetworkServiceEndpointRegistryServer(registryauthorize.Any()),
authorizeNSRegistryClient: registryauthorize.NewNetworkServiceRegistryClient(registryauthorize.Any()),
authorizeNSERegistryClient: registryauthorize.NewNetworkServiceEndpointRegistryClient(registryauthorize.Any()),
defaultExpiration: time.Minute,
proxyRegistryURL: nil,
}
for _, opt := range options {
Expand Down Expand Up @@ -161,7 +171,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
Condition: func(c context.Context, nse *registry.NetworkServiceEndpoint) bool { return true },
Action: chain.NewNetworkServiceEndpointRegistryServer(
setregistrationtime.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expire.WithDefaultExpiration(opts.defaultExpiration)),
memory.NewNetworkServiceEndpointRegistryServer(),
),
},
Expand Down
23 changes: 20 additions & 3 deletions pkg/registry/common/expire/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package expire

import (
"context"
"time"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
Expand All @@ -29,15 +32,23 @@ import (
)

type expireNSEServer struct {
ctx context.Context
ctx context.Context
defaultExpiration time.Duration
cancelsMap
}

// NewNetworkServiceEndpointRegistryServer creates a new NetworkServiceServer chain element that implements unregister
// of expired connections for the subsequent chain elements.
func NewNetworkServiceEndpointRegistryServer(ctx context.Context) registry.NetworkServiceEndpointRegistryServer {
func NewNetworkServiceEndpointRegistryServer(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryServer {
var serverOptions = &options{}

for _, opt := range opts {
opt(serverOptions)
}

return &expireNSEServer{
ctx: ctx,
ctx: ctx,
defaultExpiration: serverOptions.defaultExpiration,
}
}

Expand All @@ -54,6 +65,12 @@ func (s *expireNSEServer) Register(ctx context.Context, nse *registry.NetworkSer
}

expirationTime := nse.GetExpirationTime().AsTime()
if nse.GetExpirationTime() == nil {
expirationTime = timeClock.Now().Add(s.defaultExpiration).Local()
nse.ExpirationTime = timestamppb.New(expirationTime)
logger.Infof("selected expiration time %v for %v", expirationTime, nse.GetName())
}

resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
if err != nil {
return nil, err
Expand Down
20 changes: 20 additions & 0 deletions pkg/registry/common/expire/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,26 @@ func TestExpireNSEServer_ShouldUseLessExpirationTimeFromInput_AndWork(t *testing
}, testWait, testTick)
}

func TestExpireNSEServer_ShouldSetDefaultExpiration(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

s := next.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expire.WithDefaultExpiration(expireTimeout)),
)

resp, err := s.Register(ctx, &registry.NetworkServiceEndpoint{Name: "nse-1"})
require.NoError(t, err)

require.Equal(t, expireTimeout, clockMock.Until(resp.ExpirationTime.AsTime()))
}

func TestExpireNSEServer_ShouldUseLessExpirationTimeFromResponse(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand Down
33 changes: 33 additions & 0 deletions pkg/registry/common/expire/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package expire

import "time"

type options struct {
defaultExpiration time.Duration
}

// Option is option to configure expire chain element
type Option func(*options)

// WithDefaultExpiration sets the default expiration for endpoints
func WithDefaultExpiration(d time.Duration) Option {
return func(o *options) {
o.defaultExpiration = d
}
}
39 changes: 25 additions & 14 deletions pkg/tools/sandbox/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -52,36 +53,39 @@ type Builder struct {
supplyRegistryProxy SupplyRegistryProxyFunc
setupNode SetupNodeFunc

name string
dnsResolver dnsresolve.Resolver
generateTokenFunc token.GeneratorFunc
name string
dnsResolver dnsresolve.Resolver
generateTokenFunc token.GeneratorFunc
registryDefaultExpiration time.Duration

useUnixSockets bool

domain *Domain
}

func newRegistryMemoryServer(ctx context.Context, tokenGenerator token.GeneratorFunc, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry {
func newRegistryMemoryServer(ctx context.Context, tokenGenerator token.GeneratorFunc, defaultExpiration time.Duration, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry {
return memory.NewServer(
ctx,
tokenGenerator,
memory.WithDefaultExpiration(defaultExpiration),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...))
}

// NewBuilder creates new SandboxBuilder
func NewBuilder(ctx context.Context, t *testing.T) *Builder {
b := &Builder{
t: t,
ctx: ctx,
nodesCount: 1,
supplyNSMgr: nsmgr.NewServer,
supplyNSMgrProxy: nsmgrproxy.NewServer,
supplyRegistry: newRegistryMemoryServer,
supplyRegistryProxy: proxydns.NewServer,
name: "cluster.local",
dnsResolver: NewFakeResolver(),
generateTokenFunc: GenerateTestToken,
t: t,
ctx: ctx,
nodesCount: 1,
supplyNSMgr: nsmgr.NewServer,
supplyNSMgrProxy: nsmgrproxy.NewServer,
supplyRegistry: newRegistryMemoryServer,
supplyRegistryProxy: proxydns.NewServer,
name: "cluster.local",
dnsResolver: NewFakeResolver(),
generateTokenFunc: GenerateTestToken,
registryDefaultExpiration: time.Minute,
}

b.setupNode = func(ctx context.Context, node *Node, _ int) {
Expand Down Expand Up @@ -147,6 +151,12 @@ func (b *Builder) SetTokenGenerateFunc(f token.GeneratorFunc) *Builder {
return b
}

// SetRegistryDefaultExpiration sets default expiration for endpoints
func (b *Builder) SetRegistryDefaultExpiration(d time.Duration) *Builder {
b.registryDefaultExpiration = d
return b
}

// UseUnixSockets sets 1 node and mark it to use unix socket to listen on.
func (b *Builder) UseUnixSockets() *Builder {
require.NotEqual(b.t, "windows", runtime.GOOS, "Unix sockets are not available for windows")
Expand Down Expand Up @@ -265,6 +275,7 @@ func (b *Builder) newRegistry() *RegistryEntry {
entry.Registry = b.supplyRegistry(
ctx,
b.generateTokenFunc,
b.registryDefaultExpiration,
nsmgrProxyURL,
DialOptions(WithTokenGenerator(b.generateTokenFunc))...,
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/tools/sandbox/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sandbox
import (
"context"
"net/url"
"time"

registryapi "github.com/networkservicemesh/api/pkg/api/registry"
"google.golang.org/grpc"
Expand All @@ -40,7 +41,7 @@ type SupplyNSMgrProxyFunc func(ctx context.Context, regURL, proxyURL *url.URL, t
type SupplyNSMgrFunc func(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...nsmgr.Option) nsmgr.Nsmgr

// SupplyRegistryFunc supplies Registry
type SupplyRegistryFunc func(ctx context.Context, tokenGenerator token.GeneratorFunc, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry
type SupplyRegistryFunc func(ctx context.Context, tokenGenerator token.GeneratorFunc, defaultExpiration time.Duration, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry

// SupplyRegistryProxyFunc supplies registry proxy
type SupplyRegistryProxyFunc func(ctx context.Context, tokenGenerator token.GeneratorFunc, dnsResolver dnsresolve.Resolver, options ...proxydns.Option) registry.Registry
Expand Down