Skip to content

Commit

Permalink
feat: improve reduce performance (#501)
Browse files Browse the repository at this point in the history
Signed-off-by: ashwinidulams <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
  • Loading branch information
3 people authored Feb 8, 2023
1 parent b502fa9 commit 93753c1
Show file tree
Hide file tree
Showing 28 changed files with 278 additions and 273 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nats.go v1.21.0
github.com/numaproj/numaflow-go v0.3.1
github.com/numaproj/numaflow-go v0.3.3
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/soheilhy/cmux v0.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.3.1 h1:WRaTKNaaQE4oXvfD826BhUYQecy5j0nYc7Z5wdPxUko=
github.com/numaproj/numaflow-go v0.3.1/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo=
github.com/numaproj/numaflow-go v0.3.3 h1:450go64yEykiuUUoPrb1EuWhybEIEOlapeBf+8J6c7c=
github.com/numaproj/numaflow-go v0.3.3/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
EnvDebug = "NUMAFLOW_DEBUG"
EnvPPROF = "NUMAFLOW_PPROF"
EnvHealthCheckDisabled = "NUMAFLOW_HEALTH_CHECK_DISABLED"
EnvGRPCMaxMessageSize = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE"

PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
Expand Down Expand Up @@ -144,6 +145,9 @@ const (

// DefaultKeyForNonKeyedData Default key for non keyed stream
DefaultKeyForNonKeyedData = "NON_KEYED_STREAM"

// Default gRPC max message size
DefaultGRPCMaxMessageSize = 20 * 1024 * 1024
)

var (
Expand Down
105 changes: 60 additions & 45 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,62 +142,77 @@ type SumReduceTest struct {
}

func (s SumReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) {
sum := 0
sums := make(map[string]int)

for msg := range messageStream {
var payload PayloadForTest
_ = json.Unmarshal(msg.Payload, &payload)
sum += payload.Value
key := msg.Key
sums[key] += payload.Value
}

payload := PayloadForTest{Key: "sum", Value: sum}
b, _ := json.Marshal(payload)
ret := &isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
StartTime: partitionID.Start,
EndTime: partitionID.End,
EventTime: partitionID.End,
msgs := make([]*isb.Message, 0)

for k, s := range sums {
payload := PayloadForTest{Key: k, Value: s}
b, _ := json.Marshal(payload)
msg := &isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
StartTime: partitionID.Start,
EndTime: partitionID.End,
EventTime: partitionID.End,
},
ID: "msgID",
Key: k,
},
ID: "msgID",
Key: "result",
},
Body: isb.Body{Payload: b},
Body: isb.Body{Payload: b},
}
msgs = append(msgs, msg)
}
return []*isb.Message{
ret,
}, nil

return msgs, nil
}

type MaxReduceTest struct {
}

func (m MaxReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) {
mx := math.MinInt64
maxMap := make(map[string]int)
for msg := range messageStream {
var payload PayloadForTest
_ = json.Unmarshal(msg.Payload, &payload)
if max, ok := maxMap[msg.Key]; ok {
mx = max
}
if payload.Value > mx {
mx = payload.Value
maxMap[msg.Key] = mx
}
}

payload := PayloadForTest{Key: "max", Value: mx}
b, _ := json.Marshal(payload)
ret := &isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
StartTime: partitionID.Start,
EndTime: partitionID.End,
EventTime: partitionID.End,
result := make([]*isb.Message, 0)
for k, max := range maxMap {
payload := PayloadForTest{Key: k, Value: max}
b, _ := json.Marshal(payload)
ret := &isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
StartTime: partitionID.Start,
EndTime: partitionID.End,
EventTime: partitionID.End,
},
ID: "msgID",
Key: k,
},
ID: "msgID",
Key: "result",
},
Body: isb.Body{Payload: b},
Body: isb.Body{Payload: b},
}

result = append(result, ret)
}
return []*isb.Message{
ret,
}, nil

return result, nil
}

// read from simple buffer
Expand Down Expand Up @@ -418,7 +433,7 @@ func TestReduceDataForward_Sum(t *testing.T) {
_ = json.Unmarshal(msgs[0].Payload, &readMessagePayload)
// since the window duration is 2 minutes and tps is 1, the sum should be 120 * 10
assert.Equal(t, int64(1200), int64(readMessagePayload.Value))
assert.Equal(t, "sum", readMessagePayload.Key)
assert.Equal(t, "even", readMessagePayload.Key)

}

Expand Down Expand Up @@ -491,14 +506,14 @@ func TestReduceDataForward_Max(t *testing.T) {
_ = json.Unmarshal(msgs[0].Payload, &readMessagePayload)
// since all the messages have the same value the max should be 100
assert.Equal(t, int64(100), int64(readMessagePayload.Value))
assert.Equal(t, "max", readMessagePayload.Key)
assert.Equal(t, "even", readMessagePayload.Key)

}

