Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu committed Nov 6, 2024
1 parent b3de4b0 commit d213331
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions internal/storage/serde_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"sort"
"strconv"
"testing"
"time"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
)

func TestBinlogDeserializeReader(t *testing.T) {
Expand Down Expand Up @@ -182,6 +189,78 @@ func TestBinlogSerializeWriter(t *testing.T) {
})
}

// BenchmarkSerializeWriter measures how long it takes to push a fixed set of
// pre-built rows through a binlog serialize writer, once per batch size.
//
// The rows are generated a single time up front (row ID, timestamp, a VarChar
// primary key, a double and a float vector shared by every row) and sorted by
// primary key. Each sub-benchmark iteration then creates fresh stream writers
// and a serialize writer with the given batch size, writes every row and
// closes the writer.
func BenchmarkSerializeWriter(b *testing.B) {
	const (
		dim     = 128
		numRows = 200000
	)

	var (
		rId  = &schemapb.FieldSchema{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}
		ts   = &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}
		pk   = &schemapb.FieldSchema{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "100"}}}
		f    = &schemapb.FieldSchema{FieldID: 101, Name: "random", DataType: schemapb.DataType_Double}
		fVec = &schemapb.FieldSchema{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: strconv.Itoa(dim)}}}
	)
	schema := &schemapb.CollectionSchema{Name: "test-aaa", Fields: []*schemapb.FieldSchema{rId, ts, pk, f, fVec}}

	// prepare data values
	start := time.Now()

	// A single random vector is shared by all rows; only its size matters here.
	vec := make([]float32, dim)
	for j := 0; j < dim; j++ {
		vec[j] = rand.Float32()
	}

	values := make([]*Value, numRows)
	for i := 0; i < numRows; i++ {
		// Build the field map directly instead of storing it in value.Value and
		// type-asserting it back out.
		m := make(map[int64]interface{}, len(schema.GetFields()))
		value := &Value{}
		for _, field := range schema.GetFields() {
			switch field.GetDataType() {
			case schemapb.DataType_Int64:
				m[field.GetFieldID()] = int64(i)
			case schemapb.DataType_VarChar:
				k := fmt.Sprintf("test_pk_%d", i)
				m[field.GetFieldID()] = k
				value.PK = &VarCharPrimaryKey{
					Value: k,
				}
			case schemapb.DataType_Double:
				m[field.GetFieldID()] = float64(i)
			case schemapb.DataType_FloatVector:
				m[field.GetFieldID()] = vec
			}
		}
		value.ID = int64(i)
		value.Timestamp = int64(0)
		value.IsDeleted = false
		value.Value = m
		values[i] = value
	}
	sort.Slice(values, func(i, j int) bool {
		return values[i].PK.LT(values[j].PK)
	})
	log.Info("prepare data done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))

	// Drop the preparation time from the parent benchmark; every sub-benchmark
	// started via b.Run below keeps its own timer.
	b.ResetTimer()

	sizes := []int{100, 1000, 10000, 100000}
	for _, s := range sizes {
		b.Run(fmt.Sprintf("batch size=%d", s), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
				writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, s)
				// Stop right away on a constructor failure: continuing would
				// call methods on an unusable writer.
				if !assert.NoError(b, err) {
					return
				}
				for _, v := range values {
					// Check the error of this very Write call; the original
					// discarded it and re-asserted the stale constructor error.
					if err := writer.Write(v); err != nil {
						b.Fatalf("write row %d: %v", v.ID, err)
					}
				}
				writer.Close()
			}
		})
	}
}

func TestNull(t *testing.T) {
t.Run("test null", func(t *testing.T) {
schema := generateTestSchema()
Expand Down

0 comments on commit d213331

Please sign in to comment.