feat: idle watermark v0 #520

Merged Feb 4, 2023 (24 commits)

@jy4096 jy4096 commented Feb 2, 2023

Fixes #480

Local Test

example pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: reduce-sliding-pipeline
spec:
  vertices:
    - name: in
      source:
        # A self data generating source
        generator:
          rpu: 100
          duration: 1s
    - name: map
      containerTemplate:
        env:
          - name: NUMAFLOW_DEBUG
            value: "true" # DO NOT forget the double quotes!!!
      scale:
        min: 1
        max: 1
      udf:
        container:
          image: quay.io/chromevoid/map-generate-ten
    - name: fixed
      udf:
        container:
          image: quay.io/numaio/numaflow-go/reduce-counter
        groupBy:
          window:
            fixed:
              length: 10s
          keyed: true
          storage:
            persistentVolumeClaim:
              volumeSize: 5Gi
              accessMode: ReadWriteOnce
      containerTemplate:
        env:
          - name: NUMAFLOW_DEBUG
            value: "true" # DO NOT forget the double quotes!!!
    - name: sliding
      udf:
        container:
          image: quay.io/numaio/numaflow-go/reduce-counter
        groupBy:
          window:
            sliding:
              length: 10s
              slide: 5s
          keyed: true
          storage:
            persistentVolumeClaim:
              volumeSize: 5Gi
              accessMode: ReadWriteOnce
      containerTemplate:
        env:
          - name: NUMAFLOW_DEBUG
            value: "true" # DO NOT forget the double quotes!!!
    - name: sink
      scale:
        min: 1
        max: 5
      sink:
        log: {}
  edges:
    - from: in
      to: map
    - from: map
      to: fixed
      parallelism: 5
    - from: fixed
      to: sliding
      parallelism: 10
    - from: sliding
      to: sink
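
For readers unfamiliar with the two window types in this pipeline, the sketch below shows how an event time could map to a 10s fixed window and to 10s sliding windows that advance every 5s. It assumes windows are half-open and aligned to multiples of the length (fixed) or slide (sliding) in epoch milliseconds; that alignment, the function names, and the sample event time are illustrative assumptions, not taken from Numaflow's code.

```go
package main

import "fmt"

// fixedWindowStart returns the start (epoch ms) of the fixed window that
// contains eventTime, assuming windows are aligned to multiples of length.
// Hypothetical helper; expects a non-negative eventTime.
func fixedWindowStart(eventTime, length int64) int64 {
	return eventTime - eventTime%length
}

// slidingWindowStarts returns the start of every sliding window that contains
// eventTime, newest first, assuming window starts are multiples of slide and
// each window covers [start, start+length).
func slidingWindowStarts(eventTime, length, slide int64) []int64 {
	var starts []int64
	for s := eventTime - eventTime%slide; s > eventTime-length; s -= slide {
		starts = append(starts, s)
	}
	return starts
}

func main() {
	const eventTime = 1675309903000 // hypothetical event time, epoch ms

	// "fixed" vertex: length 10s.
	fmt.Println(fixedWindowStart(eventTime, 10000))

	// "sliding" vertex: length 10s, slide 5s, so each event lands in two windows.
	fmt.Println(slidingWindowStarts(eventTime, 10000, 5000))
}
```

With length 10s and slide 5s, every event belongs to two overlapping sliding windows under these assumptions.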

Current main branch result


sliding 0 log

2023-02-02T03:33:42.800Z	DEBUG	numaflow.ReduceUDF-processor	fetch/edge_fetcher.go:116	[Processor: reduce-sliding-pipeline-fixed-1 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-3 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-4 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-2 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-0 status:active, timeline: [1675308810000:14] -> [1675308800000:13] -> [1675308790000:12] -> [1675308780000:11] -> [1675308770000:10] -> [1675308760000:9] -> [1675308750000:8] -> [1675308740000:7] -> [1675308730000:6] -> [1675308720000:5]] 
[default-reduce-sliding-pipeline-fixed-sliding-0] get watermark for offset 15: -1	{"vertex": "reduce-sliding-pipeline-sliding", "bufferName": "default-reduce-sliding-pipeline-fixed-sliding-0"}

sliding 1 log

2023-02-02T03:34:13.095Z	DEBUG	numaflow.ReduceUDF-processor	fetch/edge_fetcher.go:116	[Processor: reduce-sliding-pipeline-fixed-0 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-1 status:active, timeline: [1675308840000:17] -> [1675308830000:16] -> [1675308820000:15] -> [1675308810000:14] -> [1675308800000:13] -> [1675308790000:12] -> [1675308780000:11] -> [1675308770000:10] -> [1675308760000:9] -> [1675308750000:8]] 
[Processor: reduce-sliding-pipeline-fixed-3 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-4 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[Processor: reduce-sliding-pipeline-fixed-2 status:active, timeline: [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]] 
[default-reduce-sliding-pipeline-fixed-sliding-1] get watermark for offset 18: -1	{"vertex": "reduce-sliding-pipeline-sliding", "bufferName": "default-reduce-sliding-pipeline-fixed-sliding-1"}

etc.
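
The -1 results above are consistent with a fetcher that takes the minimum watermark across all active processors, where a processor that has never published contributes -1. The sketch below models only that rule. It is inferred from the logs, not copied from fetch/edge_fetcher.go, and `headWatermarks` and `minWatermark` are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// headWatermarks holds the newest watermark per processor, or -1 if the
// processor's timeline is empty. Values are taken from the "sliding 0" log
// on the main branch.
var headWatermarks = map[string]int64{
	"fixed-0": 1675308810000,
	"fixed-1": -1,
	"fixed-2": -1,
	"fixed-3": -1,
	"fixed-4": -1,
}

// minWatermark returns the smallest watermark across all processors, or -1
// when there are no processors. A single -1 entry pins the result to -1.
func minWatermark(heads map[string]int64) int64 {
	if len(heads) == 0 {
		return -1
	}
	min := int64(math.MaxInt64)
	for _, wm := range heads {
		if wm < min {
			min = wm
		}
	}
	return min
}

func main() {
	// Four processors never published, so the result stays at -1.
	fmt.Println(minWatermark(headWatermarks)) // prints -1
}
```

Under this model the watermark cannot advance until every upstream processor has published at least once, which matches the stuck behavior shown in the logs.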

PR result


sliding 0 log

2023-02-02T03:53:32.544Z	DEBUG	numaflow.ReduceUDF-processor	fetch/edge_fetcher.go:116	[Processor: reduce-sliding-pipeline-fixed-4 status:active, timeline: [1675309900000:12] -> [1675309890000:11] -> [1675309880000:10] -> [1675309870000:9] -> [1675309860000:8] -> [1675309850000:7] -> [1675309840000:6] -> [1675309830000:5] -> [1675309820000:4] -> [1675309810000:3]] 
[Processor: reduce-sliding-pipeline-fixed-1 status:active, timeline: [1675309910000:13] -> [1675309900000:12] -> [1675309890000:11] -> [1675309870000:9] -> [1675309860000:8] -> [1675309850000:7] -> [1675309840000:6] -> [1675309830000:5] -> [1675309820000:4] -> [1675309810000:3]] 
[Processor: reduce-sliding-pipeline-fixed-2 status:active, timeline: [1675309900000:12] -> [1675309880000:10] -> [1675309870000:9] -> [1675309860000:8] -> [1675309850000:7] -> [1675309840000:6] -> [1675309830000:5] -> [1675309820000:4] -> [1675309810000:3] -> [1675309800000:2]] 
[Processor: reduce-sliding-pipeline-fixed-0 status:active, timeline: [1675310000000:22] -> [1675309990000:21] -> [1675309980000:20] -> [1675309970000:19] -> [1675309960000:18] -> [1675309950000:17] -> [1675309940000:16] -> [1675309930000:15] -> [1675309920000:14] -> [1675309910000:13]] 
[Processor: reduce-sliding-pipeline-fixed-3 status:active, timeline: [1675309910000:13] -> [1675309900000:12] -> [1675309890000:11] -> [1675309880000:10] -> [1675309860000:8] -> [1675309850000:7] -> [1675309840000:6] -> [1675309830000:5] -> [1675309820000:4] -> [1675309810000:3]] 
[default-reduce-sliding-pipeline-fixed-sliding-0] get watermark for offset 23: 1675309900000	{"vertex": "reduce-sliding-pipeline-sliding", "bufferName": "default-reduce-sliding-pipeline-fixed-sliding-0"}

[SOLVED] Need discussion @vigith @whynowy @ashwinidulams

Currently we use the tail offsetWatermark in place of an idle watermark, which delays data processing.
For example, in the test case above, the watermark for offset 23 should have been 1675310000000, because the other processors only ever have idle watermarks; however, because of those idle watermarks, the actual value is 1675309900000.
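
The two values in this example can be reproduced from the "sliding 0" PR log with a simplified model: for each processor, take the watermark of the newest timeline entry whose offset is below the requested offset, then take the minimum across processors. This is a sketch under assumptions, not the code in pkg/watermark/fetch; the strict `<` comparison and the names `entry`, `lookup`, `minAcross`, and `prTimelines` are hypothetical.

```go
package main

import "fmt"

// entry is one timeline element, printed as [watermark:offset] in the debug log.
type entry struct {
	watermark int64
	offset    int64
}

// prTimelines holds the two newest entries per processor from the
// "sliding 0" PR log, ordered newest first.
var prTimelines = map[string][]entry{
	"fixed-0": {{1675310000000, 22}, {1675309990000, 21}},
	"fixed-1": {{1675309910000, 13}, {1675309900000, 12}},
	"fixed-2": {{1675309900000, 12}, {1675309880000, 10}},
	"fixed-3": {{1675309910000, 13}, {1675309900000, 12}},
	"fixed-4": {{1675309900000, 12}, {1675309890000, 11}},
}

// lookup returns the watermark of the newest entry whose offset is below the
// requested offset, or -1 if there is none.
func lookup(timeline []entry, offset int64) int64 {
	for _, e := range timeline {
		if e.offset < offset {
			return e.watermark
		}
	}
	return -1
}

// minAcross returns the smallest per-processor lookup result, or -1 when
// there are no processors.
func minAcross(timelines map[string][]entry, offset int64) int64 {
	result := int64(-1)
	first := true
	for _, tl := range timelines {
		if wm := lookup(tl, offset); first || wm < result {
			result = wm
			first = false
		}
	}
	return result
}

func main() {
	// All processors considered: the stale entries of the idle ones hold it back.
	fmt.Println(minAcross(prTimelines, 23)) // prints 1675309900000

	// Only the processor that is actually receiving data.
	fmt.Println(lookup(prTimelines["fixed-0"], 23)) // prints 1675310000000
}
```

The gap between the two printed values (100 seconds) is the delay described above.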

vigith commented Feb 2, 2023

> Currently we use the tail offsetWatermark to replace idle watermark, so we can see a delay in data processing.
> For example, in the above test case, get watermark for offset 23 can actually be 1675310000000; however, because of the idle watermarks, the actual value is 1675309900000.

Why not use the value at the head?

Review threads (all resolved):
pkg/forward/forward.go (outdated)
pkg/watermark/fetch/offset_timeline.go
pkg/watermark/fetch/processor_manager.go
pkg/watermark/ot/ot.go (outdated)
jyu6 added 16 commits February 1, 2023 20:54
Signed-off-by: jyu6 <[email protected]>
This reverts commit 1cfb7ad.

@jy4096 jy4096 marked this pull request as ready for review February 3, 2023 18:33
@jy4096 jy4096 requested a review from whynowy as a code owner February 3, 2023 18:33
@jy4096 jy4096 requested a review from vigith February 3, 2023 18:34
@vigith vigith merged commit 2727e62 into numaproj:main Feb 4, 2023
@jy4096 jy4096 deleted the v0-idle-watermark branch February 7, 2023 04:33
ashwinidulams pushed a commit that referenced this pull request Feb 14, 2023
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
Development

Successfully merging this pull request may close these issues.

offset timeline store stuck when we have parallelism > 1 in sliding window
3 participants