Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Return deltadata for DeleteCodec.Deserialize #37214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/datanode/importv2/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)

func TestResizePools(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions internal/datanode/iterators/deltalog_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type DeltalogIterator struct {
disposedOnce sync.Once
disposed atomic.Bool

data *storage.DeleteData
data *storage.DeltaData
blobs []*storage.Blob
label *Label
pos int
Expand Down Expand Up @@ -50,8 +50,8 @@ func (d *DeltalogIterator) Next() (*LabeledRowData, error) {
}

row := &DeltalogRow{
Pk: d.data.Pks[d.pos],
Timestamp: d.data.Tss[d.pos],
Pk: d.data.DeletePks().Get(d.pos),
Timestamp: d.data.DeleteTimestamps()[d.pos],
}
d.pos++

Expand All @@ -76,7 +76,7 @@ func (d *DeltalogIterator) hasNext() bool {
d.data = dData
d.blobs = nil
}
return int64(d.pos) < d.data.RowCount
return int64(d.pos) < d.data.DeleteRowCount()
}

func (d *DeltalogIterator) isDisposed() bool {
Expand Down
33 changes: 15 additions & 18 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,18 +1509,15 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
func (s *DelegatorDataSuite) TestLevel0Deletions() {
delegator := s.delegator
partitionID := int64(10)
partitionDelPks := storage.NewInt64PrimaryKeys(1)
partitionDelPks.AppendRaw(1)
allPartitionDelPks := storage.NewInt64PrimaryKeys(1)
allPartitionDelPks.AppendRaw(2)
partitionDeleteData := &storage.DeltaData{
DeletePks: partitionDelPks,
DeleteTimestamps: []storage.Timestamp{100},
}
allPartitionDeleteData := &storage.DeltaData{
DeletePks: allPartitionDelPks,
DeleteTimestamps: []storage.Timestamp{101},
}
partitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64)
s.Require().NoError(err)
err = partitionDeleteData.Append(storage.NewInt64PrimaryKey(1), 100)
s.Require().NoError(err)

allPartitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64)
s.Require().NoError(err)
err = allPartitionDeleteData.Append(storage.NewInt64PrimaryKey(2), 101)
s.Require().NoError(err)

schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
Expand Down Expand Up @@ -1549,29 +1546,29 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)

pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0)))

pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)

delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks.Get(0), allPartitionDeleteData.DeletePks.Get(0)})
s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})

bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks.Get(0)})
bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)})

pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
// bf filtered segment
s.Equal(len(pks), 1)
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,8 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie
}

func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error {
pks, tss := deltaData.DeletePks, deltaData.DeleteTimestamps
rowNum := deltaData.DelRowCount
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
Expand Down
6 changes: 3 additions & 3 deletions internal/querynodev2/segments/segment_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaD
s.dataGuard.Lock()
defer s.dataGuard.Unlock()

for i := 0; i < deltaData.DeletePks.Len(); i++ {
s.pks = append(s.pks, deltaData.DeletePks.Get(i))
for i := 0; i < int(deltaData.DeleteRowCount()); i++ {
s.pks = append(s.pks, deltaData.DeletePks().Get(i))
}
s.tss = append(s.tss, deltaData.DeleteTimestamps...)
s.tss = append(s.tss, deltaData.DeleteTimestamps()...)
return nil
}

Expand Down
24 changes: 8 additions & 16 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,22 +1208,13 @@
return blob.RowNum
})

var deltaData *storage.DeltaData
collection := loader.manager.Collection.Get(segment.Collection())

