-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: publish to callback endpoint for tracking #1753
Conversation
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
// Callback defines the callback configuration for the messages processed by the pipeline. | ||
// can be used for tracking the message processing status. | ||
// +optional | ||
Callback Callback `json:"callback,omitempty" protobuf:"bytes,9,opt,name=callback"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed, can we use annotation at this moment?
type Callback struct { | ||
// Enabled indicates whether callback is enabled for the pipeline. | ||
// +optional | ||
Enabled bool `json:"enabled,omitempty" protobuf:"varint,1,opt,name=enabled"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed.
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1753 +/- ##
==========================================
+ Coverage 56.63% 56.82% +0.19%
==========================================
Files 216 218 +2
Lines 17350 17581 +231
==========================================
+ Hits 9826 9991 +165
- Misses 6692 6735 +43
- Partials 832 855 +23 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
pkg/shared/callback/callback.go
Outdated
return fmt.Errorf("received non-OK response status: %s", resp.Status) | ||
} | ||
|
||
_ = resp.Body.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use defer
before line 239.
pkg/shared/callback/callback.go
Outdated
}) | ||
|
||
if dOpts.callbackURL != "" { | ||
client := &http.Client{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been existing in GetClient()
, do not need it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need it
@@ -74,7 +75,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { | |||
healthCheckers []metrics.HealthChecker | |||
idleManager wmb.IdleManager | |||
pipelineName = sp.VertexInstance.Vertex.Spec.PipelineName | |||
vertexName = sp.VertexInstance.Vertex.Name | |||
vertexName = sp.VertexInstance.Vertex.Spec.Name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will existing pipelines get impacted with this fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No
@@ -73,6 +74,23 @@ type Header struct { | |||
Headers map[string]string | |||
} | |||
|
|||
// MessageID is the message ID of the message which is used for exactly-once-semantics. | |||
type MessageID struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this change impact existing pipelines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it should not impact.
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]> Co-authored-by: Derek Wang <[email protected]>
pkg/sinks/sink.go
Outdated
@@ -87,7 +92,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { | |||
// create reader for each partition. Each partition is a group in redis | |||
for index, bufferPartition := range u.VertexInstance.Vertex.OwnedBuffers() { | |||
fromGroup := bufferPartition + "-group" | |||
consumer := fmt.Sprintf("%s-%v", u.VertexInstance.Vertex.Name, u.VertexInstance.Replica) | |||
consumer := fmt.Sprintf("%s-%v", vertexName, u.VertexInstance.Replica) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is risky. vertex.spec.Name
is not unique in one namespace, while vertex.Name
is.
pkg/udf/map_udf.go
Outdated
enableMapUdfStream, err := u.VertexInstance.Vertex.MapUdfStreamEnabled() | ||
if err != nil { | ||
return fmt.Errorf("failed to parse UDF map streaming metadata, %w", err) | ||
enableMapStreamStr := os.Getenv(dfv1.EnvMapStreaming) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use pkg/shared/util.LookupEnvBoolOr()
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
examples/1-simple-pipeline.yaml
Outdated
rpu: 5 | ||
duration: 1s | ||
http: {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we have to update the quickstart if we change the source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added by mistake.
} | ||
|
||
// If the client is not in the cache, create a new one | ||
client := &http.Client{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the callback server (for this url) is restarted, will the http.Client auto-reconnect internally or will this client become invalid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the server restart case, If I am not wrong the client doesn't maintain a persistent connection to the server, so it simply creates a new connection when there is a post request, so ideally it should be fine. But still, the client caches the TCP connections. Let me run some tests, If I find any issues I will create a follow-up PR to fix it.
Signed-off-by: Vigith Maurice <[email protected]>
No description provided.