Skip to content

Commit

Permalink
feat: publish to callback endpoint for tracking (#1753)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Sreekanth <[email protected]>
Co-authored-by: Derek Wang <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
4 people authored Jun 19, 2024
1 parent f69d830 commit 8da7c22
Show file tree
Hide file tree
Showing 65 changed files with 1,451 additions and 314 deletions.
2 changes: 1 addition & 1 deletion examples/1-simple-pipeline-keys.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ spec:
- from: in
to: cat
- from: cat
to: out
to: out
2 changes: 1 addition & 1 deletion examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ spec:
- from: in
to: cat
- from: cat
to: out
to: out
12 changes: 10 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ const (
RemovePauseTimestampPatch = `[{"op": "remove", "path": "/metadata/annotations/numaflow.numaproj.io~1pause-timestamp"}]`

// ID key in the header of sources like http
KeyMetaID = "x-numaflow-id"
KeyMetaEventTime = "x-numaflow-event-time"
KeyMetaID = "X-Numaflow-Id"
KeyMetaEventTime = "X-Numaflow-Event-Time"
KeyMetaCallbackURL = "X-Numaflow-Callback-Url"

DefaultISBSvcName = "default"

Expand Down Expand Up @@ -99,6 +100,9 @@ const (
EnvNamespace = "NUMAFLOW_NAMESPACE"
EnvPipelineName = "NUMAFLOW_PIPELINE_NAME"
EnvVertexName = "NUMAFLOW_VERTEX_NAME"
EnvMapStreaming = "NUMAFLOW_MAP_STREAMING"
EnvCallbackEnabled = "NUMAFLOW_CALLBACK_ENABLED"
EnvCallbackURL = "NUMAFLOW_CALLBACK_URL"
EnvPod = "NUMAFLOW_POD"
EnvReplica = "NUMAFLOW_REPLICA"
EnvVertexObject = "NUMAFLOW_VERTEX_OBJECT"
Expand Down Expand Up @@ -209,6 +213,10 @@ const (
PipelineStatusInactive = "inactive"
PipelineStatusDeleting = "deleting"
PipelineStatusUnhealthy = "unhealthy"

// Callback annotation keys
CallbackEnabledKey = "numaflow.numaproj.io/callback"
CallbackURLKey = "numaflow.numaproj.io/callback-url"
)

var (
Expand Down
13 changes: 3 additions & 10 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -176,6 +175,9 @@ func (v Vertex) commonEnvs() []corev1.EnvVar {
{Name: EnvReplica, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + KeyReplica + "']"}}},
{Name: EnvPipelineName, Value: v.Spec.PipelineName},
{Name: EnvVertexName, Value: v.Spec.Name},
{Name: EnvMapStreaming, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + MapUdfStreamKey + "']"}}},
{Name: EnvCallbackEnabled, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackEnabledKey + "']"}}},
{Name: EnvCallbackURL, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackURLKey + "']"}}},
}
}

Expand Down Expand Up @@ -412,15 +414,6 @@ func (v Vertex) GetReplicas() int {
return int(*v.Spec.Replicas)
}

func (v Vertex) MapUdfStreamEnabled() (bool, error) {
if v.Spec.Metadata != nil && v.Spec.Metadata.Annotations != nil {
if mapUdfStream, existing := v.Spec.Metadata.Annotations[MapUdfStreamKey]; existing {
return strconv.ParseBool(mapUdfStream)
}
}
return false, nil
}

