Skip to content

Commit

Permalink
chore: watermark and eventtime in windowing operations (#594)
Browse files Browse the repository at this point in the history
Signed-off-by: ashwinidulams <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
ashwinidulams and vigith authored Mar 15, 2023
1 parent 5282766 commit 51903be
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 10 deletions.
16 changes: 16 additions & 0 deletions docs/user-guide/user-defined-functions/reduce/reduce.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ It is wrong to give a `parallelism` > `1` if it is a _non-keyed_ vertex (`keyed:
There are a couple of [examples](examples.md) that demonstrate Fixed windows, Sliding windows,
chaining of windows, keyed streams, etc.

## Time Characteristics

All windowing operations generate new records as an output of reduce operations. Event-time and Watermark
are the two main primitives that determine how time propagates in a streaming application. So, for all new
records generated in a reduce operation, the event time is set to the end time of the window.

For example, for a reduce operation over a keyed/non-keyed window with a start and end defined by
`[2031-09-29T18:47:00Z, 2031-09-29T18:48:00Z)`, the event time for all the records generated will be set to
`2031-09-29T18:47:59.999Z`. Since millisecond is the smallest granularity (as of now), the event time is set to
the last timestamp that belongs to the window.

Watermark is treated similarly: the watermark is set to the last timestamp for a given window.
So for the example above, the value of the watermark will be set to the last timestamp, i.e., `2031-09-29T18:47:59.999Z`.

This applies to all the window types regardless of whether they are keyed or non-keyed windows.

## Storage

Reduce, unlike map, requires persistence. To support persistence, the user has to define the
Expand Down
3 changes: 2 additions & 1 deletion pkg/reduce/pnf/processandforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func (p *ProcessAndForward) Forward(ctx context.Context) error {
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)),
}).Observe(float64(time.Since(startTime).Microseconds()))

processorWM := wmb.Watermark(p.PartitionID.Start)
// millisecond is the lowest granularity currently supported.
processorWM := wmb.Watermark(p.PartitionID.End.Add(-1 * time.Millisecond))

messagesToStep := p.whereToStep()

Expand Down
12 changes: 6 additions & 6 deletions pkg/reduce/pnf/processandforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ func TestProcessAndForward_Forward(t *testing.T) {
wmExpected: map[string]wmb.WMB{
"buffer1": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: false,
},
"buffer2": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: true,
},
},
Expand All @@ -246,12 +246,12 @@ func TestProcessAndForward_Forward(t *testing.T) {
wmExpected: map[string]wmb.WMB{
"buffer1": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: false,
},
"buffer2": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: false,
},
},
Expand All @@ -270,12 +270,12 @@ func TestProcessAndForward_Forward(t *testing.T) {
wmExpected: map[string]wmb.WMB{
"buffer1": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: true,
},
"buffer2": {
Offset: 0,
Watermark: int64(60000),
Watermark: int64(119999),
Idle: true,
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/function/uds_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ readLoop:
writeMessage := &isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: partitionID.Start,
EventTime: partitionID.End.Add(-1 * time.Millisecond),
IsLate: false,
},
Key: key,
Expand Down
6 changes: 4 additions & 2 deletions pkg/udf/function/uds_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) {
got, err := u.ApplyReduce(ctx, partitionID, messageCh)

assert.Len(t, got, 1)
assert.Equal(t, time.Unix(120, 0).Add(-1*time.Millisecond), got[0].EventTime)
assert.NoError(t, err)
})

Expand Down Expand Up @@ -507,6 +508,7 @@ func TestHGRPCBasedUDF_Reduce(t *testing.T) {
_ = json.Unmarshal(result[0].Payload, &resultPayload)

assert.NoError(t, err)
assert.Equal(t, result[0].Key, "sum")
assert.Equal(t, resultPayload.Value, int64(45))
assert.Equal(t, "sum", result[0].Key)
assert.Equal(t, int64(45), resultPayload.Value)
assert.Equal(t, time.Unix(120, 0).Add(-1*time.Millisecond), result[0].EventTime)
}

0 comments on commit 51903be

Please sign in to comment.