From 51903be6f72fe036c811c39cae40a8dc35218edd Mon Sep 17 00:00:00 2001 From: ashwinidulams <61725106+ashwinidulams@users.noreply.github.com> Date: Wed, 15 Mar 2023 20:53:42 +0530 Subject: [PATCH] chore: watermark and eventtime in windowing operations (#594) Signed-off-by: ashwinidulams Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- .../user-defined-functions/reduce/reduce.md | 16 ++++++++++++++++ pkg/reduce/pnf/processandforward.go | 3 ++- pkg/reduce/pnf/processandforward_test.go | 12 ++++++------ pkg/udf/function/uds_grpc.go | 2 +- pkg/udf/function/uds_grpc_test.go | 6 ++++-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/docs/user-guide/user-defined-functions/reduce/reduce.md b/docs/user-guide/user-defined-functions/reduce/reduce.md index a8e6c6a2f9..a4922b7ec4 100644 --- a/docs/user-guide/user-defined-functions/reduce/reduce.md +++ b/docs/user-guide/user-defined-functions/reduce/reduce.md @@ -68,6 +68,22 @@ It is wrong to give a `parallelism` > `1` if it is a _non-keyed_ vertex (`keyed: There are a couple of [examples](examples.md) that demonstrates Fixed windows, Sliding windows, chaining of windows, keyed streams, etc. +## Time Characteristics + +All windowing operations generate new records as an output of reduce operations. Event-time and Watermark +are two main primitives that determine how the time propagates in a streaming application. So for all new +records generated in a reduce operation, event time is set to the end time of the window. + +For example, for a reduce operation over a keyed/non-keyed window with a start and end defined by +`[2031-09-29T18:47:00Z, 2031-09-29T18:48:00Z)`, event time for all the records generated will be set to +`2031-09-29T18:47:59.999Z`. Since millisecond is the smallest granularity (as of now), event time is set to +the last timestamp that belongs to a window. + +Watermark is treated similarly: the watermark is set to the last timestamp for a given window. 
+So for the example above, the value of the watermark will be set to the last timestamp, i.e., `2031-09-29T18:47:59.999Z`. + +This applies to all the window types regardless of whether they are keyed or non-keyed windows. + ## Storage Reduce unlike map requires persistence. To support persistence user has to define the diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go index 6874fd42ee..1ae4296f61 100644 --- a/pkg/reduce/pnf/processandforward.go +++ b/pkg/reduce/pnf/processandforward.go @@ -107,7 +107,8 @@ func (p *ProcessAndForward) Forward(ctx context.Context) error { metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)), }).Observe(float64(time.Since(startTime).Microseconds())) - processorWM := wmb.Watermark(p.PartitionID.Start) + // millisecond is the lowest granularity currently supported. + processorWM := wmb.Watermark(p.PartitionID.End.Add(-1 * time.Millisecond)) messagesToStep := p.whereToStep() diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go index 7600cfbac3..69465abfa8 100644 --- a/pkg/reduce/pnf/processandforward_test.go +++ b/pkg/reduce/pnf/processandforward_test.go @@ -222,12 +222,12 @@ func TestProcessAndForward_Forward(t *testing.T) { wmExpected: map[string]wmb.WMB{ "buffer1": { Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: false, }, "buffer2": { Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: true, }, }, @@ -246,12 +246,12 @@ func TestProcessAndForward_Forward(t *testing.T) { wmExpected: map[string]wmb.WMB{ "buffer1": { Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: false, }, "buffer2": { Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: false, }, }, @@ -270,12 +270,12 @@ func TestProcessAndForward_Forward(t *testing.T) { wmExpected: map[string]wmb.WMB{ "buffer1": { Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: true, }, "buffer2": 
{ Offset: 0, - Watermark: int64(60000), + Watermark: int64(119999), Idle: true, }, }, diff --git a/pkg/udf/function/uds_grpc.go b/pkg/udf/function/uds_grpc.go index 4b2020a89e..38c5aa3f69 100644 --- a/pkg/udf/function/uds_grpc.go +++ b/pkg/udf/function/uds_grpc.go @@ -189,7 +189,7 @@ readLoop: writeMessage := &isb.Message{ Header: isb.Header{ MessageInfo: isb.MessageInfo{ - EventTime: partitionID.Start, + EventTime: partitionID.End.Add(-1 * time.Millisecond), IsLate: false, }, Key: key, diff --git a/pkg/udf/function/uds_grpc_test.go b/pkg/udf/function/uds_grpc_test.go index b879bc7dee..8b9e22815a 100644 --- a/pkg/udf/function/uds_grpc_test.go +++ b/pkg/udf/function/uds_grpc_test.go @@ -321,6 +321,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) { got, err := u.ApplyReduce(ctx, partitionID, messageCh) assert.Len(t, got, 1) + assert.Equal(t, time.Unix(120, 0).Add(-1*time.Millisecond), got[0].EventTime) assert.NoError(t, err) }) @@ -507,6 +508,7 @@ func TestHGRPCBasedUDF_Reduce(t *testing.T) { _ = json.Unmarshal(result[0].Payload, &resultPayload) assert.NoError(t, err) - assert.Equal(t, result[0].Key, "sum") - assert.Equal(t, resultPayload.Value, int64(45)) + assert.Equal(t, "sum", result[0].Key) + assert.Equal(t, int64(45), resultPayload.Value) + assert.Equal(t, time.Unix(120, 0).Add(-1*time.Millisecond), result[0].EventTime) }