Skip to content

Commit

Permalink
enhance: generally improve the performance of mix compactions #37163 (#…
Browse files Browse the repository at this point in the history
…37268)

See #37234
pr: #37163

Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu authored Oct 31, 2024
1 parent 7f32dc2 commit e585f6d
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 93 deletions.
3 changes: 2 additions & 1 deletion internal/datanode/compaction/clustering_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {

// 8 + 8 + 8 + 7 + 8 = 39
// 39*1024 = 39936
// plus buffer on null bitsets etc., let's make it 45000
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "39935")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)

compactionResult, err := s.task.Compact()
Expand Down
28 changes: 15 additions & 13 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,12 @@ func (t *mixCompactionTask) mergeSplit(
return nil, err
}
for _, paths := range binlogPaths {
err := t.dealBinlogPaths(ctx, delta, mWriter, pkField, paths, &deletedRowCount, &expiredRowCount)
del, exp, err := t.writePaths(ctx, delta, mWriter, pkField, paths)
if err != nil {
return nil, err
}
deletedRowCount += del
expiredRowCount += exp
}
res, err := mWriter.Finish()
if err != nil {
Expand Down Expand Up @@ -186,12 +188,14 @@ func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp)
return false
}

func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, deletedRowCount, expiredRowCount *int64) error {
func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp,
mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string,
) (deletedRowCount, expiredRowCount int64, err error) {
log := log.With(zap.Strings("paths", paths))
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return err
return
}

blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
Expand All @@ -201,42 +205,40 @@ func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[inter
iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
return err
return
}
defer iter.Close()

for {
err := iter.Next()
err = iter.Next()
if err != nil {
if err == sio.EOF {
err = nil
break
} else {
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
return err
return
}
}
v := iter.Value()
if isValueDeleted(v, delta) {
oldDeletedRowCount := *deletedRowCount
*deletedRowCount = oldDeletedRowCount + 1
deletedRowCount++
continue
}

// Filtering expired entity
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
oldExpiredRowCount := *expiredRowCount
*expiredRowCount = oldExpiredRowCount + 1
expiredRowCount++
continue
}

err = mWriter.Write(v)
if err != nil {
log.Warn("compact wrong, failed to writer row", zap.Error(err))
return err
return
}
}

return nil
return
}

func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
Expand Down
81 changes: 66 additions & 15 deletions internal/datanode/compaction/mix_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var compactTestDir = "/tmp/milvus_test/compact"

func TestMixCompactionTaskSuite(t *testing.T) {
suite.Run(t, new(MixCompactionTaskSuite))
}
Expand Down Expand Up @@ -146,7 +144,7 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
// clear origial segments
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
for _, segID := range segments {
s.initSegBuffer(segID)
s.initSegBuffer(1, segID)
row := getRow(100)
v := &storage.Value{
PK: storage.NewInt64PrimaryKey(100),
Expand Down Expand Up @@ -193,7 +191,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
for _, segID := range segments {
s.initSegBuffer(segID)
s.initSegBuffer(1, segID)
kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
Expand All @@ -220,7 +218,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
})

result, err := s.task.Compact()
s.NoError(err)
s.Require().NoError(err)
s.NotNil(result)

s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
Expand Down Expand Up @@ -322,7 +320,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
}

func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
s.initSegBuffer(3)
s.initSegBuffer(1, 3)
collTTL := 864000 // 10 days
currTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second*(time.Duration(collTTL)+1)), 0)
s.task.currentTs = currTs
Expand Down Expand Up @@ -353,7 +351,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
}

func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
s.initSegBuffer(4)
s.initSegBuffer(1, 4)
deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
tests := []struct {
description string
Expand Down Expand Up @@ -657,17 +655,19 @@ func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
s.segWriter = segWriter
}

func (s *MixCompactionTaskSuite) initSegBuffer(magic int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, magic, PartitionID, CollectionID, []int64{})
func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, seed, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)

v := storage.Value{
PK: storage.NewInt64PrimaryKey(magic),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: getRow(magic),
for i := 0; i < size; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(seed),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: getRow(seed),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
err = segWriter.Write(&v)
s.Require().NoError(err)
segWriter.FlushAndIsFull()

s.segWriter = segWriter
Expand Down Expand Up @@ -858,3 +858,54 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
},
}
}

