Skip to content

Commit

Permalink
fix: Make MetaCache field of GrpcClient
Browse files Browse the repository at this point in the history
Related to milvus-io#809

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Aug 21, 2024
1 parent 0c339b6 commit 67db196
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 106 deletions.
9 changes: 6 additions & 3 deletions client/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type MockSuiteBase struct {
svr *grpc.Server
mock *mocks.MilvusServiceServer

client Client
client *GrpcClient
}

func (s *MockSuiteBase) SetupSuite() {
Expand Down Expand Up @@ -71,7 +71,9 @@ func (s *MockSuiteBase) SetupTest() {
s.Require().NoError(err)
s.setupConnect()

s.client = c
grpcClient, ok := c.(*GrpcClient)
s.Require().True(ok)
s.client = grpcClient
}

func (s *MockSuiteBase) TearDownTest() {
Expand All @@ -80,8 +82,9 @@ func (s *MockSuiteBase) TearDownTest() {
}

func (s *MockSuiteBase) resetMock() {
MetaCache.reset()
// MetaCache.reset()
if s.mock != nil {
s.client.cache.reset()
s.mock.Calls = nil
s.mock.ExpectedCalls = nil
s.setupConnect()
Expand Down
4 changes: 2 additions & 2 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (c *GrpcClient) DescribeCollection(ctx context.Context, collName string) (*
Schema: collection.Schema,
ConsistencyLevel: collection.ConsistencyLevel,
}
MetaCache.setCollectionInfo(collName, &colInfo)
c.cache.setCollectionInfo(collName, &colInfo)
return collection, nil
}

Expand All @@ -317,7 +317,7 @@ func (c *GrpcClient) DropCollection(ctx context.Context, collName string, opts .
}
err = handleRespStatus(resp)
if err == nil {
MetaCache.setCollectionInfo(collName, nil)
c.cache.setCollectionInfo(collName, nil)
}
return err
}
Expand Down
6 changes: 2 additions & 4 deletions client/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,8 @@ func (s *CollectionSuite) TestCreateCollection() {
WithField(entity.NewField().WithName("dynamic").WithDataType(entity.FieldTypeJSON).WithIsDynamic(true))
for _, tc := range cases {
s.Run(tc.tag, func() {
grpcClient, ok := c.(*GrpcClient)
s.Require().True(ok)
grpcClient.config.addFlags(tc.flag)
defer grpcClient.config.resetFlags(tc.flag)
c.config.addFlags(tc.flag)
defer c.config.resetFlags(tc.flag)

err := c.CreateCollection(ctx, sch, 1)
s.Error(err)
Expand Down
47 changes: 14 additions & 33 deletions client/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,11 @@ func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitio
return nil, ErrClientNotReady
}

var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
schema = coll.Schema
collInfo, _ = MetaCache.getCollectionInfo(collName)
} else {
schema = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collName)
if err != nil {
return nil, err
}
schema := collInfo.Schema

sReqs := make([]*milvuspb.SearchRequest, 0, len(subRequests))
nq := 0
Expand Down Expand Up @@ -103,19 +96,13 @@ func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []s
if c.Service == nil {
return []SearchResult{}, ErrClientNotReady
}
var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
schema = coll.Schema
} else {
schema = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collName)
if err != nil {
return nil, err
}
schema := collInfo.Schema

option, err := makeSearchQueryOption(collName, opts...)
option, err := c.makeSearchQueryOption(collName, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -320,19 +307,13 @@ func (c *GrpcClient) Query(ctx context.Context, collectionName string, partition
return nil, ErrClientNotReady
}

var sch *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collectionName)
if !ok {
coll, err := c.DescribeCollection(ctx, collectionName)
if err != nil {
return nil, err
}
sch = coll.Schema
} else {
sch = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collectionName)
if err != nil {
return nil, err
}
sch := collInfo.Schema

option, err := makeSearchQueryOption(collectionName, opts...)
option, err := c.makeSearchQueryOption(collectionName, opts...)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions client/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (c *GrpcClient) UsingDatabase(ctx context.Context, dbName string) error {
if err != nil {
return err
}
c.cache.reset()

return nil
}
Expand Down
33 changes: 33 additions & 0 deletions client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"golang.org/x/sync/singleflight"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/common"
Expand All @@ -24,6 +26,9 @@ type GrpcClient struct {
Conn *grpc.ClientConn // grpc connection instance
Service milvuspb.MilvusServiceClient // Service client stub

cache *metaCache
sf singleflight.Group

config *Config // No thread safety
}

Expand All @@ -32,6 +37,8 @@ func (c *GrpcClient) connect(ctx context.Context, addr string, opts ...grpc.Dial
if addr == "" {
return fmt.Errorf("address is empty")
}

c.cache = newMetaCache()
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return err
Expand Down Expand Up @@ -92,6 +99,32 @@ func (c *GrpcClient) connectInternal(ctx context.Context) error {
return nil
}

func (c *GrpcClient) getCollectionInfo(ctx context.Context, collectionName string) (*collInfo, error) {
// try cache first
info, ok := c.cache.getCollectionInfo(collectionName)
if ok {
return info, nil
}

// use singleflight to fetch collection info
v, err, _ := c.sf.Do(collectionName, func() (interface{}, error) {
coll, err := c.DescribeCollection(ctx, collectionName)
if err != nil {
return nil, err
}
return &collInfo{
ID: coll.ID,
Name: coll.Name,
Schema: coll.Schema,
ConsistencyLevel: coll.ConsistencyLevel,
}, nil
})
if err != nil {
return nil, err
}
return v.(*collInfo), nil
}

// Close close the connection
func (c *GrpcClient) Close() error {
if c.Conn != nil {
Expand Down
36 changes: 12 additions & 24 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,11 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
if c.Service == nil {
return nil, ErrClientNotReady
}
var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
schema = coll.Schema
} else {
schema = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collName)
if err != nil {
return nil, err
}
schema := collInfo.Schema

// convert columns to field data
fieldsData, rowSize, err := c.processInsertColumns(schema, columns...)
Expand All @@ -70,7 +64,7 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}
MetaCache.setSessionTs(collName, resp.Timestamp)
c.cache.setSessionTs(collName, resp.Timestamp)
// 3. parse id column
return entity.IDColumns(schema, resp.GetIDs(), 0, -1)
}
Expand Down Expand Up @@ -332,7 +326,7 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition
if err != nil {
return err
}
MetaCache.setSessionTs(collName, resp.Timestamp)
c.cache.setSessionTs(collName, resp.Timestamp)
return nil
}

Expand Down Expand Up @@ -370,7 +364,7 @@ func (c *GrpcClient) Delete(ctx context.Context, collName string, partitionName
if err != nil {
return err
}
MetaCache.setSessionTs(collName, resp.Timestamp)
c.cache.setSessionTs(collName, resp.Timestamp)
return nil
}

Expand All @@ -382,17 +376,11 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName
if c.Service == nil {
return nil, ErrClientNotReady
}
var schema *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collName)
if !ok {
coll, err := c.DescribeCollection(ctx, collName)
if err != nil {
return nil, err
}
schema = coll.Schema
} else {
schema = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collName)
if err != nil {
return nil, err
}
schema := collInfo.Schema

fieldsData, rowSize, err := c.processInsertColumns(schema, columns...)
if err != nil {
Expand All @@ -416,7 +404,7 @@ func (c *GrpcClient) Upsert(ctx context.Context, collName string, partitionName
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}
MetaCache.setSessionTs(collName, resp.Timestamp)
c.cache.setSessionTs(collName, resp.Timestamp)
// 3. parse id column
return entity.IDColumns(schema, resp.GetIDs(), 0, -1)
}
Expand Down
32 changes: 10 additions & 22 deletions client/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,11 @@ import (

func (c *GrpcClient) SearchIterator(ctx context.Context, opt *SearchIteratorOption) (*SearchIterator, error) {
collectionName := opt.collectionName
var sch *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collectionName)
if !ok {
coll, err := c.DescribeCollection(ctx, collectionName)
if err != nil {
return nil, err
}
sch = coll.Schema
} else {
sch = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collectionName)
if err != nil {
return nil, err
}
sch := collInfo.Schema

var vectorField *entity.Field
for _, field := range sch.Fields {
Expand Down Expand Up @@ -52,7 +46,7 @@ func (c *GrpcClient) SearchIterator(ctx context.Context, opt *SearchIteratorOpti
expr: opt.expr,
}

err := itr.init(ctx)
err = itr.init(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -202,17 +196,11 @@ func (itr *SearchIterator) Next(ctx context.Context) (*SearchResult, error) {

func (c *GrpcClient) QueryIterator(ctx context.Context, opt *QueryIteratorOption) (*QueryIterator, error) {
collectionName := opt.collectionName
var sch *entity.Schema
collInfo, ok := MetaCache.getCollectionInfo(collectionName)
if !ok {
coll, err := c.DescribeCollection(ctx, collectionName)
if err != nil {
return nil, err
}
sch = coll.Schema
} else {
sch = collInfo.Schema
collInfo, err := c.getCollectionInfo(ctx, collectionName)
if err != nil {
return nil, err
}
sch := collInfo.Schema

itr := &QueryIterator{
client: c,
Expand All @@ -227,7 +215,7 @@ func (c *GrpcClient) QueryIterator(ctx context.Context, opt *QueryIteratorOption
expr: opt.expr,
}

err := itr.init(ctx)
err = itr.init(ctx)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions client/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ type collInfo struct {
ConsistencyLevel entity.ConsistencyLevel
}

var MetaCache = metaCache{
sessionTsMap: make(map[string]uint64),
collInfoMap: make(map[string]collInfo),
func newMetaCache() *metaCache {
return &metaCache{
sessionTsMap: make(map[string]uint64),
collInfoMap: make(map[string]collInfo),
}
}

// timestampMap collects the last-write-timestamp of every collection, which is required by session consistency level.
Expand Down
Loading

0 comments on commit 67db196

Please sign in to comment.