Skip to content

Commit

Permalink
fix: Fill stats log id and check validity (#33477) (#33479)
Browse files Browse the repository at this point in the history
1. Fill log ID of stats log from import
2. Add a check to validate the log ID before writing to meta

issue: #33476

pr: #33477

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored May 30, 2024
1 parent 7384bfe commit 43f58eb
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 25 deletions.
13 changes: 13 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
Expand Down Expand Up @@ -767,6 +768,10 @@ func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
for _, id := range logIDs {
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id})
}
err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l})
if err != nil {
panic(err)
}
return l
}

Expand All @@ -778,6 +783,10 @@ func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog {
for _, path := range paths {
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path})
}
err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l})
if err != nil {
panic(err)
}
return l
}

Expand All @@ -789,6 +798,10 @@ func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *da
for _, id := range logIDs {
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id, EntriesNum: entry})
}
err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l})
if err != nil {
panic(err)
}
return l
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/import_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
for _, info := range resp.GetImportSegmentsInfo() {
// try to parse path and fill logID
err = binlog.CompressFieldBinlogs(info.GetBinlogs())
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetStatslogs())
if err != nil {
log.Warn("fail to CompressFieldBinlogs for import binlogs",
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
Expand Down
16 changes: 8 additions & 8 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {

segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
}}
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
Expand All @@ -689,7 +689,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
Expand Down Expand Up @@ -729,8 +729,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
// normal
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
}}
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
Expand Down Expand Up @@ -828,9 +828,9 @@ func TestUpdateSegmentsInfo(t *testing.T) {
err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
Expand Down
13 changes: 13 additions & 0 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,17 @@ func TestGetSegmentInfo(t *testing.T) {
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
LogID: 801,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
LogID: 802,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
LogID: 803,
},
},
},
Expand Down Expand Up @@ -1821,14 +1824,17 @@ func TestGetRecoveryInfo(t *testing.T) {
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
},
Expand All @@ -1841,10 +1847,12 @@ func TestGetRecoveryInfo(t *testing.T) {
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
},
Expand Down Expand Up @@ -1918,14 +1926,17 @@ func TestGetRecoveryInfo(t *testing.T) {
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
},
Expand All @@ -1938,10 +1949,12 @@ func TestGetRecoveryInfo(t *testing.T) {
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},
},
Expand Down
15 changes: 15 additions & 0 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,14 +1139,17 @@ func TestGetRecoveryInfoV2(t *testing.T) {
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
},
Expand All @@ -1159,10 +1162,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
},
Expand Down Expand Up @@ -1239,14 +1244,17 @@ func TestGetRecoveryInfoV2(t *testing.T) {
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
},
Expand All @@ -1259,10 +1267,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},
},
Expand Down Expand Up @@ -1309,9 +1319,11 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
},
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
},
},
},
Expand All @@ -1322,9 +1334,11 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
},
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
},
},
},
Expand All @@ -1337,6 +1351,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
TimestampTo: 1,
LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000),
LogSize: 1,
LogID: 100000,
},
},
},
Expand Down
18 changes: 6 additions & 12 deletions internal/metastore/kv/binlog/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,12 @@ func CompressCompactionBinlogs(binlogs []*datapb.CompactionSegment) error {
return nil
}

func CompressBinLogs(s *datapb.SegmentInfo) error {
err := CompressFieldBinlogs(s.GetBinlogs())
if err != nil {
return err
}
err = CompressFieldBinlogs(s.GetDeltalogs())
if err != nil {
return err
}
err = CompressFieldBinlogs(s.GetStatslogs())
if err != nil {
return err
func CompressBinLogs(binlogs ...[]*datapb.FieldBinlog) error {
for _, l := range binlogs {
err := CompressFieldBinlogs(l)
if err != nil {
return err
}
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/metastore/kv/binlog/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestBinlog_Compress(t *testing.T) {
assert.NoError(t, err)

compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
err = CompressBinLogs(compressedSegmentInfo)
err = CompressBinLogs(compressedSegmentInfo.GetBinlogs(), compressedSegmentInfo.GetDeltalogs(), compressedSegmentInfo.GetStatslogs())
assert.NoError(t, err)

valCompressed, err := proto.Marshal(compressedSegmentInfo)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestBinlog_Compress(t *testing.T) {
segmentInfo1 := &datapb.SegmentInfo{
Binlogs: fieldBinLogs,
}
err = CompressBinLogs(segmentInfo1)
err = CompressBinLogs(segmentInfo1.GetBinlogs(), segmentInfo1.GetDeltalogs(), segmentInfo1.GetStatslogs())
assert.ErrorIs(t, err, merr.ErrParameterInvalid)

fakeDeltalogs := make([]*datapb.Binlog, 1)
Expand All @@ -249,7 +249,7 @@ func TestBinlog_Compress(t *testing.T) {
segmentInfo2 := &datapb.SegmentInfo{
Deltalogs: fieldDeltaLogs,
}
err = CompressBinLogs(segmentInfo2)
err = CompressBinLogs(segmentInfo2.GetBinlogs(), segmentInfo2.GetDeltalogs(), segmentInfo2.GetStatslogs())
assert.ErrorIs(t, err, merr.ErrParameterInvalid)

fakeStatslogs := make([]*datapb.Binlog, 1)
Expand All @@ -265,7 +265,7 @@ func TestBinlog_Compress(t *testing.T) {
segmentInfo3 := &datapb.SegmentInfo{
Statslogs: fieldDeltaLogs,
}
err = CompressBinLogs(segmentInfo3)
err = CompressBinLogs(segmentInfo3.GetBinlogs(), segmentInfo3.GetDeltalogs(), segmentInfo3.GetStatslogs())
assert.ErrorIs(t, err, merr.ErrParameterInvalid)

// test decompress error invalid Type
Expand Down
38 changes: 38 additions & 0 deletions internal/metastore/kv/datacoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,44 @@ func Test_AlterSegments(t *testing.T) {
assert.Equal(t, int64(100), segmentXL.GetNumOfRows())
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
})

t.Run("invalid log id", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
catalog := NewCatalog(metakv, rootPath, "")

segment := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
NumOfRows: 100,
State: commonpb.SegmentState_Flushed,
}

invalidLogWithZeroLogID := []*datapb.FieldBinlog{{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogID: 0,
LogPath: "mock_log_path",
},
},
}}

segment.Statslogs = invalidLogWithZeroLogID
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment})
assert.Error(t, err)
t.Logf("%v", err)

segment.Deltalogs = invalidLogWithZeroLogID
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment})
assert.Error(t, err)
t.Logf("%v", err)

segment.Binlogs = invalidLogWithZeroLogID
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment}, metastore.BinlogsIncrement{Segment: segment})
assert.Error(t, err)
t.Logf("%v", err)
})
}

func Test_DropSegment(t *testing.T) {
Expand Down
Loading

0 comments on commit 43f58eb

Please sign in to comment.