type VertexSpec struct {
AbstractVertex `json:",inline" protobuf:"bytes,1,opt,name=abstractVertex"`
PipelineName string `json:"pipelineName" protobuf:"bytes,2,opt,name=pipelineName"`
Expand Down
31 changes: 29 additions & 2 deletions pkg/isb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package isb

import (
"fmt"
"time"
)

Expand Down Expand Up @@ -63,8 +64,8 @@ type Header struct {
MessageInfo
// Kind indicates the kind of Message
Kind MessageKind
// ID is used for exactly-once-semantics. ID is usually populated from the offset, if offset is available.
ID string
// ID is used for exactly-once-semantics. ID is a combination of vertex name, offset and index of the message.
ID MessageID
// Keys is (key,value) in the map-reduce paradigm will be used for reduce operation, last key in the list
// will be used for conditional forwarding
Keys []string
Expand All @@ -73,6 +74,23 @@ type Header struct {
Headers map[string]string
}

// MessageID is the message ID of the message which is used for exactly-once-semantics.
type MessageID struct {
// VertexName is the name of the vertex
VertexName string
// Offset is the offset of the message
// NOTE: should be unique across the replicas of the vertex, that is the
// reason we don't have a separate replica field in the MessageID
Offset string
// Index is the index of a flatmap message, otherwise use 0
Index int32
}

// String returns the string representation of the MessageID
func (id MessageID) String() string {
return fmt.Sprintf("%s-%s-%d", id.VertexName, id.Offset, id.Index)
}

// Body is the body of the message
type Body struct {
Payload []byte
Expand Down Expand Up @@ -104,3 +122,12 @@ type WriteMessage struct {
Message
Tags []string
}

// ReadWriteMessagePair is a pair of ReadMessage and a list of WriteMessage which will be used
// to map the read message to a list of write messages that the udf returns.
// The error field is used to capture any error that occurs during the processing of the message.
type ReadWriteMessagePair struct {
ReadMessage *ReadMessage
WriteMessages []*WriteMessage
Err error
}
71 changes: 66 additions & 5 deletions pkg/isb/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,53 @@ func (p *MessageInfo) UnmarshalBinary(data []byte) (err error) {
return nil
}

type messageIDPreamble struct {
VertexNameLen int16
OffsetLen int32
Index int32
}

// MarshalBinary encodes MessageID to the binary format
func (id MessageID) MarshalBinary() (data []byte, err error) {
var buf = new(bytes.Buffer)
var preamble = messageIDPreamble{
VertexNameLen: int16(len(id.VertexName)),
OffsetLen: int32(len(id.Offset)),
Index: id.Index,
}
if err = binary.Write(buf, binary.LittleEndian, preamble); err != nil {
return nil, err
}
if err = binary.Write(buf, binary.LittleEndian, []byte(id.VertexName)); err != nil {
return nil, err
}
if err = binary.Write(buf, binary.LittleEndian, []byte(id.Offset)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// UnmarshalBinary decodes MessageID from the binary format
func (id *MessageID) UnmarshalBinary(data []byte) (err error) {
var r = bytes.NewReader(data)
var preamble = new(messageIDPreamble)
if err = binary.Read(r, binary.LittleEndian, preamble); err != nil {
return err
}
var vertexName = make([]byte, preamble.VertexNameLen)
if err = binary.Read(r, binary.LittleEndian, vertexName); err != nil {
return err
}
var offset = make([]byte, preamble.OffsetLen)
if err = binary.Read(r, binary.LittleEndian, offset); err != nil {
return err
}
id.VertexName = string(vertexName)
id.Offset = string(offset)
id.Index = preamble.Index
return nil
}

type headerPreamble struct {
// message length
MLen int32
Expand All @@ -73,10 +120,14 @@ func (h Header) MarshalBinary() (data []byte, err error) {
if err != nil {
return nil, err
}
id, err := h.ID.MarshalBinary()
if err != nil {
return nil, err
}
var preamble = headerPreamble{
MLen: int32(len(msgInfo)),
MsgKind: h.Kind,
IDLen: int16(len(h.ID)),
IDLen: int16(len(id)),
KeysLen: int16(len(h.Keys)),
HeadersLen: int16(len(h.Headers)),
}
Expand All @@ -89,8 +140,11 @@ func (h Header) MarshalBinary() (data []byte, err error) {
} else if n != int(preamble.MLen) {
return nil, fmt.Errorf("expected to write msgInfo size of %d but got %d", preamble.MLen, n)
}
if err = binary.Write(buf, binary.LittleEndian, []byte(h.ID)); err != nil {
n, err = buf.Write(id)
if err != nil {
return nil, err
} else if n != int(preamble.IDLen) {
return nil, fmt.Errorf("expected to write id size of %d but got %d", preamble.IDLen, n)
}
for i := 0; i < len(h.Keys); i++ {
if err = binary.Write(buf, binary.LittleEndian, int16(len(h.Keys[i]))); err != nil {
Expand Down Expand Up @@ -136,8 +190,15 @@ func (h *Header) UnmarshalBinary(data []byte) (err error) {
if err = msgInfo.UnmarshalBinary(msgInfoByte); err != nil {
return err
}
var id = make([]byte, preamble.IDLen)
if err = binary.Read(r, binary.LittleEndian, id); err != nil {
var idByte = make([]byte, preamble.IDLen)
n, err = r.Read(idByte)
if err != nil {
return err
} else if n != int(preamble.IDLen) {
return fmt.Errorf("expected to read id size of %d but got %d", preamble.IDLen, n)
}
var id = new(MessageID)
if err = id.UnmarshalBinary(idByte); err != nil {
return err
}
keys := make([]string, 0)
Expand Down Expand Up @@ -182,7 +243,7 @@ func (h *Header) UnmarshalBinary(data []byte) (err error) {
}
h.MessageInfo = *msgInfo
h.Kind = preamble.MsgKind
h.ID = string(id)
h.ID = *id

return err
}
Expand Down
66 changes: 53 additions & 13 deletions pkg/isb/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestHeader(t *testing.T) {
type fields struct {
MessageInfo MessageInfo
Kind MessageKind
ID string
ID MessageID
Key []string
Headers map[string]string
}
Expand All @@ -110,16 +110,24 @@ func TestHeader(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
Key: []string{"TestKey", "TestKey2"},
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Key: []string{"TestKey", "TestKey2"},
},
wantData: Header{
MessageInfo: MessageInfo{
EventTime: time.UnixMilli(1676617200000).UTC(),
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey", "TestKey2"},
},
wantMarshalError: false,
Expand All @@ -133,8 +141,12 @@ func TestHeader(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
Key: []string{"TestKey", "TestKey2"},
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Key: []string{"TestKey", "TestKey2"},
Headers: map[string]string{
"key1": "value1",
"key2": "value2",
Expand All @@ -146,7 +158,11 @@ func TestHeader(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey", "TestKey2"},
Headers: map[string]string{
"key1": "value1",
Expand Down Expand Up @@ -260,7 +276,11 @@ func TestMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
},
Body: Body{
Expand All @@ -274,7 +294,11 @@ func TestMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
},
Body: Body{
Expand All @@ -300,7 +324,11 @@ func TestMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
Headers: map[string]string{
"key1": "value1",
Expand All @@ -318,7 +346,11 @@ func TestMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
Headers: map[string]string{
"key1": "value1",
Expand Down Expand Up @@ -381,7 +413,11 @@ func TestReadMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
Headers: map[string]string{
"key1": "value1",
Expand All @@ -408,7 +444,11 @@ func TestReadMessage(t *testing.T) {
IsLate: true,
},
Kind: Data,
ID: "TestID",
ID: MessageID{
VertexName: "test-vertex",
Offset: "test-offset",
Index: 0,
},
Keys: []string{"TestKey"},
Headers: map[string]string{
"key1": "value1",
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestJetStreamBufferRead(t *testing.T) {
defer jw.Close()
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(20), startTime, nil)
messages := testutils.BuildTestWriteMessages(int64(20), startTime, nil, "testVertex")
// Verify if buffer is full.
for jw.isFull.Load() {
select {
Expand Down
Loading

0 comments on commit 8da7c22

Please sign in to comment.