enhance: delete codec deserialize data by stream batch #30407

Merged (2 commits) on Feb 6, 2024
68 changes: 38 additions & 30 deletions internal/storage/data_codec.go
@@ -1015,43 +1015,51 @@
return InvalidUniqueID, InvalidUniqueID, nil, err
}

stringArray, err := eventReader.GetStringFromPayload()
dataset, err := eventReader.GetByteArrayDataSet()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
for i := 0; i < len(stringArray); i++ {
deleteLog := &DeleteLog{}
if err = json.Unmarshal([]byte(stringArray[i]), deleteLog); err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(stringArray[i], ",")
if len(splits) != 2 {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
deleteLog.Pk = &Int64PrimaryKey{
Value: pk,
}
deleteLog.PkType = int64(schemapb.DataType_Int64)
deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}

batchSize := int64(1024)
for dataset.HasNext() {
stringArray, err := dataset.NextBatch(batchSize)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
for i := 0; i < len(stringArray); i++ {
deleteLog := &DeleteLog{}
if err = json.Unmarshal(stringArray[i], deleteLog); err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(stringArray[i].String(), ",")
if len(splits) != 2 {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
deleteLog.Pk = &Int64PrimaryKey{
Value: pk,
}
deleteLog.PkType = int64(schemapb.DataType_Int64)
deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
}

result.Append(deleteLog.Pk, deleteLog.Ts)
result.Append(deleteLog.Pk, deleteLog.Ts)
}
}
eventReader.Close()
binlogReader.Close()
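
The change above replaces a single `GetStringFromPayload` call, which materialized every delta-log row at once, with a `DataSet` that is drained in fixed-size batches. A minimal sketch of the consumption pattern, with the dataset abstracted behind a small interface (the interface, the stub, and the row contents are illustrative, not part of the PR; the concrete type in the diff is `DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader]`):

```go
package main

import "fmt"

// batchSet abstracts the two DataSet methods the rewritten loop relies on.
type batchSet interface {
	HasNext() bool
	NextBatch(batch int64) ([][]byte, error)
}

// drain pulls rows in batches of at most batchSize and hands each row to
// visit, so peak memory is bounded by the batch size, not the payload size.
func drain(ds batchSet, batchSize int64, visit func([]byte) error) error {
	for ds.HasNext() {
		rows, err := ds.NextBatch(batchSize)
		if err != nil {
			return err
		}
		for _, row := range rows {
			if err := visit(row); err != nil {
				return err
			}
		}
	}
	return nil
}

// stubSet is a toy in-memory dataset used only to exercise drain.
type stubSet struct {
	rows [][]byte
	pos  int64
}

func (s *stubSet) HasNext() bool { return s.pos < int64(len(s.rows)) }

func (s *stubSet) NextBatch(batch int64) ([][]byte, error) {
	end := s.pos + batch
	if end > int64(len(s.rows)) {
		end = int64(len(s.rows))
	}
	out := s.rows[s.pos:end]
	s.pos = end
	return out, nil
}

func main() {
	ds := &stubSet{rows: [][]byte{[]byte("row-1"), []byte("row-2")}}
	_ = drain(ds, 1024, func(row []byte) error {
		fmt.Println(string(row)) // the real loop json.Unmarshals each row into a DeleteLog
		return nil
	})
}
```

The `batchSize` of 1024 in the diff caps how many rows are resident per read, which is the point of the change.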
158 changes: 127 additions & 31 deletions internal/storage/data_codec_test.go
@@ -494,42 +494,138 @@ func TestDeleteCodec(t *testing.T) {
}

func TestUpgradeDeleteLog(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
assert.NoError(t, err)
t.Run("normal", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
assert.NoError(t, err)

dData := &DeleteData{
Pks: []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
Tss: []Timestamp{100, 200},
RowCount: 2,
}
dData := &DeleteData{
Pks: []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
Tss: []Timestamp{100, 200},
RowCount: 2,
}

sizeTotal := 0
for i := int64(0); i < dData.RowCount; i++ {
int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
ts := dData.Tss[i]
err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
assert.NoError(t, err)
sizeTotal += binary.Size(int64PkValue)
sizeTotal += binary.Size(ts)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

sizeTotal := 0
for i := int64(0); i < dData.RowCount; i++ {
int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
ts := dData.Tss[i]
err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
err = binlogWriter.Finish()
assert.NoError(t, err)
sizeTotal += binary.Size(int64PkValue)
sizeTotal += binary.Size(ts)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
buffer, err := binlogWriter.GetBuffer()
assert.NoError(t, err)
blob := &Blob{Value: buffer}

err = binlogWriter.Finish()
assert.NoError(t, err)
buffer, err := binlogWriter.GetBuffer()
assert.NoError(t, err)
blob := &Blob{Value: buffer}
dCodec := NewDeleteCodec()
parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
assert.Equal(t, int64(1), parID)
assert.Equal(t, int64(1), segID)
assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
})

dCodec := NewDeleteCodec()
parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
assert.Equal(t, int64(1), parID)
assert.Equal(t, int64(1), segID)
assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
t.Run("with split lenth error", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
assert.NoError(t, err)

dData := &DeleteData{
Pks: []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
Tss: []Timestamp{100, 200},
RowCount: 2,
}

for i := int64(0); i < dData.RowCount; i++ {
int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
ts := dData.Tss[i]
err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d,?", int64PkValue, ts))
assert.NoError(t, err)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))

err = binlogWriter.Finish()
assert.NoError(t, err)
buffer, err := binlogWriter.GetBuffer()
assert.NoError(t, err)
blob := &Blob{Value: buffer}

dCodec := NewDeleteCodec()
_, _, _, err = dCodec.Deserialize([]*Blob{blob})
assert.Error(t, err)
})

t.Run("with parse int error", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
assert.NoError(t, err)

dData := &DeleteData{
Pks: []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
Tss: []Timestamp{100, 200},
RowCount: 2,
}

for i := int64(0); i < dData.RowCount; i++ {
ts := dData.Tss[i]
err = eventWriter.AddOneStringToPayload(fmt.Sprintf("abc,%d", ts))
assert.NoError(t, err)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))

err = binlogWriter.Finish()
assert.NoError(t, err)
buffer, err := binlogWriter.GetBuffer()
assert.NoError(t, err)
blob := &Blob{Value: buffer}

dCodec := NewDeleteCodec()
_, _, _, err = dCodec.Deserialize([]*Blob{blob})
assert.Error(t, err)
})

t.Run("with parse ts uint error", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
assert.NoError(t, err)

dData := &DeleteData{
Pks: []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
Tss: []Timestamp{100, 200},
RowCount: 2,
}

for i := int64(0); i < dData.RowCount; i++ {
int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,abc", int64PkValue))
assert.NoError(t, err)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))

err = binlogWriter.Finish()
assert.NoError(t, err)
buffer, err := binlogWriter.GetBuffer()
assert.NoError(t, err)
blob := &Blob{Value: buffer}

dCodec := NewDeleteCodec()
_, _, _, err = dCodec.Deserialize([]*Blob{blob})
assert.Error(t, err)
})
}

func TestDDCodec(t *testing.T) {
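The three failure cases added above each corrupt one step of the legacy fallback path. For reference, a self-contained sketch of that fallback, mirroring the logic in data_codec.go (the function name is ours, not the PR's):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLegacyDeleteLog mirrors the fallback path in data_codec.go for delta
// logs written by versions that only supported int64 primary keys and
// serialized each entry as fmt.Sprintf("%d,%d", pk, ts).
func parseLegacyDeleteLog(s string) (pk int64, ts uint64, err error) {
	splits := strings.Split(s, ",")
	if len(splits) != 2 { // "1,100,?" trips this ("with split length error")
		return 0, 0, fmt.Errorf("the format of delta log is incorrect, %v can not be split", s)
	}
	pk, err = strconv.ParseInt(splits[0], 10, 64) // "abc,100" fails here ("with parse int error")
	if err != nil {
		return 0, 0, err
	}
	ts, err = strconv.ParseUint(splits[1], 10, 64) // "1,abc" fails here ("with parse ts uint error")
	return pk, ts, err
}

func main() {
	pk, ts, err := parseLegacyDeleteLog("1,100")
	fmt.Println(pk, ts, err) // 1 100 <nil>
}
```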
6 changes: 6 additions & 0 deletions internal/storage/payload.go
@@ -17,6 +17,9 @@
package storage

import (
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/file"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

@@ -64,6 +67,9 @@ type PayloadReaderInterface interface {
GetBFloat16VectorFromPayload() ([]byte, int, error)
GetFloatVectorFromPayload() ([]float32, int, error)
GetPayloadLengthFromReader() (int, error)

GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)

ReleasePayloadReader() error
Close() error
}
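
For illustration, a minimal sketch of the call pattern the new method enables, written as a hypothetical helper inside the storage package (not part of the PR):

```go
package storage

// countByteArrayRows is a hypothetical helper showing the intended use of
// GetByteArrayDataSet: stream the column in bounded batches instead of
// materializing it in one slice, as GetStringFromPayload does.
func countByteArrayRows(r PayloadReaderInterface) (int64, error) {
	dataset, err := r.GetByteArrayDataSet()
	if err != nil {
		return 0, err
	}
	var total int64
	for dataset.HasNext() {
		rows, err := dataset.NextBatch(1024) // up to 1024 rows per read
		if err != nil {
			return 0, err
		}
		total += int64(len(rows))
	}
	return total, nil
}
```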
74 changes: 74 additions & 0 deletions internal/storage/payload_reader.go
@@ -255,6 +255,14 @@
})
}

func (r *PayloadReader) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error) {
if r.colType != schemapb.DataType_String && r.colType != schemapb.DataType_VarChar {
return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
}

return NewDataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, 0, r.numRows), nil
}

func (r *PayloadReader) GetArrayFromPayload() ([]*schemapb.ScalarField, error) {
if r.colType != schemapb.DataType_Array {
return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
@@ -445,3 +453,69 @@

return offset, nil
}

type DataSet[T any, E interface {
ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
}] struct {
reader *file.Reader
cReader E

cnt, numRows int64
groupID, columnIdx int
}

func NewDataSet[T any, E interface {
ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
}](reader *file.Reader, columnIdx int, numRows int64) *DataSet[T, E] {
return &DataSet[T, E]{
reader: reader,
columnIdx: columnIdx,
numRows: numRows,
}
}

func (s *DataSet[T, E]) nextGroup() error {
s.cnt = 0
column, err := s.reader.RowGroup(s.groupID).Column(s.columnIdx)
if err != nil {
return err
}

cReader, ok := column.(E)
if !ok {
return fmt.Errorf("expect type %T, but got %T", *new(E), column)
}

s.groupID++
s.cReader = cReader
return nil
}

func (s *DataSet[T, E]) HasNext() bool {
if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
return false
}
return true
}

func (s *DataSet[T, E]) NextBatch(batch int64) ([]T, error) {
if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
return nil, fmt.Errorf("has no more data")
}

if s.groupID == 0 || s.cnt >= s.numRows {
err := s.nextGroup()
if err != nil {
return nil, err
}
}

batch = Min(batch, s.numRows-s.cnt)
result := make([]T, batch)
_, _, err := s.cReader.ReadBatch(batch, result, nil, nil)
if err != nil {
return nil, err
}

s.cnt += batch
return result, nil
}
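
NextBatch clamps each request to the rows still unread (`batch = Min(batch, s.numRows-s.cnt)`), so the final batch is simply shorter rather than padded. A simplified illustration of the resulting batch sizes, ignoring row-group boundaries (`min` stands in for the Milvus `Min` helper used above; the row counts are made up):

```go
package main

import "fmt"

// min stands in for the generic Min helper used in NextBatch above.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// With 2500 rows and the codec's batchSize of 1024, successive calls
	// yield batches of 1024, 1024, and 452 rows.
	numRows, batchSize := int64(2500), int64(1024)
	for cnt := int64(0); cnt < numRows; {
		n := min(batchSize, numRows-cnt)
		fmt.Println("batch of", n, "rows")
		cnt += n
	}
}
```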