diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index e21f06f29f626..00e69e337651d 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" @@ -767,6 +768,10 @@ func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -778,6 +783,10 @@ func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog { for _, path := range paths { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -789,6 +798,10 @@ func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *da for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id, EntriesNum: entry}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index bfe48e041d6de..7833ca49e8df2 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -303,7 +303,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { if resp.GetState() == datapb.ImportTaskStateV2_Completed { for _, info := range resp.GetImportSegmentsInfo() { // try to parse path and fill logID - err = binlog.CompressFieldBinlogs(info.GetBinlogs()) + err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetStatslogs()) if err != nil { log.Warn("fail to CompressFieldBinlogs for import binlogs", WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index dd0471eebcddf..eb26acfb7272c 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -678,8 +678,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -689,7 +689,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), @@ -729,8 +729,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { // normal segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -828,9 +828,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { err = meta.UpdateSegmentsInfo( UpdateStatusOperator(1, commonpb.SegmentState_Flushing), AddBinlogsOperator(1, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5ef708b072b9b..3fc1e5a038b6e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -320,14 +320,17 @@ func TestGetSegmentInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), + LogID: 801, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), + LogID: 802, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), + LogID: 803, }, }, }, @@ -1821,14 +1824,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1841,10 +1847,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1918,14 +1926,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1938,10 +1949,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index ddb813acfa922..5454c08e93814 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1139,14 +1139,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1159,10 +1162,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1239,14 +1244,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1259,10 +1267,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, @@ -1309,9 +1319,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, }, }, @@ -1322,9 +1334,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, }, }, @@ -1337,6 +1351,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { TimestampTo: 1, LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000), LogSize: 1, + LogID: 100000, }, }, }, diff --git a/internal/metastore/kv/binlog/binlog.go b/internal/metastore/kv/binlog/binlog.go index 94e0c09cc73e6..8a2f5ef9e0a01 100644 --- a/internal/metastore/kv/binlog/binlog.go +++ b/internal/metastore/kv/binlog/binlog.go @@ -63,18 +63,12 @@ func CompressCompactionBinlogs(binlogs []*datapb.CompactionSegment) error { return nil } -func CompressBinLogs(s *datapb.SegmentInfo) error { - err := CompressFieldBinlogs(s.GetBinlogs()) - if err != nil { - return err - } - err = CompressFieldBinlogs(s.GetDeltalogs()) - if err != nil { - return err - } - err = CompressFieldBinlogs(s.GetStatslogs()) - if err != nil { - return err +func CompressBinLogs(binlogs ...[]*datapb.FieldBinlog) error { + for _, l := range binlogs { + err := CompressFieldBinlogs(l) + if err != nil { + return err + } } return nil } diff --git a/internal/metastore/kv/binlog/binlog_test.go b/internal/metastore/kv/binlog/binlog_test.go index ed4f6d5f13237..3c2c5114b5f27 100644 --- a/internal/metastore/kv/binlog/binlog_test.go +++ b/internal/metastore/kv/binlog/binlog_test.go @@ -198,7 +198,7 @@ func TestBinlog_Compress(t *testing.T) { assert.NoError(t, err) compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) - err = CompressBinLogs(compressedSegmentInfo) + err = CompressBinLogs(compressedSegmentInfo.GetBinlogs(), compressedSegmentInfo.GetDeltalogs(), compressedSegmentInfo.GetStatslogs()) assert.NoError(t, err) valCompressed, err := proto.Marshal(compressedSegmentInfo) @@ -233,7 +233,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo1 := &datapb.SegmentInfo{ Binlogs: fieldBinLogs, } - err = CompressBinLogs(segmentInfo1) + err = CompressBinLogs(segmentInfo1.GetBinlogs(), segmentInfo1.GetDeltalogs(), segmentInfo1.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) fakeDeltalogs := make([]*datapb.Binlog, 1) @@ -249,7 +249,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo2 := &datapb.SegmentInfo{ Deltalogs: fieldDeltaLogs, } - err = CompressBinLogs(segmentInfo2) + err = CompressBinLogs(segmentInfo2.GetBinlogs(), segmentInfo2.GetDeltalogs(), segmentInfo2.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) fakeStatslogs := make([]*datapb.Binlog, 1) @@ -265,7 +265,7 @@ func TestBinlog_Compress(t *testing.T) { segmentInfo3 := &datapb.SegmentInfo{ Statslogs: fieldDeltaLogs, } - err = CompressBinLogs(segmentInfo3) + err = CompressBinLogs(segmentInfo3.GetBinlogs(), segmentInfo3.GetDeltalogs(), segmentInfo3.GetStatslogs()) assert.ErrorIs(t, err, merr.ErrParameterInvalid) // test decompress error invalid Type diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 4f4244a21f04a..8ff58679be1e2 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -418,6 +418,44 @@ func Test_AlterSegments(t *testing.T) { assert.Equal(t, int64(100), segmentXL.GetNumOfRows()) assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) }) + + t.Run("invalid log id", func(t *testing.T) { + metakv := mocks.NewMetaKv(t) + catalog := NewCatalog(metakv, rootPath, "") + + segment := &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 100, + State: commonpb.SegmentState_Flushed, + } + + invalidLogWithZeroLogID := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 0, + LogPath: "mock_log_path", + }, + }, + }} + + segment.Statslogs = invalidLogWithZeroLogID + err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + + segment.Deltalogs = invalidLogWithZeroLogID + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + + segment.Binlogs = invalidLogWithZeroLogID + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment}) + assert.Error(t, err) + t.Logf("%v", err) + }) } func Test_DropSegment(t *testing.T) { diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go index 09a979bafeff2..c94459e3c649f 100644 --- a/internal/metastore/kv/datacoord/util.go +++ b/internal/metastore/kv/datacoord/util.go @@ -164,8 +164,20 @@ func cloneLogs(binlogs []*datapb.FieldBinlog) []*datapb.FieldBinlog { func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) { kv := make(map[string]string) + checkLogID := func(fieldBinlog *datapb.FieldBinlog) error { + for _, binlog := range fieldBinlog.GetBinlogs() { + if binlog.GetLogID() == 0 { + return fmt.Errorf("invalid log id, binlog:%v", binlog) + } + } + return nil + } + // binlog kv for _, binlog := range binlogs { + if err := checkLogID(binlog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(binlog) if err != nil { return nil, fmt.Errorf("marshal binlogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, binlog.FieldID, err) @@ -176,6 +188,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl // deltalog for _, deltalog := range deltalogs { + if err := checkLogID(deltalog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(deltalog) if err != nil { return nil, fmt.Errorf("marshal deltalogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, deltalog.FieldID, err) @@ -186,6 +201,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl // statslog for _, statslog := range statslogs { + if err := checkLogID(statslog); err != nil { + return nil, err + } binlogBytes, err := proto.Marshal(statslog) if err != nil { return nil, fmt.Errorf("marshal statslogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", collectionID, segmentID, statslog.FieldID, err)