helper, _ := typeutil.CreateSchemaHelper(collection.Schema())
pkField, _ := helper.GetPrimaryKeyField()
switch pkField.DataType {
case schemapb.DataType_Int64:
deltaData = &storage.DeltaData{
DeletePks: storage.NewInt64PrimaryKeys(int(rowNums)),
DeleteTimestamps: make([]uint64, 0, rowNums),
}
case schemapb.DataType_VarChar:
deltaData = &storage.DeltaData{
DeletePks: storage.NewVarcharPrimaryKeys(int(rowNums)),
DeleteTimestamps: make([]uint64, 0, rowNums),
}
deltaData, err := storage.NewDeltaDataWithPkType(rowNums, pkField.DataType)
if err != nil {
return err

Check warning on line 1217 in internal/querynodev2/segments/segment_loader.go

View check run for this annotation

Codecov / codecov/patch

internal/querynodev2/segments/segment_loader.go#L1217

Added line #L1217 was not covered by tests
}

reader, err := storage.CreateDeltalogReader(blobs)
Expand All @@ -1240,17 +1231,18 @@
return err
}
dl := reader.Value()
deltaData.DeletePks.MustAppend(dl.Pk)
deltaData.DeleteTimestamps = append(deltaData.DeleteTimestamps, dl.Ts)
deltaData.DelRowCount++
err = deltaData.Append(dl.Pk, dl.Ts)
if err != nil {
return err
}

Check warning on line 1237 in internal/querynodev2/segments/segment_loader.go

View check run for this annotation

Codecov / codecov/patch

internal/querynodev2/segments/segment_loader.go#L1236-L1237

Added lines #L1236 - L1237 were not covered by tests
}

err = segment.LoadDeltaData(ctx, deltaData)
if err != nil {
return err
}

log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DelRowCount))
log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DeleteRowCount()))
return nil
}

Expand Down
15 changes: 12 additions & 3 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"math"
"sort"

"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -761,13 +763,17 @@
}

// Deserialize deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeltaData, err error) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}

rowNum := lo.SumBy(blobs, func(blob *Blob) int64 {
return blob.RowNum
})

var pid, sid UniqueID
result := &DeleteData{}
result := NewDeltaData(rowNum)

deserializeBlob := func(blob *Blob) error {
binlogReader, err := NewBinlogReader(blob.Value)
Expand Down Expand Up @@ -801,7 +807,10 @@
if err != nil {
return err
}
result.Append(deleteLog.Pk, deleteLog.Ts)
err = result.Append(deleteLog.Pk, deleteLog.Ts)
if err != nil {
return err
}

Check warning on line 813 in internal/storage/data_codec.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/data_codec.go#L812-L813

Added lines #L812 - L813 were not covered by tests
}
return nil
}
Expand Down
14 changes: 10 additions & 4 deletions internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,11 @@ func TestDeleteCodec(t *testing.T) {

pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
intPks, ok := data.DeletePks().(*Int64PrimaryKeys)
require.True(t, ok)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
assert.Equal(t, []int64{1, 2}, intPks.values)
})

t.Run("string pk", func(t *testing.T) {
Expand All @@ -591,9 +593,11 @@ func TestDeleteCodec(t *testing.T) {

pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
strPks, ok := data.DeletePks().(*VarcharPrimaryKeys)
require.True(t, ok)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
assert.Equal(t, []string{"test1", "test2"}, strPks.values)
})
}

Expand Down Expand Up @@ -633,8 +637,10 @@ func TestUpgradeDeleteLog(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(1), parID)
assert.Equal(t, int64(1), segID)
assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
intPks, ok := deleteData.DeletePks().(*Int64PrimaryKeys)
require.True(t, ok)
assert.ElementsMatch(t, []int64{1, 2}, intPks.values)
assert.ElementsMatch(t, dData.Tss, deleteData.DeleteTimestamps())
})

