From 13e4a618c46df9d42bcc2a87a34b117e94805e06 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 30 May 2024 15:56:06 +0800 Subject: [PATCH 1/3] fix: Fill stats log id and check validity Signed-off-by: bigsheeper --- internal/datacoord/import_scheduler.go | 2 +- internal/metastore/kv/binlog/binlog.go | 18 +++------ internal/metastore/kv/binlog/binlog_test.go | 8 ++-- .../metastore/kv/datacoord/kv_catalog_test.go | 38 +++++++++++++++++++ internal/metastore/kv/datacoord/util.go | 18 +++++++++ 5 files changed, 67 insertions(+), 17 deletions(-) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index bfe48e041d6de..7833ca49e8df2 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -303,7 +303,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { if resp.GetState() == datapb.ImportTaskStateV2_Completed { for _, info := range resp.GetImportSegmentsInfo() { // try to parse path and fill logID - err = binlog.CompressFieldBinlogs(info.GetBinlogs()) + err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetStatslogs()) if err != nil { log.Warn("fail to CompressFieldBinlogs for import binlogs", WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) diff --git a/internal/metastore/kv/binlog/binlog.go b/internal/metastore/kv/binlog/binlog.go index 94e0c09cc73e6..8a2f5ef9e0a01 100644 --- a/internal/metastore/kv/binlog/binlog.go +++ b/internal/metastore/kv/binlog/binlog.go @@ -63,18 +63,12 @@ func CompressCompactionBinlogs(binlogs []*datapb.CompactionSegment) error { return nil } -func CompressBinLogs(s *datapb.SegmentInfo) error { - err := CompressFieldBinlogs(s.GetBinlogs()) - if err != nil { - return err - } - err = CompressFieldBinlogs(s.GetDeltalogs()) - if err != nil { - return err - } - err = CompressFieldBinlogs(s.GetStatslogs()) - if err != nil { - return err +func CompressBinLogs(binlogs ...[]*datapb.FieldBinlog) error { + for _, l := range binlogs { + err := CompressFieldBinlogs(l) + if err != nil { + return err + } } return nil } diff --git a/internal/metastore/kv/binlog/binlog_test.go b/internal/metastore/kv/binlog/binlog_test.go index ed4f6d5f13237..3c2c5114b5f27 100644 --- a/internal/metastore/kv/binlog/binlog_test.go +++ b/internal/metastore/kv/binlog/binlog_test.go @@ -198,7 +198,7 @@ func TestBinlog_Compress(t *testing.T) { assert.NoError(t, err) compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) - err = CompressBinLogs(compressedSegmentInfo) + err = CompressBinLogs(compressedSegmentInfo.GetBinlogs(), compressedSegmentInfo.GetDeltalogs(), compressedSegmentInfo.GetStatslogs()) assert.NoError(t, err) valCompressed, err := proto.Marshal(compressedSegmentInfo) @@ -233,7 +233,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo1 := &datapb.SegmentInfo{ Binlogs: fieldBinLogs, } - err = CompressBinLogs(segmentInfo1) + err = CompressBinLogs(segmentInfo1.GetBinlogs(), segmentInfo1.GetDeltalogs(), segmentInfo1.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) fakeDeltalogs := make([]*datapb.Binlog, 1) @@ -249,7 +249,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo2 := &datapb.SegmentInfo{ Deltalogs: fieldDeltaLogs, } - err = CompressBinLogs(segmentInfo2) + err = CompressBinLogs(segmentInfo2.GetBinlogs(), segmentInfo2.GetDeltalogs(), segmentInfo2.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) fakeStatslogs := make([]*datapb.Binlog, 1) @@ -265,7 +265,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo3 := &datapb.SegmentInfo{ Statslogs: fieldDeltaLogs, } - err = CompressBinLogs(segmentInfo3) + err = CompressBinLogs(segmentInfo3.GetBinlogs(), segmentInfo3.GetDeltalogs(), segmentInfo3.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) // test decompress error invalid Type diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 4f4244a21f04a..8ff58679be1e2 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -418,6 +418,44 @@ func Test_AlterSegments(t *testing.T) { assert.Equal(t, int64(100), segmentXL.GetNumOfRows()) assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) }) + + t.Run("invalid log id", func(t *testing.T) { + metakv := mocks.NewMetaKv(t) + catalog := NewCatalog(metakv, rootPath, "") + + segment := &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 100, + State: commonpb.SegmentState_Flushed, + } + + invalidLogWithZeroLogID := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 0, + LogPath: "mock_log_path", + }, + }, + }} + + segment.Statslogs = invalidLogWithZeroLogID + err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + + segment.Deltalogs = invalidLogWithZeroLogID + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + + segment.Binlogs = invalidLogWithZeroLogID + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + }) } func Test_DropSegment(t *testing.T) { diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go index 09a979bafeff2..c94459e3c649f 100644 --- a/internal/metastore/kv/datacoord/util.go +++ b/internal/metastore/kv/datacoord/util.go @@ -164,8 +164,20 @@ func cloneLogs(binlogs []*datapb.FieldBinlog) []*datapb.FieldBinlog { func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) { kv := make(map[string]string) + checkLogID := func(fieldBinlog *datapb.FieldBinlog) error { + for _, binlog := range fieldBinlog.GetBinlogs() { + if binlog.GetLogID() == 0 { + return fmt.Errorf("invalid log id, binlog:%v", binlog) + } + } + return nil + } + // binlog kv for _, binlog := range binlogs { + if err := checkLogID(binlog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(binlog) if err != nil { return nil, fmt.Errorf("marshal binlogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, binlog.FieldID, err) @@ -176,6 +188,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl // deltalog for _, deltalog := range deltalogs { + if err := checkLogID(deltalog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(deltalog) if err != nil { return nil, fmt.Errorf("marshal deltalogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, deltalog.FieldID, err) @@ -186,6 +201,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl // statslog for _, statslog := range statslogs { + if err := checkLogID(statslog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(statslog) if err != nil { return nil, fmt.Errorf("marshal statslogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, statslog.FieldID, err) From 2407779a83d2bab90a1c1d33aa067598d4c05403 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 30 May 2024 17:20:27 +0800 Subject: [PATCH 2/3] fix ut Signed-off-by: bigsheeper --- internal/datacoord/compaction_test.go | 13 +++++++++++++ internal/datacoord/meta_test.go | 16 ++++++++-------- internal/datacoord/server_test.go | 13 +++++++++++++ internal/datacoord/services_test.go | 15 +++++++++++++++ 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index e21f06f29f626..00e69e337651d 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" @@ -767,6 +768,10 @@ func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -778,6 +783,10 @@ func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog { for _, path := range paths { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -789,6 +798,10 @@ func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *da for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id, EntriesNum: entry}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index dd0471eebcddf..eb26acfb7272c 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -678,8 +678,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -689,7 +689,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), @@ -729,8 +729,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { // normal segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -828,9 +828,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { err = meta.UpdateSegmentsInfo( UpdateStatusOperator(1, commonpb.SegmentState_Flushing), AddBinlogsOperator(1, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5ef708b072b9b..3fc1e5a038b6e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -320,14 +320,17 @@ func TestGetSegmentInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), + LogID: 801, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), + LogID: 802, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), + LogID: 803, }, }, }, @@ -1821,14 +1824,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1841,10 +1847,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1918,14 +1926,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1938,10 +1949,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index ddb813acfa922..5454c08e93814 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1139,14 +1139,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1159,10 +1162,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1239,14 +1244,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1259,10 +1267,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, @@ -1309,9 +1319,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, }, }, @@ -1322,9 +1334,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, }, }, @@ -1337,6 +1351,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { TimestampTo: 1, LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000), LogSize: 1, + LogID: 100000, }, }, }, From 3e2288ae4c714f38c5ed4ad6ee2b9ec43cac9f42 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 31 May 2024 11:15:10 +0800 Subject: [PATCH 3/3] add more check Signed-off-by: bigsheeper --- tests/integration/import/import_test.go | 16 ++++++++ tests/integration/meta_watcher.go | 52 ++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go index ae592ebc39379..bb60ddc4198cf 100644 --- a/tests/integration/import/import_test.go +++ b/tests/integration/import/import_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" @@ -158,6 +159,21 @@ func (s *BulkInsertSuite) run() { segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) + checkLogID := func(fieldBinlogs []*datapb.FieldBinlog) { + for _, fieldBinlog := range fieldBinlogs { + for _, l := range fieldBinlog.GetBinlogs() { + s.NotEqual(int64(0), l.GetLogID()) + } + } + } + for _, segment := range segments { + s.True(len(segment.GetBinlogs()) > 0) + checkLogID(segment.GetBinlogs()) + s.True(len(segment.GetDeltalogs()) == 0) + checkLogID(segment.GetDeltalogs()) + s.True(len(segment.GetStatslogs()) > 0) + checkLogID(segment.GetStatslogs()) + } // create index createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ diff --git a/tests/integration/meta_watcher.go b/tests/integration/meta_watcher.go index 01a25035ef36e..8434f16e6333c 100644 --- a/tests/integration/meta_watcher.go +++ b/tests/integration/meta_watcher.go @@ -54,7 +54,7 @@ func (watcher *EtcdMetaWatcher) ShowSessions() ([]*sessionutil.SessionRaw, error func (watcher *EtcdMetaWatcher) ShowSegments() ([]*datapb.SegmentInfo, error) { metaBasePath := path.Join(watcher.rootPath, "/meta/datacoord-meta/s/") + "/" - return listSegments(watcher.etcdCli, metaBasePath, func(s *datapb.SegmentInfo) bool { + return listSegments(watcher.etcdCli, watcher.rootPath, metaBasePath, func(s *datapb.SegmentInfo) bool { return true }) } @@ -88,7 +88,7 @@ func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*sessionutil.S return sessions, nil } -func listSegments(cli *clientv3.Client, prefix string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { +func listSegments(cli *clientv3.Client, rootPath string, prefix string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) @@ -110,9 +110,57 @@ func listSegments(cli *clientv3.Client, prefix string, filter func(*datapb.Segme sort.Slice(segments, func(i, j int) bool { return segments[i].GetID() < segments[j].GetID() }) + + for _, segment := range segments { + segment.Binlogs, segment.Deltalogs, segment.Statslogs, err = getSegmentBinlogs(cli, rootPath, segment) + if err != nil { + return nil, err + } + } + return segments, nil } +func getSegmentBinlogs(cli *clientv3.Client, rootPath string, segment *datapb.SegmentInfo) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, []*datapb.FieldBinlog, error) { + fn := func(prefix string) ([]*datapb.FieldBinlog, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + info := &datapb.FieldBinlog{} + err = proto.Unmarshal(kv.Value, info) + if err != nil { + return nil, err + } + fieldBinlogs = append(fieldBinlogs, info) + } + return fieldBinlogs, nil + } + prefix := path.Join(rootPath, "/meta/datacoord-meta", fmt.Sprintf("binlog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + binlogs, err := fn(prefix) + if err != nil { + return nil, nil, nil, err + } + + prefix = path.Join(rootPath, "/meta/datacoord-meta", fmt.Sprintf("deltalog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + deltalogs, err := fn(prefix) + if err != nil { + return nil, nil, nil, err + } + + prefix = path.Join(rootPath, "/meta/datacoord-meta", fmt.Sprintf("statslog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + statslogs, err := fn(prefix) + if err != nil { + return nil, nil, nil, err + } + + return binlogs, deltalogs, statslogs, nil +} + func listReplicas(cli *clientv3.Client, prefix string) ([]*querypb.Replica, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel()