diff --git a/go.mod b/go.mod index 537aa749c9..4363d2ea9b 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703 github.com/nats-io/nats.go v1.21.0 - github.com/numaproj/numaflow-go v0.3.1 + github.com/numaproj/numaflow-go v0.3.3 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.32.1 github.com/soheilhy/cmux v0.1.5 diff --git a/go.sum b/go.sum index 7b751e0b3d..b7b2bbb011 100644 --- a/go.sum +++ b/go.sum @@ -692,8 +692,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/numaproj/numaflow-go v0.3.1 h1:WRaTKNaaQE4oXvfD826BhUYQecy5j0nYc7Z5wdPxUko= -github.com/numaproj/numaflow-go v0.3.1/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo= +github.com/numaproj/numaflow-go v0.3.3 h1:450go64yEykiuUUoPrb1EuWhybEIEOlapeBf+8J6c7c= +github.com/numaproj/numaflow-go v0.3.3/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index 88c6a8a9d2..9b11b5c3eb 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -105,6 +105,7 @@ const ( EnvDebug = "NUMAFLOW_DEBUG" EnvPPROF = "NUMAFLOW_PPROF" EnvHealthCheckDisabled = "NUMAFLOW_HEALTH_CHECK_DISABLED" + EnvGRPCMaxMessageSize = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE" PathVarRun = "/var/run/numaflow" VertexMetricsPort = 2469 @@ -144,6 +145,9 @@ const ( // DefaultKeyForNonKeyedData Default key for non keyed stream DefaultKeyForNonKeyedData = "NON_KEYED_STREAM" + + // Default gRPC max message size + DefaultGRPCMaxMessageSize = 20 * 1024 * 1024 ) var ( diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 60908cea54..10b4bb6d03 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -142,30 +142,36 @@ type SumReduceTest struct { } func (s SumReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) { - sum := 0 + sums := make(map[string]int) + for msg := range messageStream { var payload PayloadForTest _ = json.Unmarshal(msg.Payload, &payload) - sum += payload.Value + key := msg.Key + sums[key] += payload.Value } - payload := PayloadForTest{Key: "sum", Value: sum} - b, _ := json.Marshal(payload) - ret := &isb.Message{ - Header: isb.Header{ - PaneInfo: isb.PaneInfo{ - StartTime: partitionID.Start, - EndTime: partitionID.End, - EventTime: partitionID.End, + msgs := make([]*isb.Message, 0) + + for k, s := range sums { + payload := PayloadForTest{Key: k, Value: s} + b, _ := json.Marshal(payload) + msg := &isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + StartTime: partitionID.Start, + EndTime: partitionID.End, + EventTime: partitionID.End, + }, + ID: "msgID", + Key: k, }, - ID: "msgID", - Key: "result", - }, - Body: isb.Body{Payload: b}, + Body: isb.Body{Payload: b}, + } + msgs = append(msgs, msg) } - return []*isb.Message{ - ret, - }, nil + + return msgs, nil } type MaxReduceTest struct { @@ -173,31 +179,40 @@ type MaxReduceTest struct { func (m MaxReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) { mx := math.MinInt64 + maxMap := make(map[string]int) for msg := range messageStream { var payload PayloadForTest _ = json.Unmarshal(msg.Payload, &payload) + if max, ok := maxMap[msg.Key]; ok { + mx = max + } if payload.Value > mx { mx = payload.Value + maxMap[msg.Key] = mx } } - payload := PayloadForTest{Key: "max", Value: mx} - b, _ := json.Marshal(payload) - ret := &isb.Message{ - Header: isb.Header{ - PaneInfo: isb.PaneInfo{ - StartTime: partitionID.Start, - EndTime: partitionID.End, - EventTime: partitionID.End, + result := make([]*isb.Message, 0) + for k, max := range maxMap { + payload := PayloadForTest{Key: k, Value: max} + b, _ := json.Marshal(payload) + ret := &isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + StartTime: partitionID.Start, + EndTime: partitionID.End, + EventTime: partitionID.End, + }, + ID: "msgID", + Key: k, }, - ID: "msgID", - Key: "result", - }, - Body: isb.Body{Payload: b}, + Body: isb.Body{Payload: b}, + } + + result = append(result, ret) } - return []*isb.Message{ - ret, - }, nil + + return result, nil } // read from simple buffer @@ -418,7 +433,7 @@ func TestReduceDataForward_Sum(t *testing.T) { _ = json.Unmarshal(msgs[0].Payload, &readMessagePayload) // since the window duration is 2 minutes and tps is 1, the sum should be 120 * 10 assert.Equal(t, int64(1200), int64(readMessagePayload.Value)) - assert.Equal(t, "sum", readMessagePayload.Key) + assert.Equal(t, "even", readMessagePayload.Key) } @@ -491,14 +506,14 @@ func TestReduceDataForward_Max(t *testing.T) { _ = json.Unmarshal(msgs[0].Payload, &readMessagePayload) // since all the messages have the same value the max should be 100 assert.Equal(t, int64(100), int64(readMessagePayload.Value)) - assert.Equal(t, "max", readMessagePayload.Key) + assert.Equal(t, "even", readMessagePayload.Key) } // Max operation with 5 minutes window and two keys func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { var ( - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) fromBufferSize = int64(100000) toBufferSize = int64(10) messages = []int{100, 99} @@ -538,12 +553,12 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { assert.NoError(t, err) + // start the producer + go publishMessages(ctx, startTime, messages, 650, 10, p[fromBuffer.GetName()], fromBuffer) + // start the forwarder go reduceDataForward.Start() - // start the producer - go publishMessages(ctx, startTime, messages, 600, 10, p[fromBuffer.GetName()], fromBuffer) - // wait until there is data in to buffer for buffer.IsEmpty() { select { @@ -571,8 +586,8 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { // we cant guarantee the order of the output assert.Contains(t, []int{30000, 29700}, readMessagePayload1.Value) assert.Contains(t, []int{30000, 29700}, readMessagePayload2.Value) - assert.Equal(t, "sum", readMessagePayload1.Key) - assert.Equal(t, "sum", readMessagePayload2.Key) + assert.Contains(t, []string{"even", "odd"}, readMessagePayload1.Key) + assert.Contains(t, []string{"even", "odd"}, readMessagePayload2.Key) } @@ -647,7 +662,7 @@ func TestReduceDataForward_NonKeyed(t *testing.T) { // 100 * 300 + 99 * 300 // we cant guarantee the order of the output assert.Equal(t, 59700, readMessagePayload.Value) - assert.Equal(t, "sum", readMessagePayload.Key) + assert.Equal(t, dfv1.DefaultKeyForNonKeyedData, readMessagePayload.Key) } @@ -704,7 +719,7 @@ func TestDataForward_WithContextClose(t *testing.T) { // wait for the partitions to be created for { partitionsList := pbqManager.ListPartitions() - if len(partitionsList) > 1 { + if len(partitionsList) == 1 { childCancel() break } @@ -721,7 +736,7 @@ func TestDataForward_WithContextClose(t *testing.T) { for { discoveredPartitions, _ = storeProvider.DiscoverPartitions(ctx) - if len(discoveredPartitions) > 1 { + if len(discoveredPartitions) == 1 { break } select { @@ -733,8 +748,8 @@ func TestDataForward_WithContextClose(t *testing.T) { } } - // since we have 2 different keys - assert.Len(t, discoveredPartitions, 2) + // even though we have 2 different keys + assert.Len(t, discoveredPartitions, 1) } diff --git a/pkg/reduce/pbq/partition/partition.go b/pkg/reduce/pbq/partition/partition.go index 108e5fbb08..d6ac343ecd 100644 --- a/pkg/reduce/pbq/partition/partition.go +++ b/pkg/reduce/pbq/partition/partition.go @@ -25,9 +25,10 @@ import ( type ID struct { Start time.Time End time.Time - Key string + // Slot is a hash-range for keys (multiple keys can go to the same slot) + Slot string } func (p ID) String() string { - return fmt.Sprintf("%v-%v-%s", p.Start.UnixMilli(), p.End.UnixMilli(), p.Key) + return fmt.Sprintf("%v-%v", p.Start.UnixMilli(), p.End.UnixMilli()) } diff --git a/pkg/reduce/pbq/pbq_test.go b/pkg/reduce/pbq/pbq_test.go index 7aa2f276c5..254899c255 100644 --- a/pkg/reduce/pbq/pbq_test.go +++ b/pkg/reduce/pbq/pbq_test.go @@ -22,12 +22,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/reduce/pbq/partition" "github.com/numaproj/numaflow/pkg/reduce/pbq/store" "github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory" - "github.com/stretchr/testify/assert" ) // test cases for PBQ (store type in-memory) @@ -50,7 +51,7 @@ func TestPBQ_ReadWrite(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } pq, err := qManager.CreateNewPBQ(ctx, partitionID) @@ -113,7 +114,7 @@ func Test_PBQReadWithCanceledContext(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } var pq ReadWriteCloser pq, err = qManager.CreateNewPBQ(ctx, partitionID) @@ -177,7 +178,7 @@ func TestPBQ_WriteWithStoreFull(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } var pq ReadWriteCloser diff --git a/pkg/reduce/pbq/pbqmanager_test.go b/pkg/reduce/pbq/pbqmanager_test.go index 3c5acf5b92..cc8c6888cd 100644 --- a/pkg/reduce/pbq/pbqmanager_test.go +++ b/pkg/reduce/pbq/pbqmanager_test.go @@ -45,12 +45,12 @@ func TestManager_ListPartitions(t *testing.T) { testPartition := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-1", + Slot: "partition-1", } partitionTwo := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-2", + Slot: "partition-2", } var pq1, pq2 ReadWriteCloser pq1, err = pbqManager.CreateNewPBQ(ctx, testPartition) @@ -59,7 +59,7 @@ func TestManager_ListPartitions(t *testing.T) { pq2, err = pbqManager.CreateNewPBQ(ctx, partitionTwo) assert.NoError(t, err) - assert.Len(t, pbqManager.ListPartitions(), 2) + assert.Len(t, pbqManager.ListPartitions(), 1) err = pq1.GC() assert.NoError(t, err) @@ -83,7 +83,7 @@ func TestManager_GetPBQ(t *testing.T) { testPartition := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-1", + Slot: "partition-1", } pb1, err = pbqManager.CreateNewPBQ(ctx, testPartition) assert.NoError(t, err) @@ -105,7 +105,7 @@ func TestPBQFlow(t *testing.T) { testPartition := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-1", + Slot: "partition-1", } var pq ReadWriteCloser @@ -165,7 +165,7 @@ func TestPBQFlowWithNoOpStore(t *testing.T) { testPartition := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-1", + Slot: "partition-1", } // create a pbq backed with no op store @@ -223,7 +223,7 @@ func TestManager_Replay(t *testing.T) { testPartition := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "partition-1", + Slot: "partition-1", } var pq ReadWriteCloser @@ -277,13 +277,13 @@ func TestManager_StartUp(t *testing.T) { pID1 := partition.ID{ Start: time.Now(), End: time.Now(), - Key: "test-partition-1", + Slot: "test-partition-1", } pID2 := partition.ID{ Start: time.Now(), End: time.Now(), - Key: "test-partition-2", + Slot: "test-partition-2", } dp := func(context.Context) ([]partition.ID, error) { return []partition.ID{pID1, diff --git a/pkg/reduce/pbq/store/memory/store_test.go b/pkg/reduce/pbq/store/memory/store_test.go index 6be711d297..ba55a11c3c 100644 --- a/pkg/reduce/pbq/store/memory/store_test.go +++ b/pkg/reduce/pbq/store/memory/store_test.go @@ -34,7 +34,7 @@ func TestMemoryStore_WriteToStore(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } // create a store of size 100 (it can store max 100 messages) memStore, err := NewMemoryStores(WithStoreSize(100)).CreateStore(ctx, partitionID) @@ -57,7 +57,7 @@ func TestMemoryStore_ReadFromStore(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } // create a store of size 100 (it can store max 100 messages) @@ -88,7 +88,7 @@ func TestEmptyStore_Read(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } memStore, err := NewMemoryStores(WithStoreSize(storeSize)).CreateStore(ctx, partitionID) @@ -109,7 +109,7 @@ func TestFullStore_Write(t *testing.T) { partitionID := partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "new-partition", + Slot: "new-partition", } memStore, err := NewMemoryStores(WithStoreSize(storeSize)).CreateStore(ctx, partitionID) diff --git a/pkg/reduce/pbq/store/memory/stores_test.go b/pkg/reduce/pbq/store/memory/stores_test.go index 528d8741e0..bf617cb973 100644 --- a/pkg/reduce/pbq/store/memory/stores_test.go +++ b/pkg/reduce/pbq/store/memory/stores_test.go @@ -19,17 +19,17 @@ func TestMemoryStores(t *testing.T) { { Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test-1", + Slot: "test-1", }, { Start: time.Unix(120, 0), End: time.Unix(180, 0), - Key: "test-2", + Slot: "test-2", }, { Start: time.Unix(180, 0), End: time.Unix(240, 0), - Key: "test-3", + Slot: "test-3", }, } storeProvider := NewMemoryStores(WithStoreSize(100)) diff --git a/pkg/reduce/pbq/store/wal/bootstrap.go b/pkg/reduce/pbq/store/wal/bootstrap.go index 29bb3afb92..d4c2cea87d 100644 --- a/pkg/reduce/pbq/store/wal/bootstrap.go +++ b/pkg/reduce/pbq/store/wal/bootstrap.go @@ -93,7 +93,7 @@ func decodeHeader(buf io.Reader) (*partition.ID, error) { return &partition.ID{ Start: time.UnixMilli(hp.S).In(location), End: time.UnixMilli(hp.E).In(location), - Key: string(key), + Slot: string(key), }, nil } diff --git a/pkg/reduce/pbq/store/wal/segment.go b/pkg/reduce/pbq/store/wal/segment.go index ea15136c7a..44cb577d83 100644 --- a/pkg/reduce/pbq/store/wal/segment.go +++ b/pkg/reduce/pbq/store/wal/segment.go @@ -137,7 +137,7 @@ func (w *WAL) encodeHeader(id *partition.ID) (buf *bytes.Buffer, err error) { hp := headerPreamble{ S: id.Start.UnixMilli(), E: id.End.UnixMilli(), - KLen: int16(len(id.Key)), + KLen: int16(len(id.Slot)), } // write the fixed values @@ -147,7 +147,7 @@ func (w *WAL) encodeHeader(id *partition.ID) (buf *bytes.Buffer, err error) { } // write the variadic values - err = binary.Write(buf, binary.LittleEndian, []rune(id.Key)) + err = binary.Write(buf, binary.LittleEndian, []rune(id.Slot)) return buf, err } @@ -354,6 +354,6 @@ func (w *WAL) Close() (err error) { } func getSegmentFilePath(id *partition.ID, dir string) string { - filename := fmt.Sprintf("%s_%d.%d.%s", SegmentPrefix, id.Start.Unix(), id.End.Unix(), id.Key) + filename := fmt.Sprintf("%s_%d.%d.%s", SegmentPrefix, id.Start.Unix(), id.End.Unix(), id.Slot) return filepath.Join(dir, filename) } diff --git a/pkg/reduce/pbq/store/wal/segment_test.go b/pkg/reduce/pbq/store/wal/segment_test.go index 9e6419f3bb..9bc7c69470 100644 --- a/pkg/reduce/pbq/store/wal/segment_test.go +++ b/pkg/reduce/pbq/store/wal/segment_test.go @@ -46,7 +46,7 @@ func Test_writeReadHeader(t *testing.T) { id := partition.ID{ Start: time.Unix(1665109020, 0).In(location), End: time.Unix(1665109020, 0).Add(time.Minute).In(location), - Key: "test1", + Slot: "test1", } tmp := t.TempDir() @@ -89,7 +89,7 @@ func Test_encodeDecodeHeader(t *testing.T) { id: &partition.ID{ Start: time.Unix(1665109020, 0).In(location), End: time.Unix(1665109020, 0).Add(time.Minute).In(location), - Key: "test1,test2", + Slot: "test1,test2", }, }, { @@ -98,7 +98,7 @@ func Test_encodeDecodeHeader(t *testing.T) { id: &partition.ID{ Start: time.Time{}, End: time.Time{}, - Key: "", + Slot: "", }, }, } @@ -126,7 +126,7 @@ func Test_writeReadEntry(t *testing.T) { id := partition.ID{ Start: time.Unix(1665109020, 0).In(location), End: time.Unix(1665109020, 0).Add(time.Minute).In(location), - Key: "test1", + Slot: "test1", } tmp := t.TempDir() @@ -234,7 +234,7 @@ func Test_batchSyncWithMaxBatchSize(t *testing.T) { id := partition.ID{ Start: time.Unix(1665109020, 0).In(location), End: time.Unix(1665109020, 0).Add(time.Minute).In(location), - Key: "test1", + Slot: "test1", } tmp := t.TempDir() @@ -299,7 +299,7 @@ func Test_batchSyncWithSyncDuration(t *testing.T) { id := partition.ID{ Start: time.Unix(1665109020, 0).In(location), End: time.Unix(1665109020, 0).Add(time.Minute).In(location), - Key: "test1", + Slot: "test1", } tmp := t.TempDir() diff --git a/pkg/reduce/pbq/store/wal/stores.go b/pkg/reduce/pbq/store/wal/stores.go index e90af94346..8e3aab79da 100644 --- a/pkg/reduce/pbq/store/wal/stores.go +++ b/pkg/reduce/pbq/store/wal/stores.go @@ -150,8 +150,8 @@ func (ws *walStores) openOrCreateWAL(id *partition.ID) (*WAL, error) { if err != nil { return nil, err } - if id.Key != readPartition.Key { - return nil, fmt.Errorf("expected partition key %s, but got %s", id.Key, readPartition.Key) + if id.Slot != readPartition.Slot { + return nil, fmt.Errorf("expected partition key %s, but got %s", id.Slot, readPartition.Slot) } } diff --git a/pkg/reduce/pbq/store/wal/stores_test.go b/pkg/reduce/pbq/store/wal/stores_test.go index e6afe1625f..69ed5892e6 100644 --- a/pkg/reduce/pbq/store/wal/stores_test.go +++ b/pkg/reduce/pbq/store/wal/stores_test.go @@ -29,17 +29,17 @@ func TestWalStores(t *testing.T) { { Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test-1", + Slot: "test-1", }, { Start: time.Unix(120, 0), End: time.Unix(180, 0), - Key: "test-2", + Slot: "test-2", }, { Start: time.Unix(180, 0), End: time.Unix(240, 0), - Key: "test-3", + Slot: "test-3", }, } diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go index 2a8079989b..6115bbbdca 100644 --- a/pkg/reduce/pnf/processandforward.go +++ b/pkg/reduce/pnf/processandforward.go @@ -31,7 +31,6 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" @@ -39,7 +38,6 @@ import ( "github.com/numaproj/numaflow/pkg/reduce/pbq" "github.com/numaproj/numaflow/pkg/reduce/pbq/partition" "github.com/numaproj/numaflow/pkg/shared/logging" - sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/publish" ) @@ -111,17 +109,7 @@ func (p *ProcessAndForward) Forward(ctx context.Context) error { processorWM := processor.Watermark(p.PartitionID.End) - // decide which ISB to write to - to, err := p.whereToDecider.WhereTo(p.PartitionID.Key) - if err != nil { - platformError.With(map[string]string{ - metrics.LabelVertex: p.vertexName, - metrics.LabelPipeline: p.pipelineName, - metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)), - }).Inc() - return err - } - messagesToStep := p.whereToStep(to) + messagesToStep := p.whereToStep() // store write offsets to publish watermark writeOffsets := make(map[string][]isb.Offset) @@ -161,7 +149,7 @@ func (p *ProcessAndForward) Forward(ctx context.Context) error { p.publishWM(processorWM, writeOffsets) // delete the persisted messages - err = p.pbqReader.GC() + err := p.pbqReader.GC() if err != nil { return err } @@ -170,24 +158,33 @@ func (p *ProcessAndForward) Forward(ctx context.Context) error { // whereToStep assigns a message to the ISBs based on the Message.Key. // TODO: we have to introduce support for shuffle, output of a reducer can be input to the next reducer. -func (p *ProcessAndForward) whereToStep(to []string) map[string][]isb.Message { +func (p *ProcessAndForward) whereToStep() map[string][]isb.Message { // writer doesn't accept array of pointers messagesToStep := make(map[string][]isb.Message) - writeMessages := make([]isb.Message, len(p.result)) - for idx, msg := range p.result { - writeMessages[idx] = *msg - } - // if a message is mapped to an isb, all the messages will be mapped to same isb (key is same) - switch { - case sharedutil.StringSliceContains(to, dfv1.MessageKeyAll): - for bufferID := range p.toBuffers { - messagesToStep[bufferID] = writeMessages + var to []string + var err error + for _, msg := range p.result { + to, err = p.whereToDecider.WhereTo(msg.Key) + if err != nil { + platformError.With(map[string]string{ + metrics.LabelVertex: p.vertexName, + metrics.LabelPipeline: p.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)), + }).Inc() + p.log.Errorw("Got an error while invoking WhereTo, dropping the message", zap.String("key", msg.Key), zap.Error(err), zap.Any("partitionID", p.PartitionID)) + continue + } + + if len(to) == 0 { + continue } - case sharedutil.StringSliceContains(to, dfv1.MessageKeyDrop): - default: + for _, bufferID := range to { - messagesToStep[bufferID] = writeMessages + if _, ok := messagesToStep[bufferID]; !ok { + messagesToStep[bufferID] = make([]isb.Message, 0) + } + messagesToStep[bufferID] = append(messagesToStep[bufferID], *msg) } } diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go index 8e82956884..94eeacfc68 100644 --- a/pkg/reduce/pnf/processandforward_test.go +++ b/pkg/reduce/pnf/processandforward_test.go @@ -42,7 +42,6 @@ import ( "github.com/numaproj/numaflow-go/pkg/function/clienttest" "github.com/stretchr/testify/assert" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb/testutils" udfcall "github.com/numaproj/numaflow/pkg/udf/function" wmstore "github.com/numaproj/numaflow/pkg/watermark/store" @@ -56,15 +55,16 @@ const ( ) type myForwardTest struct { + buffers []string } func (f myForwardTest) WhereTo(key string) ([]string, error) { if strings.Compare(key, "test-forward-one") == 0 { return []string{"buffer1"}, nil } else if strings.Compare(key, "test-forward-all") == 0 { - return []string{dfv1.MessageKeyAll}, nil + return f.buffers, nil } - return []string{dfv1.MessageKeyDrop}, nil + return []string{}, nil } func (f myForwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) { @@ -94,7 +94,7 @@ func TestProcessAndForward_Process(t *testing.T) { testPartition := partition.ID{ Start: time.UnixMilli(60000), End: time.UnixMilli(120000), - Key: "partition-1", + Slot: "partition-1", } var err error var pbqManager *pbq.Manager @@ -143,6 +143,7 @@ func TestProcessAndForward_Process(t *testing.T) { assert.NoError(t, err) _, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(make(map[string]isb.BufferWriter)) + // create pf using key and reducer pf := NewProcessAndForward(ctx, "reduce", "test-pipeline", 0, testPartition, client, simplePbq, make(map[string]isb.BufferWriter), myForwardTest{}, publishWatermark) @@ -202,7 +203,7 @@ func TestProcessAndForward_Forward(t *testing.T) { id: partition.ID{ Start: time.UnixMilli(60000), End: time.UnixMilli(120000), - Key: "test-forward-one", + Slot: "test-forward-one", }, buffers: []*simplebuffer.InMemoryBuffer{test1Buffer1, test1Buffer2}, pf: pf1, @@ -226,7 +227,7 @@ func TestProcessAndForward_Forward(t *testing.T) { id: partition.ID{ Start: time.UnixMilli(60000), End: time.UnixMilli(120000), - Key: "test-forward-all", + Slot: "test-forward-all", }, buffers: []*simplebuffer.InMemoryBuffer{test2Buffer1, test2Buffer2}, pf: pf2, @@ -250,7 +251,7 @@ func TestProcessAndForward_Forward(t *testing.T) { id: partition.ID{ Start: time.UnixMilli(60000), End: time.UnixMilli(120000), - Key: "test-drop-all", + Slot: "test-drop-all", }, buffers: []*simplebuffer.InMemoryBuffer{test3Buffer1, test3Buffer2}, pf: pf3, @@ -297,7 +298,7 @@ func createProcessAndForwardAndOTStore(ctx context.Context, key string, pbqManag testPartition := partition.ID{ Start: time.UnixMilli(60000), End: time.UnixMilli(120000), - Key: key, + Slot: key, } // create a pbq for a partition @@ -316,12 +317,21 @@ func createProcessAndForwardAndOTStore(ctx context.Context, key string, pbqManag PaneInfo: isb.PaneInfo{ EventTime: time.UnixMilli(60000), }, - ID: "1", + ID: "1", + Key: key, }, Body: isb.Body{Payload: resultPayload}, }, } + buffers := make([]string, 0) + for k := range toBuffers { + buffers = append(buffers, k) + } + whereto := &myForwardTest{ + buffers: buffers, + } + pf := ProcessAndForward{ PartitionID: testPartition, UDF: nil, @@ -329,7 +339,7 @@ func createProcessAndForwardAndOTStore(ctx context.Context, key string, pbqManag pbqReader: simplePbq, log: logging.FromContext(ctx), toBuffers: toBuffers, - whereToDecider: myForwardTest{}, + whereToDecider: whereto, publishWatermark: pw, } diff --git a/pkg/reduce/readloop/ordered_test.go b/pkg/reduce/readloop/ordered_test.go index 4050405ccc..5b32288bc0 100644 --- a/pkg/reduce/readloop/ordered_test.go +++ b/pkg/reduce/readloop/ordered_test.go @@ -18,7 +18,6 @@ package readloop import ( "context" - "fmt" "testing" "time" @@ -47,7 +46,6 @@ func (f myForwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]* } func TestOrderedProcessing(t *testing.T) { - // Test Reducer returns the messages as is identityReducer := applier.ApplyReduceFunc(func(ctx context.Context, partitionID *partition.ID, input <-chan *isb.ReadMessage) ([]*isb.Message, error) { messages := make([]*isb.Message, 0) @@ -119,10 +117,9 @@ func TestOrderedProcessing(t *testing.T) { count := 0 for e := op.taskQueue.Front(); e != nil; e = e.Next() { pfTask := e.Value.(*task) - assert.Equal(t, partition.ID{Key: fmt.Sprintf("partition-%d", count)}, pfTask.pf.PartitionID) + assert.Equal(t, partitionFor(count), pfTask.pf.PartitionID) count = count + 1 } - for _, id := range tt.reduceOrder { p := pbqManager.GetPBQ(id) p.CloseOfBook() @@ -153,7 +150,7 @@ func TestOrderedProcessing(t *testing.T) { func partitions(count int) []partition.ID { partitions := make([]partition.ID, count) for i := 0; i < count; i++ { - partitions[i] = partition.ID{Key: fmt.Sprintf("partition-%d", i)} + partitions[i] = partitionFor(i) } return partitions } @@ -161,11 +158,16 @@ func partitions(count int) []partition.ID { func partitionsFor(partitionIdx []int) []partition.ID { partitions := make([]partition.ID, len(partitionIdx)) for i, idx := range partitionIdx { - partitions[i] = partition.ID{Key: fmt.Sprintf("partition-%d", idx)} + partitions[i] = partitionFor(idx) } return partitions } +func partitionFor(i int) partition.ID { + base := 10000 + return partition.ID{Start: time.UnixMilli(int64(base * i)), End: time.UnixMilli(int64(base * (i + 1)))} +} + func taskForPartition(op *orderedForwarder, partitionId partition.ID) *task { op.RLock() defer op.RUnlock() diff --git a/pkg/reduce/readloop/readloop.go b/pkg/reduce/readloop/readloop.go index c0aa99ceae..3b9a1ed2d1 100644 --- a/pkg/reduce/readloop/readloop.go +++ b/pkg/reduce/readloop/readloop.go @@ -123,7 +123,7 @@ func (rl *ReadLoop) Startup(ctx context.Context) error { // add key to the window, so that when a new message with the watermark greater than // the window end time comes, key will not be lost and the windows will be closed as expected - keyedWindow.AddKey(p.Key) + keyedWindow.AddSlot(p.Slot) // create and invoke process and forward for the partition rl.associatePBQAndPnF(ctx, p) @@ -212,7 +212,7 @@ messagesLoop: partitionID := partition.ID{ Start: kw.StartTime(), End: kw.EndTime(), - Key: message.Key, + Slot: "slot-0", // FIXME: revisit this later. for now hard coded to slot-0 } err := rl.writeToPBQ(ctx, partitionID, message) @@ -394,7 +394,8 @@ func (rl *ReadLoop) upsertWindowsAndKeys(m *isb.ReadMessage) []window.AlignedKey rl.log.Debugw("Found an existing window", zap.String("msg.offset", m.ID), zap.Int64("startTime", w.StartTime().UnixMilli()), zap.Int64("endTime", w.EndTime().UnixMilli())) } // track the key to window relationship - w.AddKey(m.Key) + // FIXME: create a slot from m.key + w.AddSlot("slot-0") kWindows = append(kWindows, w) } return kWindows diff --git a/pkg/reduce/readloop/readloop_test.go b/pkg/reduce/readloop/readloop_test.go index 08e7d29974..a338820456 100644 --- a/pkg/reduce/readloop/readloop_test.go +++ b/pkg/reduce/readloop/readloop_test.go @@ -31,31 +31,37 @@ func (s *SumReduceTest) WhereTo(_ string) ([]string, error) { return []string{"reduce-buffer"}, nil } -func (s *SumReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) { - sum := 0 +func (s SumReduceTest) ApplyReduce(_ context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) { + sums := make(map[string]int) + for msg := range messageStream { var payload PayloadForTest _ = json.Unmarshal(msg.Payload, &payload) - sum += payload.Value + key := msg.Key + sums[key] += payload.Value } - payload := PayloadForTest{Key: "sum", Value: sum} - b, _ := json.Marshal(payload) - ret := &isb.Message{ - Header: isb.Header{ - PaneInfo: isb.PaneInfo{ - StartTime: partitionID.Start, - EndTime: partitionID.End, - EventTime: partitionID.End, + msgs := make([]*isb.Message, 0) + + for k, s := range sums { + payload := PayloadForTest{Key: k, Value: s} + b, _ := json.Marshal(payload) + msg := &isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + StartTime: partitionID.Start, + EndTime: partitionID.End, + EventTime: partitionID.End, + }, + ID: "msgID", + Key: k, }, - ID: "msgID", - Key: "result", - }, - Body: isb.Body{Payload: b}, + Body: isb.Body{Payload: b}, + } + msgs = append(msgs, msg) } - return []*isb.Message{ - ret, - }, nil + + return msgs, nil } // testing startup code with replay included using in-memory pbq @@ -68,22 +74,22 @@ func TestReadLoop_Startup(t *testing.T) { { Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "even", + Slot: "window", }, { Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "odd", + Slot: "window", }, { Start: time.Unix(120, 0), End: time.Unix(180, 0), - Key: "odd", + Slot: "window", }, { Start: time.Unix(180, 0), End: time.Unix(240, 0), - Key: "even", + Slot: "window", }, } @@ -94,14 +100,14 @@ func TestReadLoop_Startup(t *testing.T) { assert.NoError(t, err) var msgVal int - if id.Key == "even" { + if id.Slot == "even" { msgVal = 2 } else { msgVal = 3 } // write messages to the store, which will be replayed - storeMessages := createStoreMessages(ctx, id.Key, msgVal, id.Start, 10) + storeMessages := createStoreMessages(ctx, id.Slot, msgVal, id.Start, 10) for _, msg := range storeMessages { err = memStore.Write(msg) assert.NoError(t, err) @@ -110,7 +116,7 @@ func TestReadLoop_Startup(t *testing.T) { pManager, _ := pbq.NewManager(ctx, "reduce", "test-pipeline", 0, memStoreProvider, pbq.WithChannelBufferSize(10)) - to1 := simplebuffer.NewInMemoryBuffer("reduce-buffer", 4) + to1 := simplebuffer.NewInMemoryBuffer("reduce-buffer", 3) toSteps := map[string]isb.BufferWriter{ "reduce-buffer": to1, } @@ -144,6 +150,7 @@ func TestReadLoop_Startup(t *testing.T) { } rl.Process(ctx, []*isb.ReadMessage{latestMessage}) + for !to1.IsFull() { select { case <-ctx.Done(): @@ -154,29 +161,24 @@ func TestReadLoop_Startup(t *testing.T) { } } - msgs, readErr := to1.Read(ctx, 4) + msgs, readErr := to1.Read(ctx, 3) assert.Nil(t, readErr) - assert.Len(t, msgs, 4) - + assert.Len(t, msgs, 3) // since we have 3 partitions we should have 3 different outputs var readMessagePayload1 PayloadForTest var readMessagePayload2 PayloadForTest var readMessagePayload3 PayloadForTest - var readMessagePayload4 PayloadForTest _ = json.Unmarshal(msgs[0].Payload, &readMessagePayload1) _ = json.Unmarshal(msgs[1].Payload, &readMessagePayload2) _ = json.Unmarshal(msgs[2].Payload, &readMessagePayload3) - _ = json.Unmarshal(msgs[3].Payload, &readMessagePayload4) // since we had 10 messages in the store with value 2 and 3 // the expected value is 20 and 30, since the reduce operation is sum - assert.Contains(t, []int{20, 30, 20, 30}, readMessagePayload1.Value) + assert.Contains(t, []int{20, 30, 20, 30, 60}, readMessagePayload1.Value) assert.Contains(t, []int{20, 30, 20, 30}, readMessagePayload2.Value) assert.Contains(t, []int{20, 30, 20, 30}, readMessagePayload3.Value) - assert.Contains(t, []int{20, 30, 20, 30}, readMessagePayload4.Value) - assert.Equal(t, "sum", readMessagePayload1.Key) - assert.Equal(t, "sum", readMessagePayload2.Key) - assert.Equal(t, "sum", readMessagePayload3.Key) - assert.Equal(t, "sum", readMessagePayload4.Key) + assert.Equal(t, "window", readMessagePayload1.Key) + assert.Equal(t, "window", readMessagePayload2.Key) + assert.Equal(t, "window", readMessagePayload3.Key) } diff --git a/pkg/shared/util/env.go b/pkg/shared/util/env.go index de4969e83a..6c903685fd 100644 --- a/pkg/shared/util/env.go +++ b/pkg/shared/util/env.go @@ -17,7 +17,9 @@ limitations under the License. package util import ( + "fmt" "os" + "strconv" ) func LookupEnvStringOr(key, defaultValue string) string { @@ -27,3 +29,15 @@ func LookupEnvStringOr(key, defaultValue string) string { return defaultValue } } + +func LookupEnvIntOr(key string, defaultValue int) int { + if valStr, existing := os.LookupEnv(key); existing && valStr != "" { + val, err := strconv.Atoi(valStr) + if err != nil { + panic(fmt.Errorf("invalid value for env variable %q, value %q", key, valStr)) + } + return val + } else { + return defaultValue + } +} diff --git a/pkg/udf/function/uds_grpc.go b/pkg/udf/function/uds_grpc.go index 2b204daf77..76436ddf6a 100644 --- a/pkg/udf/function/uds_grpc.go +++ b/pkg/udf/function/uds_grpc.go @@ -25,14 +25,14 @@ import ( functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1" functionsdk "github.com/numaproj/numaflow-go/pkg/function" - "github.com/numaproj/numaflow-go/pkg/function/client" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + map_applier "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" reduce_applier "github.com/numaproj/numaflow/pkg/reduce/applier" "github.com/numaproj/numaflow/pkg/reduce/pbq/partition" - "google.golang.org/grpc/metadata" - "google.golang.org/protobuf/types/known/emptypb" - "google.golang.org/protobuf/types/known/timestamppb" ) // udsGRPCBasedUDF applies user defined function over gRPC (over Unix Domain Socket) client/server where server is the UDF. @@ -44,11 +44,7 @@ var _ map_applier.MapApplier = (*udsGRPCBasedUDF)(nil) var _ reduce_applier.ReduceApplier = (*udsGRPCBasedUDF)(nil) // NewUDSGRPCBasedUDF returns a new udsGRPCBasedUDF object. -func NewUDSGRPCBasedUDF() (*udsGRPCBasedUDF, error) { - c, err := client.New() // Can we pass this as a parameter to the function? - if err != nil { - return nil, fmt.Errorf("failed to create a new gRPC client: %w", err) - } +func NewUDSGRPCBasedUDF(c functionsdk.Client) (*udsGRPCBasedUDF, error) { return &udsGRPCBasedUDF{c}, nil } @@ -131,7 +127,7 @@ func (u *udsGRPCBasedUDF) ApplyReduce(ctx context.Context, partitionID *partitio // pass key and window information inside the context mdMap := map[string]string{ - functionsdk.DatumKey: partitionID.Key, + functionsdk.DatumKey: partitionID.Slot, functionsdk.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10), functionsdk.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10), } diff --git a/pkg/udf/function/uds_grpc_test.go b/pkg/udf/function/uds_grpc_test.go index 921e6bc4bf..15d4f370c4 100644 --- a/pkg/udf/function/uds_grpc_test.go +++ b/pkg/udf/function/uds_grpc_test.go @@ -27,6 +27,7 @@ import ( functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1" "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1/funcmock" "github.com/numaproj/numaflow-go/pkg/function/clienttest" + "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/reduce/pbq/partition" @@ -305,7 +306,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) { partitionID := &partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test", + Slot: "test", } got, err := u.ApplyReduce(ctx, partitionID, messageCh) @@ -356,7 +357,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) { partitionID := &partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test", + Slot: "test", } _, err := u.ApplyReduce(ctx, partitionID, messageCh) @@ -399,7 +400,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) { partitionID := &partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test", + Slot: "test", } _, err := u.ApplyReduce(ctx, partitionID, messageCh) @@ -468,7 +469,7 @@ func TestHGRPCBasedUDF_Reduce(t *testing.T) { partitionID := &partition.ID{ Start: time.Unix(60, 0), End: time.Unix(120, 0), - Key: "test", + Slot: "test", } result, err := u.ApplyReduce(ctx, partitionID, messageCh) diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index ef6e835f1d..a7b7972d3a 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/numaproj/numaflow-go/pkg/function/client" "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -101,7 +102,13 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { }) log = log.With("protocol", "uds-grpc-map-udf") - udfHandler, err := function.NewUDSGRPCBasedUDF() + maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, dfv1.DefaultGRPCMaxMessageSize) + c, err := client.New(client.WithMaxMessageSize(maxMessageSize)) + if err != nil { + return fmt.Errorf("failed to create a new gRPC client: %w", err) + } + + udfHandler, err := function.NewUDSGRPCBasedUDF(c) if err != nil { return fmt.Errorf("failed to create gRPC client, %w", err) } diff --git a/pkg/udf/reduce_udf.go b/pkg/udf/reduce_udf.go index 3644a65f50..d0a225f329 100644 --- a/pkg/udf/reduce_udf.go +++ b/pkg/udf/reduce_udf.go @@ -23,9 +23,11 @@ import ( "sync" "time" + "github.com/numaproj/numaflow-go/pkg/function/client" "go.uber.org/zap" "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce/pbq/store/wal" "github.com/numaproj/numaflow/pkg/reduce/pbq" @@ -34,7 +36,6 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" - "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce" "github.com/numaproj/numaflow/pkg/shared/logging" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" @@ -130,7 +131,14 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { }) log = log.With("protocol", "uds-grpc-reduce-udf") - udfHandler, err := function.NewUDSGRPCBasedUDF() + + maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, dfv1.DefaultGRPCMaxMessageSize) + c, err := client.New(client.WithMaxMessageSize(maxMessageSize)) + if err != nil { + return fmt.Errorf("failed to create a new gRPC client: %w", err) + } + + udfHandler, err := function.NewUDSGRPCBasedUDF(c) if err != nil { return fmt.Errorf("failed to create gRPC client, %w", err) } @@ -146,6 +154,31 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { }() log.Infow("Start processing reduce udf messages", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("from", fromBuffer.Name)) + // start metrics server + // TODO: make into a function + metricsOpts := []metrics.Option{ + metrics.WithLookbackSeconds(int64(u.VertexInstance.Vertex.Spec.Scale.GetLookbackSeconds())), + } + if sharedutil.LookupEnvStringOr(dfv1.EnvHealthCheckDisabled, "false") != "true" { + metricsOpts = append(metricsOpts, metrics.WithHealthCheckExecutor(func() error { + cctx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + return udfHandler.WaitUntilReady(cctx) + })) + } + if x, ok := reader.(isb.LagReader); ok { + metricsOpts = append(metricsOpts, metrics.WithLagReader(x)) + } + if x, ok := reader.(isb.Ratable); ok { + metricsOpts = append(metricsOpts, metrics.WithRater(x)) + } + ms := metrics.NewMetricsServer(u.VertexInstance.Vertex, metricsOpts...) + if shutdown, err := ms.Start(ctx); err != nil { + return fmt.Errorf("failed to start metrics server, error: %w", err) + } else { + defer func() { _ = shutdown(context.Background()) }() + } + storeProvider := wal.NewWALStores(u.VertexInstance, wal.WithStorePath(dfv1.DefaultStorePath), wal.WithMaxBufferSize(dfv1.DefaultStoreMaxBufferSize), wal.WithSyncDuration(dfv1.DefaultStoreSyncDuration)) pbqManager, err := pbq.NewManager(ctx, u.VertexInstance.Vertex.Spec.Name, u.VertexInstance.Vertex.Spec.PipelineName, u.VertexInstance.Replica, storeProvider) @@ -172,29 +205,6 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { log.Info("Forwarder stopped, exiting reduce udf data processor...") }() - metricsOpts := []metrics.Option{ - metrics.WithLookbackSeconds(int64(u.VertexInstance.Vertex.Spec.Scale.GetLookbackSeconds())), - } - if sharedutil.LookupEnvStringOr(dfv1.EnvHealthCheckDisabled, "false") != "true" { - metricsOpts = append(metricsOpts, metrics.WithHealthCheckExecutor(func() error { - cctx, cancel := context.WithTimeout(ctx, 20*time.Second) - defer cancel() - return udfHandler.WaitUntilReady(cctx) - })) - } - if x, ok := reader.(isb.LagReader); ok { - metricsOpts = append(metricsOpts, metrics.WithLagReader(x)) - } - if x, ok := reader.(isb.Ratable); ok { - metricsOpts = append(metricsOpts, metrics.WithRater(x)) - } - ms := metrics.NewMetricsServer(u.VertexInstance.Vertex, metricsOpts...) - if shutdown, err := ms.Start(ctx); err != nil { - return fmt.Errorf("failed to start metrics server, error: %w", err) - } else { - defer func() { _ = shutdown(context.Background()) }() - } - <-ctx.Done() log.Info("SIGTERM, exiting...") wg.Wait() diff --git a/pkg/window/keyed/keyed.go b/pkg/window/keyed/keyed.go index 734286cdef..154cafbc81 100644 --- a/pkg/window/keyed/keyed.go +++ b/pkg/window/keyed/keyed.go @@ -61,8 +61,8 @@ func (kw *AlignedKeyedWindow) EndTime() time.Time { return kw.End } -// AddKey adds a key to an existing window -func (kw *AlignedKeyedWindow) AddKey(key string) { +// AddSlot adds a slot to an existing window +func (kw *AlignedKeyedWindow) AddSlot(key string) { kw.lock.Lock() defer kw.lock.Unlock() if _, ok := kw.keys[key]; !ok { @@ -78,7 +78,7 @@ func (kw *AlignedKeyedWindow) Partitions() []partition.ID { partitions := make([]partition.ID, len(kw.keys)) idx := 0 for k := range kw.keys { - partitions[idx] = partition.ID{Start: kw.StartTime(), End: kw.EndTime(), Key: k} + partitions[idx] = partition.ID{Start: kw.StartTime(), End: kw.EndTime(), Slot: k} idx++ } diff --git a/pkg/window/keyed/keyed_test.go b/pkg/window/keyed/keyed_test.go index ba71fef487..54bfd096d1 100644 --- a/pkg/window/keyed/keyed_test.go +++ b/pkg/window/keyed/keyed_test.go @@ -17,13 +17,10 @@ limitations under the License. package keyed import ( - "sort" "testing" "time" "github.com/stretchr/testify/assert" - - "github.com/numaproj/numaflow/pkg/reduce/pbq/partition" ) func TestKeyedWindow_AddKey(t *testing.T) { @@ -54,9 +51,9 @@ func TestKeyedWindow_AddKey(t *testing.T) { t.Run(tt.name, func(t *testing.T) { kw = NewKeyedWindow(time.Unix(60, 0), time.Unix(120, 0)) for k := range tt.given.keys { - kw.AddKey(k) + kw.AddSlot(k) } - kw.AddKey(tt.input) + kw.AddSlot(tt.input) assert.Equal(t, len(tt.expectedKeys), len(kw.keys)) for k := range tt.expectedKeys { _, ok := kw.keys[k] @@ -65,57 +62,3 @@ func TestKeyedWindow_AddKey(t *testing.T) { }) } } - -func TestKeyedWindow_Partitions(t *testing.T) { - kw := NewKeyedWindow(time.Unix(60, 0), time.Unix(120, 0)) - tests := []struct { - name string - given *AlignedKeyedWindow - input string - expected []partition.ID - }{ - { - name: "no_keys", - given: &AlignedKeyedWindow{}, - expected: []partition.ID{}, - }, - { - name: "with_some_existing_keys", - given: &AlignedKeyedWindow{ - keys: map[string]struct{}{"key2": {}, "key3": {}, "key4": {}}, - }, - expected: []partition.ID{ - { - Key: "key2", - Start: time.Unix(60, 0), - End: time.Unix(120, 0), - }, - { - Key: "key3", - Start: time.Unix(60, 0), - End: time.Unix(120, 0), - }, - { - Key: "key4", - Start: time.Unix(60, 0), - End: time.Unix(120, 0), - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - kw.keys = tt.given.keys - ret := kw.Partitions() - // the kw.keys is a map so the order of the output is random - // use sort to sort the ret array by key - sort.Slice(ret, func(i int, j int) bool { - return ret[i].Key < ret[j].Key - }) - for idx, s := range tt.expected { - assert.EqualValues(t, ret[idx], s) - } - }) - } -} diff --git a/pkg/window/windower.go b/pkg/window/windower.go index 58275ed9b8..3093e1c37d 100644 --- a/pkg/window/windower.go +++ b/pkg/window/windower.go @@ -29,8 +29,8 @@ type AlignedKeyedWindower interface { StartTime() time.Time // EndTime returns the end time of the window EndTime() time.Time - // AddKey adds a key to the window - AddKey(string) + // AddSlot adds a slot to the window. Slots are hash-ranges for keys. + AddSlot(string) // Partitions returns an array of partition ids Partitions() []partition.ID // Keys returns an array of keys diff --git a/test/fixtures/when.go b/test/fixtures/when.go index 97d12d0a6d..b383c91f4a 100644 --- a/test/fixtures/when.go +++ b/test/fixtures/when.go @@ -22,11 +22,12 @@ import ( "testing" "time" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - flowpkg "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + flowpkg "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" ) type When struct {