Skip to content

Commit

Permalink
feat: Generate Idle Watermark if the source is idling (#1385)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: chandankumar4 <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
3 people authored Dec 7, 2023
1 parent 6eb25c2 commit 0db9cd1
Show file tree
Hide file tree
Showing 58 changed files with 2,392 additions and 1,127 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 11
max-parallel: 12
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinputs-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
23 changes: 22 additions & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18155,6 +18155,23 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.IdleSource": {
"properties": {
"incrementBy": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "IncrementBy is the duration to be added to the current watermark to progress the watermark when source is idling."
},
"stepInterval": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "StepInterval is the duration between the subsequent increment of the watermark as long the source remains Idle. The default value is 0s which means that once we detect idle source, we will be incrementing the watermark by `IncrementBy` for time we detect that we source is empty (in other words, this will be a very frequent update)."
},
"threshold": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Threshold is the duration after which a source is marked as Idle due to lack of data. Ex: If watermark found to be idle after the Threshold duration then the watermark is progressed by `IncrementBy`."
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.InterStepBufferService": {
"properties": {
"apiVersion": {
Expand Down Expand Up @@ -18858,7 +18875,7 @@
},
"templates": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Templates",
"description": "Templates is used to customize additional kubernetes resources required for the Pipeline"
"description": "Templates are used to customize additional kubernetes resources required for the Pipeline"
},
"vertices": {
"items": {
Expand Down Expand Up @@ -19769,6 +19786,10 @@
"description": "Disabled toggles the watermark propagation, defaults to false.",
"type": "boolean"
},
"idleSource": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.IdleSource",
"description": "IdleSource defines the idle watermark properties, it could be configured in case source is idling."
},
"maxDelay": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Maximum delay allowed for watermark calculation, defaults to \"0s\", which means no delay."
Expand Down
23 changes: 22 additions & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -18159,6 +18159,23 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.IdleSource": {
"type": "object",
"properties": {
"incrementBy": {
"description": "IncrementBy is the duration to be added to the current watermark to progress the watermark when source is idling.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"stepInterval": {
"description": "StepInterval is the duration between the subsequent increment of the watermark as long the source remains Idle. The default value is 0s which means that once we detect idle source, we will be incrementing the watermark by `IncrementBy` for time we detect that we source is empty (in other words, this will be a very frequent update).",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"threshold": {
"description": "Threshold is the duration after which a source is marked as Idle due to lack of data. Ex: If watermark found to be idle after the Threshold duration then the watermark is progressed by `IncrementBy`.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
"io.numaproj.numaflow.v1alpha1.InterStepBufferService": {
"type": "object",
"required": [
Expand Down Expand Up @@ -18844,7 +18861,7 @@
}
},
"templates": {
"description": "Templates is used to customize additional kubernetes resources required for the Pipeline",
"description": "Templates are used to customize additional kubernetes resources required for the Pipeline",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Templates"
},
"vertices": {
Expand Down Expand Up @@ -19747,6 +19764,10 @@
"description": "Disabled toggles the watermark propagation, defaults to false.",
"type": "boolean"
},
"idleSource": {
"description": "IdleSource defines the idle watermark properties, it could be configured in case source is idling.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.IdleSource"
},
"maxDelay": {
"description": "Maximum delay allowed for watermark calculation, defaults to \"0s\", which means no delay.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
Expand Down
10 changes: 10 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8656,6 +8656,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down
10 changes: 10 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4567,6 +4567,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down
20 changes: 20 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11185,6 +11185,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down Expand Up @@ -15838,6 +15848,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down
20 changes: 20 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11185,6 +11185,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down Expand Up @@ -15838,6 +15848,16 @@ spec:
disabled:
default: false
type: boolean
idleSource:
properties:
incrementBy:
type: string
stepInterval:
default: 0s
type: string
threshold:
type: string
type: object
maxDelay:
default: 0s
type: string
Expand Down
85 changes: 83 additions & 2 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,73 @@ ISBSvcType (<code>string</code> alias)
</p>
<p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.IdleSource">
IdleSource
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.Watermark">Watermark</a>)
</p>
<p>
</p>
<table>
<thead>
<tr>
<th>
Field
</th>
<th>
Description
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>threshold</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Threshold is the duration after which a source is marked as Idle due to
lack of data. Ex: If watermark found to be idle after the Threshold
duration then the watermark is progressed by <code>IncrementBy</code>.
</p>
</td>
</tr>
<tr>
<td>
<code>stepInterval</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
StepInterval is the duration between the subsequent increment of the
watermark as long the source remains Idle. The default value is 0s which
means that once we detect idle source, we will be incrementing the
watermark by <code>IncrementBy</code> for time we detect that we source
is empty (in other words, this will be a very frequent update).
</p>
</td>
</tr>
<tr>
<td>
<code>incrementBy</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
IncrementBy is the duration to be added to the current watermark to
progress the watermark when source is idling.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.InterStepBufferService">
InterStepBufferService
</h3>
Expand Down Expand Up @@ -3253,7 +3320,7 @@ Watermark enables watermark progression across the entire pipeline.
<td>
<em>(Optional)</em>
<p>
Templates is used to customize additional kubernetes resources required
Templates are used to customize additional kubernetes resources required
for the Pipeline
</p>
</td>
Expand Down Expand Up @@ -3469,7 +3536,7 @@ Watermark enables watermark progression across the entire pipeline.
<td>
<em>(Optional)</em>
<p>
Templates is used to customize additional kubernetes resources required
Templates are used to customize additional kubernetes resources required
for the Pipeline
</p>
</td>
Expand Down Expand Up @@ -5396,6 +5463,20 @@ means no delay.
</p>
</td>
</tr>
<tr>
<td>
<code>idleSource</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.IdleSource"> IdleSource </a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>
IdleSource defines the idle watermark properties, it could be configured
in case source is idling.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.Window">
Expand Down
Loading

0 comments on commit 0db9cd1

Please sign in to comment.