// Max operation with 5 minutes window and two keys
func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {
var (
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
fromBufferSize = int64(100000)
toBufferSize = int64(10)
messages = []int{100, 99}
Expand Down Expand Up @@ -538,12 +553,12 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {

assert.NoError(t, err)

// start the producer
go publishMessages(ctx, startTime, messages, 650, 10, p[fromBuffer.GetName()], fromBuffer)

// start the forwarder
go reduceDataForward.Start()

// start the producer
go publishMessages(ctx, startTime, messages, 600, 10, p[fromBuffer.GetName()], fromBuffer)

// wait until there is data in to buffer
for buffer.IsEmpty() {
select {
Expand Down Expand Up @@ -571,8 +586,8 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {
// we cant guarantee the order of the output
assert.Contains(t, []int{30000, 29700}, readMessagePayload1.Value)
assert.Contains(t, []int{30000, 29700}, readMessagePayload2.Value)
assert.Equal(t, "sum", readMessagePayload1.Key)
assert.Equal(t, "sum", readMessagePayload2.Key)
assert.Contains(t, []string{"even", "odd"}, readMessagePayload1.Key)
assert.Contains(t, []string{"even", "odd"}, readMessagePayload2.Key)

}

Expand Down Expand Up @@ -647,7 +662,7 @@ func TestReduceDataForward_NonKeyed(t *testing.T) {
// 100 * 300 + 99 * 300
// we cant guarantee the order of the output
assert.Equal(t, 59700, readMessagePayload.Value)
assert.Equal(t, "sum", readMessagePayload.Key)
assert.Equal(t, dfv1.DefaultKeyForNonKeyedData, readMessagePayload.Key)

}

Expand Down Expand Up @@ -704,7 +719,7 @@ func TestDataForward_WithContextClose(t *testing.T) {
// wait for the partitions to be created
for {
partitionsList := pbqManager.ListPartitions()
if len(partitionsList) > 1 {
if len(partitionsList) == 1 {
childCancel()
break
}
Expand All @@ -721,7 +736,7 @@ func TestDataForward_WithContextClose(t *testing.T) {
for {
discoveredPartitions, _ = storeProvider.DiscoverPartitions(ctx)

if len(discoveredPartitions) > 1 {
if len(discoveredPartitions) == 1 {
break
}
select {
Expand All @@ -733,8 +748,8 @@ func TestDataForward_WithContextClose(t *testing.T) {
}
}

// since we have 2 different keys
assert.Len(t, discoveredPartitions, 2)
// even though we have 2 different keys
assert.Len(t, discoveredPartitions, 1)

}

Expand Down
5 changes: 3 additions & 2 deletions pkg/reduce/pbq/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
type ID struct {
Start time.Time
End time.Time
Key string
// Slot is a hash-range for keys (multiple keys can go to the same slot)
Slot string
}

func (p ID) String() string {
return fmt.Sprintf("%v-%v-%s", p.Start.UnixMilli(), p.End.UnixMilli(), p.Key)
return fmt.Sprintf("%v-%v", p.Start.UnixMilli(), p.End.UnixMilli())
}
9 changes: 5 additions & 4 deletions pkg/reduce/pbq/pbq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/reduce/pbq/partition"
"github.com/numaproj/numaflow/pkg/reduce/pbq/store"
"github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory"
"github.com/stretchr/testify/assert"
)

// test cases for PBQ (store type in-memory)
Expand All @@ -50,7 +51,7 @@ func TestPBQ_ReadWrite(t *testing.T) {
partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
Slot: "new-partition",
}

pq, err := qManager.CreateNewPBQ(ctx, partitionID)
Expand Down Expand Up @@ -113,7 +114,7 @@ func Test_PBQReadWithCanceledContext(t *testing.T) {
partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
Slot: "new-partition",
}
var pq ReadWriteCloser
pq, err = qManager.CreateNewPBQ(ctx, partitionID)
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestPBQ_WriteWithStoreFull(t *testing.T) {
partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
Slot: "new-partition",
}

var pq ReadWriteCloser
Expand Down
18 changes: 9 additions & 9 deletions pkg/reduce/pbq/pbqmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ func TestManager_ListPartitions(t *testing.T) {
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-1",
Slot: "partition-1",
}
partitionTwo := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-2",
Slot: "partition-2",
}
var pq1, pq2 ReadWriteCloser
pq1, err = pbqManager.CreateNewPBQ(ctx, testPartition)
Expand All @@ -59,7 +59,7 @@ func TestManager_ListPartitions(t *testing.T) {
pq2, err = pbqManager.CreateNewPBQ(ctx, partitionTwo)
assert.NoError(t, err)

assert.Len(t, pbqManager.ListPartitions(), 2)
assert.Len(t, pbqManager.ListPartitions(), 1)

err = pq1.GC()
assert.NoError(t, err)
Expand All @@ -83,7 +83,7 @@ func TestManager_GetPBQ(t *testing.T) {
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-1",
Slot: "partition-1",
}
pb1, err = pbqManager.CreateNewPBQ(ctx, testPartition)
assert.NoError(t, err)
Expand All @@ -105,7 +105,7 @@ func TestPBQFlow(t *testing.T) {
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-1",
Slot: "partition-1",
}

var pq ReadWriteCloser
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestPBQFlowWithNoOpStore(t *testing.T) {
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-1",
Slot: "partition-1",
}

// create a pbq backed with no op store
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestManager_Replay(t *testing.T) {
testPartition := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "partition-1",
Slot: "partition-1",
}

var pq ReadWriteCloser
Expand Down Expand Up @@ -277,13 +277,13 @@ func TestManager_StartUp(t *testing.T) {
pID1 := partition.ID{
Start: time.Now(),
End: time.Now(),
Key: "test-partition-1",
Slot: "test-partition-1",
}

pID2 := partition.ID{
Start: time.Now(),
End: time.Now(),
Key: "test-partition-2",
Slot: "test-partition-2",
}
dp := func(context.Context) ([]partition.ID, error) {
return []partition.ID{pID1,
Expand Down
Loading

0 comments on commit 93753c1

Please sign in to comment.