enhance: avoid memory copy and serde in mix compaction
Signed-off-by: Ted Xu <[email protected]>
tedxu committed Nov 6, 2024
1 parent d213331 commit d2c18ad
Showing 5 changed files with 291 additions and 110 deletions.
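Before reading the diff, the core filtering rule in writeSegment is worth stating plainly: a row survives compaction unless a delete entry for its primary key carries a strictly newer timestamp (an equal timestamp keeps the row, so an upsert is not erased by its paired delete), and unless it has expired per the collection TTL. The following stand-alone sketch mirrors only the delete rule of the isValueDeleted closure shown below; the int64 keys, uint64 timestamps, and the isDeleted name are simplifications for illustration, not the Milvus types.

```go
package main

import "fmt"

// isDeleted drops a row only when a delete entry for the same primary key
// has a strictly newer timestamp; on equal timestamps the row is kept so
// an upsert is not erased by its paired delete.
func isDeleted(deletes map[int64]uint64, pk int64, ts uint64) bool {
	delTs, ok := deletes[pk]
	return ok && ts < delTs
}

func main() {
	deletes := map[int64]uint64{100: 50, 200: 70}
	fmt.Println(isDeleted(deletes, 100, 49)) // true: delete is newer than the row
	fmt.Println(isDeleted(deletes, 200, 70)) // false: equal ts, upsert survives
	fmt.Println(isDeleted(deletes, 300, 10)) // false: no delete recorded for this pk
}
```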
81 changes: 61 additions & 20 deletions internal/datanode/compaction/mix_compactor.go
@@ -23,6 +23,7 @@ import (
"math"
"time"

"github.com/apache/arrow/go/v12/arrow/array"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
@@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -199,26 +201,43 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
return
}
- isValueDeleted := func(v *storage.Value) bool {
- ts, ok := delta[v.PK.GetValue()]
+ isValueDeleted := func(pk any, ts typeutil.Timestamp) bool {
+ oldts, ok := delta[pk]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
- if ok && uint64(v.Timestamp) < ts {
+ if ok && ts < oldts {
+ deletedRowCount++
return true
}
+ // Filtering expired entity
+ if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(ts)) {
+ expiredRowCount++
+ return true
+ }
return false
}

- iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
+ reader, err := storage.NewCompositeBinlogRecordReader(blobs)
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
return
}
- defer iter.Close()
+ defer reader.Close()

writeSlice := func(r storage.Record, start, end int) error {
sliced := r.Slice(start, end)
defer sliced.Release()
err = mWriter.WriteRecord(sliced)
if err != nil {
log.Warn("compact wrong, failed to writer row", zap.Error(err))
return err
}
return nil
}
for {
- err = iter.Next()
+ err = reader.Next()
if err != nil {
if err == sio.EOF {
err = nil
@@ -228,23 +247,45 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
return
}
}
- v := iter.Value()
- if isValueDeleted(v) {
- deletedRowCount++
- continue
- }
+ r := reader.Record()
pkArray := r.Column(pkField.FieldID)
tsArray := r.Column(common.TimeStampField).(*array.Int64)

sliceStart := -1
rows := r.Len()
for i := 0; i < rows; i++ {
// Filtering deleted entities
var pk any
switch pkField.DataType {
case schemapb.DataType_Int64:
pk = pkArray.(*array.Int64).Value(i)
case schemapb.DataType_VarChar:
pk = pkArray.(*array.String).Value(i)
default:
panic("invalid data type")
}
ts := typeutil.Timestamp(tsArray.Value(i))
if isValueDeleted(pk, ts) {
if sliceStart != -1 {
err = writeSlice(r, sliceStart, i)
if err != nil {
return
}
sliceStart = -1
}
continue
}

- // Filtering expired entity
- if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
- expiredRowCount++
- continue
+ if sliceStart == -1 {
+ sliceStart = i
+ }
}

- err = mWriter.Write(v)
- if err != nil {
- log.Warn("compact wrong, failed to writer row", zap.Error(err))
- return
+ if sliceStart != -1 {
+ err = writeSlice(r, sliceStart, r.Len())
+ if err != nil {
+ return
+ }
+ }
}
return
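The rewritten loop above avoids per-row storage.Value materialization by accumulating contiguous runs of surviving rows and writing each run as a single record slice. The sketch below shows only that run-batching pattern over plain indices with a keep predicate; keepRuns is an illustrative helper, not the storage.Record/Slice API used in the diff.

```go
package main

import "fmt"

// keepRuns returns [start, end) index pairs of maximal contiguous runs for
// which keep(i) is true — the same batching idea the compactor applies
// before handing each run to Record.Slice and WriteRecord.
func keepRuns(n int, keep func(i int) bool) [][2]int {
	runs := make([][2]int, 0)
	start := -1
	for i := 0; i < n; i++ {
		if keep(i) {
			if start == -1 {
				start = i // open a new run of surviving rows
			}
			continue
		}
		if start != -1 {
			runs = append(runs, [2]int{start, i}) // close the run before a dropped row
			start = -1
		}
	}
	if start != -1 {
		runs = append(runs, [2]int{start, n}) // flush the trailing run
	}
	return runs
}

func main() {
	deleted := map[int]bool{2: true, 3: true, 7: true}
	runs := keepRuns(10, func(i int) bool { return !deleted[i] })
	fmt.Println(runs) // [[0 2] [4 7] [8 10]]
}
```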
81 changes: 71 additions & 10 deletions internal/datanode/compaction/segment_writer.go
@@ -9,6 +9,7 @@ import (
"fmt"
"math"

"github.com/apache/arrow/go/v12/arrow/array"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -20,6 +21,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -101,7 +103,7 @@ func (w *MultiSegmentWriter) finishCurrent() error {
allBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
}

- if !writer.FlushAndIsEmpty() {
+ if !writer.IsEmpty() {
kvs, partialBinlogs, err := serializeWrite(context.TODO(), w.allocator.getLogIDAllocator(), writer)
if err != nil {
return err
@@ -183,12 +185,7 @@ func (w *MultiSegmentWriter) getWriter() (*SegmentWriter, error) {
return w.writers[w.current], nil
}

- func (w *MultiSegmentWriter) Write(v *storage.Value) error {
- writer, err := w.getWriter()
- if err != nil {
- return err
- }
+ func (w *MultiSegmentWriter) writeInternal(writer *SegmentWriter) error {
if writer.IsFull() {
// init segment fieldBinlogs if it is not exist
if _, ok := w.cachedMeta[writer.segmentID]; !ok {
@@ -206,6 +203,29 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {

mergeFieldBinlogs(w.cachedMeta[writer.segmentID], partialBinlogs)
}
return nil
}

func (w *MultiSegmentWriter) WriteRecord(r storage.Record) error {
writer, err := w.getWriter()
if err != nil {
return err
}
if err := w.writeInternal(writer); err != nil {
return err
}

return writer.WriteRecord(r)
}

func (w *MultiSegmentWriter) Write(v *storage.Value) error {
writer, err := w.getWriter()
if err != nil {
return err
}
if err := w.writeInternal(writer); err != nil {
return err
}

return writer.Write(v)
}
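Both Write and WriteRecord above share the shape that writeInternal factors out: check whether the active segment writer is full before appending, and deal with rollover first. The toy below illustrates only that check-then-write shape with a hypothetical multiWriter/segment pair; the real writeInternal additionally serializes the full writer's data to binlogs and caches the field-binlog metadata rather than just starting a new in-memory segment.

```go
package main

import "fmt"

// segment is a toy stand-in for a segment writer with a row limit.
type segment struct {
	id   int
	rows []int
}

type multiWriter struct {
	limit    int
	segments []*segment
}

// current returns the active segment, starting a new one when the last
// segment has reached its row limit (or none exists yet).
func (m *multiWriter) current() *segment {
	if len(m.segments) == 0 || len(m.segments[len(m.segments)-1].rows) >= m.limit {
		m.segments = append(m.segments, &segment{id: len(m.segments)})
	}
	return m.segments[len(m.segments)-1]
}

// write checks for rollover first, then appends the row.
func (m *multiWriter) write(row int) {
	seg := m.current()
	seg.rows = append(seg.rows, row)
}

func main() {
	m := &multiWriter{limit: 3}
	for i := 0; i < 7; i++ {
		m.write(i)
	}
	for _, s := range m.segments {
		fmt.Println(s.id, s.rows) // 0 [0 1 2], 1 [3 4 5], 2 [6]
	}
}
```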
@@ -234,7 +254,7 @@ func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
return w.res, nil
}

- if !w.writers[w.current].FlushAndIsEmpty() {
+ if !w.writers[w.current].IsEmpty() {
if err := w.finishCurrent(); err != nil {
return nil, err
}
@@ -358,6 +378,48 @@ func (w *SegmentWriter) WrittenMemorySize() uint64 {
return w.writer.WrittenMemorySize()
}

func (w *SegmentWriter) WriteRecord(r storage.Record) error {
tsArray := r.Column(common.TimeStampField).(*array.Int64)
rows := r.Len()
for i := 0; i < rows; i++ {
ts := typeutil.Timestamp(tsArray.Value(i))
if ts < w.tsFrom {
w.tsFrom = ts
}
if ts > w.tsTo {
w.tsTo = ts
}

switch schemapb.DataType(w.pkstats.PkType) {
case schemapb.DataType_Int64:
pkArray := r.Column(w.GetPkID()).(*array.Int64)
pk := &storage.Int64PrimaryKey{
Value: pkArray.Value(i),
}
w.pkstats.Update(pk)
case schemapb.DataType_VarChar:
pkArray := r.Column(w.GetPkID()).(*array.String)
pk := &storage.VarCharPrimaryKey{
Value: pkArray.Value(i),
}
w.pkstats.Update(pk)
default:
panic("invalid data type")
}

for fieldID, stats := range w.bm25Stats {
field, ok := r.Column(fieldID).(*array.Binary)
if !ok {
return fmt.Errorf("bm25 field value not found")
}
stats.AppendBytes(field.Value(i))
}

w.rowCount.Inc()
}
return w.writer.WriteRecord(r)
}

func (w *SegmentWriter) Write(v *storage.Value) error {
ts := typeutil.Timestamp(v.Timestamp)
if ts < w.tsFrom {
@@ -386,7 +448,7 @@ func (w *SegmentWriter) Write(v *storage.Value) error {
}

func (w *SegmentWriter) Finish() (*storage.Blob, error) {
- w.writer.Flush()
+ w.writer.Close()
codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: w.collectionID, Schema: w.sch})
return codec.SerializePkStats(w.pkstats, w.GetRowNum())
}
@@ -441,7 +503,6 @@ func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
}

func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRange, error) {
- w.writer.Flush()
w.writer.Close()

fieldData := make([]*storage.Blob, len(w.closers))
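Even though the new SegmentWriter.WriteRecord in this file forwards the payload as a whole Arrow record, it still walks the timestamp and primary-key columns once per row to maintain tsFrom/tsTo and the PK statistics. The example below illustrates that single-pass column scan using the same Arrow Go package the diff imports; tsRange and the sample timestamp values are illustrative only, not Milvus code or data.

```go
package main

import (
	"fmt"
	"math"

	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// tsRange scans a timestamp column once and returns its min and max,
// the same bookkeeping WriteRecord does to keep tsFrom/tsTo current
// while the row data itself is written through as a batch.
func tsRange(ts *array.Int64) (min, max int64) {
	min, max = math.MaxInt64, math.MinInt64
	for i := 0; i < ts.Len(); i++ {
		v := ts.Value(i)
		if v < min {
			min = v
		}
		if v > max {
			max = v
		}
	}
	return
}

func main() {
	b := array.NewInt64Builder(memory.DefaultAllocator)
	defer b.Release()
	b.AppendValues([]int64{443912001, 443911998, 443912010}, nil)
	col := b.NewInt64Array()
	defer col.Release()

	lo, hi := tsRange(col)
	fmt.Println(lo, hi) // 443911998 443912010
}
```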

