Skip to content

Commit

Permalink
feat: expose pending messages and processing rate (#79)
Browse files Browse the repository at this point in the history
* feat: expose pending messages and processing rate

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jul 8, 2022
1 parent ac1814d commit 5592bb1
Show file tree
Hide file tree
Showing 28 changed files with 986 additions and 413 deletions.
23 changes: 11 additions & 12 deletions cmd/commands/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,29 @@ func NewProcessorCommand() *cobra.Command {
return fmt.Errorf("invalid replica %q", replicaStr)
}
log = log.With("vertex", vertex.Name)
vertexInstance := &dfv1.VertexInstance{
Vertex: vertex,
Hostname: hostname,
Replica: int32(replica),
}
ctx := logging.WithLogger(signals.SetupSignalHandler(), log)
switch processorType {
case "source":
p := &sources.SourceProcessor{
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
Vertex: vertex,
Hostname: hostname,
Replica: replica,
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
VertexInstance: vertexInstance,
}
return p.Start(ctx)
case "sink":
p := &sinks.SinkProcessor{
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
Vertex: vertex,
Hostname: hostname,
Replica: replica,
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
VertexInstance: vertexInstance,
}
return p.Start(ctx)
case "udf":
p := &udf.UDFProcessor{
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
Vertex: vertex,
Hostname: hostname,
Replica: replica,
ISBSvcType: dfv1.ISBSvcType(isbSvcType),
VertexInstance: vertexInstance,
}
return p.Start(ctx)
default:
Expand Down
8 changes: 7 additions & 1 deletion config/base/crds/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,14 @@ spec:
type: string
scale:
properties:
lookbackSeconds:
default: 180
description: Lookback seconds to calculate the average pending
messages and processing rate
format: int32
type: integer
max:
default: 1
default: 100
description: Maximum replicas
format: int32
type: integer
Expand Down
8 changes: 7 additions & 1 deletion config/base/crds/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,14 @@ spec:
type: integer
scale:
properties:
lookbackSeconds:
default: 180
description: Lookback seconds to calculate the average pending
messages and processing rate
format: int32
type: integer
max:
default: 1
default: 100
description: Maximum replicas
format: int32
type: integer
Expand Down
16 changes: 14 additions & 2 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6170,8 +6170,14 @@ spec:
type: string
scale:
properties:
lookbackSeconds:
default: 180
description: Lookback seconds to calculate the average pending
messages and processing rate
format: int32
type: integer
max:
default: 1
default: 100
description: Maximum replicas
format: int32
type: integer
Expand Down Expand Up @@ -10057,8 +10063,14 @@ spec:
type: integer
scale:
properties:
lookbackSeconds:
default: 180
description: Lookback seconds to calculate the average pending
messages and processing rate
format: int32
type: integer
max:
default: 1
default: 100
description: Maximum replicas
format: int32
type: integer
Expand Down
62 changes: 62 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2983,6 +2983,18 @@ Maximum replicas
</p>
</td>
</tr>
<tr>
<td>
<code>lookbackSeconds</code></br> <em> int32 </em>
</td>
<td>
<em>(Optional)</em>
<p>
Lookback seconds to calculate the average pending messages and
processing rate
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.Sink">
Expand Down Expand Up @@ -3273,6 +3285,10 @@ Description
Vertex
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.VertexInstance">VertexInstance</a>)
</p>
<p>
</p>
<table>
<thead>
Expand Down Expand Up @@ -3374,6 +3390,52 @@ Refer to the Kubernetes API documentation for the fields of the
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.VertexInstance">
VertexInstance
</h3>
<p>
<p>
VertexInstance is a wrapper of a vertex instance, which contains the
vertex spec and the instance information such as hostname and replica
index.
</p>
</p>
<table>
<thead>
<tr>
<th>
Field
</th>
<th>
Description
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>vertex</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.Vertex"> Vertex </a> </em>
</td>
<td>
</td>
</tr>
<tr>
<td>
<code>hostname</code></br> <em> string </em>
</td>
<td>
</td>
</tr>
<tr>
<td>
<code>replica</code></br> <em> int32 </em>
</td>
<td>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.VertexLimits">
VertexLimits
</h3>
Expand Down
Loading

0 comments on commit 5592bb1

Please sign in to comment.