func BenchmarkMixCompactor(b *testing.B) {
// Setup
s := new(MixCompactionTaskSuite)

s.SetT(&testing.T{})
s.SetupSuite()
s.SetupTest()

b.ResetTimer()

for i := 0; i < b.N; i++ {
b.StopTimer()
seq := int64(i * 100000)
segments := []int64{seq, seq + 1, seq + 2}
alloc := allocator.NewLocalAllocator(seq+3, math.MaxInt64)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
for _, segID := range segments {
s.initSegBuffer(100000, segID)
kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
left, right := lo.Difference(keys, lo.Keys(kvs))
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()

s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
})
}

b.StartTimer()

result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(3, segment.GetNumOfRows())
s.NotEmpty(segment.InsertLogs)
s.NotEmpty(segment.Field2StatslogPaths)
s.Empty(segment.Deltalogs)

}

s.TearDownTest()
}
8 changes: 6 additions & 2 deletions internal/datanode/compaction/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ type SegmentWriter struct {
sch *schemapb.CollectionSchema
rowCount *atomic.Int64
syncedSize *atomic.Int64

maxBinlogSize uint64
}

func (w *SegmentWriter) GetRowNum() int64 {
Expand Down Expand Up @@ -412,12 +414,12 @@ func (w *SegmentWriter) GetBm25StatsBlob() (map[int64]*storage.Blob, error) {
}

func (w *SegmentWriter) IsFull() bool {
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
return w.writer.WrittenMemorySize() > w.maxBinlogSize
}

func (w *SegmentWriter) FlushAndIsFull() bool {
w.writer.Flush()
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
return w.writer.WrittenMemorySize() > w.maxBinlogSize
}

func (w *SegmentWriter) FlushAndIsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
Expand Down Expand Up @@ -502,6 +504,8 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
collectionID: collID,
rowCount: atomic.NewInt64(0),
syncedSize: atomic.NewInt64(0),

maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
}

for _, fieldID := range Bm25Fields {
Expand Down
31 changes: 26 additions & 5 deletions internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down Expand Up @@ -174,8 +175,28 @@ func generateTestData(num int) ([]*Blob, error) {
return blobs, err
}

// Verify value of index i (1-based numbering) in data generated by generateTestData
func assertTestData(t *testing.T, i int, value *Value) {
assertTestDataInternal(t, i, value, true)
}

// Verify value of index i (1-based numbering) in data generated by generateTestData
func assertTestDataInternal(t *testing.T, i int, value *Value, lazy bool) {
getf18 := func() any {
f18 := &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{int32(i), int32(i), int32(i)},
},
},
}
if lazy {
f18b, err := proto.Marshal(f18)
assert.Nil(t, err)
return f18b
}
return f18
}

f102 := make([]float32, 8)
for j := range f102 {
f102[j] = float32(i)
Expand Down Expand Up @@ -205,7 +226,7 @@ func assertTestData(t *testing.T, i int, value *Value) {
15: float64(i),
16: fmt.Sprint(i),
17: fmt.Sprint(i),
18: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}}},
18: getf18(),
19: []byte{byte(i)},
101: int32(i),
102: f102,
Expand Down Expand Up @@ -250,7 +271,7 @@ func TestInsertlogIterator(t *testing.T) {
v, err := itr.Next()
assert.NoError(t, err)
value := v.(*Value)
assertTestData(t, i, value)
assertTestDataInternal(t, i, value, false)
}

assert.False(t, itr.HasNext())
Expand Down Expand Up @@ -290,7 +311,7 @@ func TestMergeIterator(t *testing.T) {
v, err := itr.Next()
assert.NoError(t, err)
value := v.(*Value)
assertTestData(t, i, value)
assertTestDataInternal(t, i, value, false)
}
assert.False(t, itr.HasNext())
_, err = itr.Next()
Expand All @@ -313,7 +334,7 @@ func TestMergeIterator(t *testing.T) {
v, err := itr.Next()
assert.NoError(t, err)
value := v.(*Value)
assertTestData(t, i, value)
assertTestDataInternal(t, i, value, false)
}
}

Expand Down
Loading

0 comments on commit e585f6d

Please sign in to comment.