Skip to content

Commit

Permalink
fix unstable tests
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingaikin <[email protected]>
  • Loading branch information
denis-tingaikin committed Oct 17, 2024
1 parent 3dadc67 commit 54e953c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 12 deletions.
3 changes: 2 additions & 1 deletion pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(opts.regURL),
begin.NewNetworkServiceRegistryClient(),
//querycache.NewNetworkServiceRegistryClient(ctx),
clientconn.NewNetworkServiceRegistryClient(),
opts.authorizeNSRegistryClient,
grpcmetadata.NewNetworkServiceRegistryClient(),
Expand All @@ -264,11 +265,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
registryconnect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
querycache.NewNetworkServiceEndpointRegistryClient(ctx),
clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL),
clientconn.NewNetworkServiceEndpointRegistryClient(),
opts.authorizeNSERegistryClient,
grpcmetadata.NewNetworkServiceEndpointRegistryClient(),
querycache.NewClient(ctx),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialTimeout(opts.dialTimeout),
dial.WithDialOptions(opts.dialOptions...),
Expand Down
88 changes: 88 additions & 0 deletions pkg/registry/common/querycache/ns_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package querycache adds possible to cache Find queries
package querycache

import (
"context"
"time"

cache "github.com/go-pkgz/expirable-cache/v3"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
)

type queryCacheNSClient struct {
chainContext context.Context
cache cache.Cache[string, []*registry.NetworkService]
}

// NewNetworkServiceRegistryClient creates new querycache NS registry client that caches all resolved NSs
func NewNetworkServiceRegistryClient(ctx context.Context) registry.NetworkServiceRegistryClient {
var res = &queryCacheNSClient{
chainContext: ctx,
cache: cache.NewCache[string, []*registry.NetworkService]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 100),
}
return res
}

func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) {
resp, err := next.NetworkServiceRegistryClient(ctx).Register(ctx, nse, opts...)
if err == nil {
q.cache.Add(resp.GetName(), []*registry.NetworkService{resp})
}
return resp, err
}

func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
if query.Watch {
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
}

var list []*registry.NetworkService
if v, ok := q.cache.Get(query.GetNetworkService().GetName()); ok {
list = v
} else {
var streamClient, err = next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return streamClient, err
}
list = registry.ReadNetworkServiceList(streamClient)
for _, item := range list {
q.cache.Add(item.GetName(), []*registry.NetworkService{item.Clone()})
}
}
var resultStreamChannel = make(chan *registry.NetworkServiceResponse, len(list))
for _, item := range list {
resultStreamChannel <- &registry.NetworkServiceResponse{NetworkService: item}
}
close(resultStreamChannel)
return streamchannel.NewNetworkServiceFindClient(ctx, resultStreamChannel), nil
}

func (q *queryCacheNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) {
resp, err := next.NetworkServiceRegistryClient(ctx).Unregister(ctx, in, opts...)
if err == nil {
q.cache.Remove(in.GetName())
}
return resp, err
}
15 changes: 8 additions & 7 deletions pkg/registry/common/querycache/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ import (
)

type queryCacheNSEClient struct {
ctx context.Context
cache cache.Cache[string, []*registry.NetworkServiceEndpoint]
chainContext context.Context
cache cache.Cache[string, []*registry.NetworkServiceEndpoint]
}

// NewClient creates new querycache NSE registry client that caches all resolved NSEs
func NewClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient {
func NewNetworkServiceEndpointRegistryClient(ctx context.Context) registry.NetworkServiceEndpointRegistryClient {
var res = &queryCacheNSEClient{
ctx: ctx,
cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300),
chainContext: ctx,
cache: cache.NewCache[string, []*registry.NetworkServiceEndpoint]().WithLRU().WithMaxKeys(32).WithTTL(time.Millisecond * 300),
}
return res
}
Expand All @@ -67,8 +67,9 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS
return streamClient, err
}
list = registry.ReadNetworkServiceEndpointList(streamClient)
q.cache.Add(query.GetNetworkServiceEndpoint().GetName(), list)

for _, item := range list {
q.cache.Add(item.GetName(), []*registry.NetworkServiceEndpoint{item.Clone()})
}
}
var resultStreamChannel = make(chan *registry.NetworkServiceEndpointResponse, len(list))
for _, item := range list {
Expand Down
6 changes: 2 additions & 4 deletions pkg/registry/common/querycache/nse_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package querycache_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -64,7 +63,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) {

failureClient := new(failureNSEClient)
c := next.NewNetworkServiceEndpointRegistryClient(
querycache.NewClient(ctx),
querycache.NewNetworkServiceEndpointRegistryClient(ctx),
failureClient,
adapters.NetworkServiceEndpointServerToClient(mem),
)
Expand Down Expand Up @@ -109,7 +108,6 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) {
if nseResp, err = stream.Recv(); err != nil {
return false
}
fmt.Println(name == nseResp.NetworkServiceEndpoint.Name, url2 == nseResp.NetworkServiceEndpoint.Url)
return name == nseResp.NetworkServiceEndpoint.Name && url2 == nseResp.NetworkServiceEndpoint.Url
}, testWait, testTick)

Expand Down Expand Up @@ -139,7 +137,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) {

failureClient := new(failureNSEClient)
c := next.NewNetworkServiceEndpointRegistryClient(
querycache.NewClient(ctx),
querycache.NewNetworkServiceEndpointRegistryClient(ctx),
failureClient,
adapters.NetworkServiceEndpointServerToClient(mem),
)
Expand Down

0 comments on commit 54e953c

Please sign in to comment.