t.Run("with split lenth error", func(t *testing.T) {
Expand Down
82 changes: 77 additions & 5 deletions internal/storage/delta_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/samber/lo"
"github.com/valyala/fastjson"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// parserPool use object pooling to reduce fastjson.Parser allocation.
Expand All @@ -34,14 +36,84 @@ var parserPool = &fastjson.ParserPool{}
// DeltaData stores delta data
// currently only delete tuples are stored
type DeltaData struct {
PkType schemapb.DataType
pkType schemapb.DataType
// delete tuples
DeletePks PrimaryKeys
DeleteTimestamps []Timestamp
deletePks PrimaryKeys
deleteTimestamps []Timestamp

// stats
DelRowCount int64
MemSize int64
delRowCount int64

initCap int64
typeInitOnce sync.Once
}

// initPkType lazily allocates the primary-key container for the given pk
// type. The allocation runs at most once per DeltaData (guarded by
// typeInitOnce); subsequent calls are no-ops regardless of the pkType
// passed. Returns an error for unsupported pk types.
func (dd *DeltaData) initPkType(pkType schemapb.DataType) error {
	var err error
	dd.typeInitOnce.Do(func() {
		switch pkType {
		case schemapb.DataType_Int64:
			dd.deletePks = NewInt64PrimaryKeys(dd.initCap)
		case schemapb.DataType_VarChar:
			dd.deletePks = NewVarcharPrimaryKeys(dd.initCap)
		default:
			err = merr.WrapErrServiceInternal("unsupported pk type", pkType.String())
			// do not record an unsupported type; leave pkType at its
			// zero value so the struct does not claim a bogus init
			return
		}
		dd.pkType = pkType
	})
	return err
}

// PkType returns the primary-key data type of the stored delete tuples.
// It is the schemapb.DataType zero value until the pk container has been
// initialized (via Append or NewDeltaDataWithPkType).
func (dd *DeltaData) PkType() schemapb.DataType {
	return dd.pkType
}

// DeletePks returns the primary keys of the delete tuples.
// The result is nil until the pk container has been initialized
// (via Append or NewDeltaDataWithPkType).
func (dd *DeltaData) DeletePks() PrimaryKeys {
	return dd.deletePks
}

// DeleteTimestamps returns the delete timestamps, index-aligned with the
// keys returned by DeletePks.
func (dd *DeltaData) DeleteTimestamps() []Timestamp {
	return dd.deleteTimestamps
}

// Append adds one delete tuple (pk, ts) to the delta data, lazily
// initializing the pk container from the first pk's type. Returns an
// error if the pk type is unsupported or the pk cannot be appended.
func (dd *DeltaData) Append(pk PrimaryKey, ts Timestamp) error {
	// propagate the init error: if the pk type is unsupported,
	// deletePks stays nil and the call below would panic otherwise
	if err := dd.initPkType(pk.Type()); err != nil {
		return err
	}
	if err := dd.deletePks.Append(pk); err != nil {
		return err
	}
	dd.deleteTimestamps = append(dd.deleteTimestamps, ts)
	dd.delRowCount++
	return nil
}

// DeleteRowCount returns the number of delete tuples appended so far.
func (dd *DeltaData) DeleteRowCount() int64 {
	return dd.delRowCount
}

// MemSize returns an estimate of the memory held by the delta data in
// bytes: the pk container's reported size plus 8 bytes per timestamp.
func (dd *DeltaData) MemSize() int64 {
	size := int64(len(dd.deleteTimestamps)) * 8
	if dd.deletePks != nil {
		size += dd.deletePks.Size()
	}
	return size
}

// NewDeltaData creates a DeltaData whose primary-key container is lazily
// initialized on the first Append, based on the pk type observed there.
// capacity pre-sizes the timestamp slice and is remembered (initCap) for
// sizing the pk container once its type is known.
// Note: the parameter is named capacity to avoid shadowing the builtin cap.
func NewDeltaData(capacity int64) *DeltaData {
	return &DeltaData{
		deleteTimestamps: make([]Timestamp, 0, capacity),
		initCap:          capacity,
	}
}

// NewDeltaDataWithPkType creates a DeltaData with its primary-key
// container eagerly initialized for the given pk type, returning an
// error if the type is unsupported.
func NewDeltaDataWithPkType(cap int64, pkType schemapb.DataType) (*DeltaData, error) {
	dd := NewDeltaData(cap)
	if err := dd.initPkType(pkType); err != nil {
		return nil, err
	}
	return dd, nil
}

type DeleteLog struct {
Expand Down
Loading
Loading