refactor: create a new data forwarder dedicated for source (#874)

This is the first step of separating a source-dedicated forwarder out of the existing shared one.

In this PR, I copied the existing data forwarder from package forward to package sources and then did the following cleanups:
1. removed streaming, since it is not supported for source vertices.
2. removed all the `isSource` checks and the if branches that existed only for non-source vertices (see the sketch after this list).
3. removed vertexType, which was a required parameter of the forwarder.
4. made the source watermark publisher a required parameter instead of an optional one.
5. updated the unit tests accordingly.
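
A minimal, self-contained sketch of the kind of branch that cleanup (2) removes is below. All identifiers are illustrative stand-ins rather than code from this PR; the `time.UnixMilli(-1)` source watermark mirrors the comment in the forward.go diff further down.

```go
package main

import (
	"fmt"
	"time"
)

type vertexType int

const (
	vertexTypeSource vertexType = iota
	vertexTypeUDF
)

type message struct {
	offset    int64
	watermark time.Time
}

// Before: the shared forwarder carries a vertexType and branches on it for
// every message it processes.
type genericForwarder struct {
	vertexType     vertexType
	fetchWatermark func(offset int64) time.Time
}

func (f *genericForwarder) assignWatermark(m *message) {
	if f.vertexType == vertexTypeSource {
		// a source has no upstream watermark yet
		m.watermark = time.UnixMilli(-1)
	} else {
		m.watermark = f.fetchWatermark(m.offset)
	}
}

// After: a source-dedicated forwarder needs neither the vertexType field nor
// the branch.
type sourceForwarder struct{}

func (f *sourceForwarder) assignWatermark(m *message) {
	m.watermark = time.UnixMilli(-1)
}

func main() {
	m := &message{offset: 42}
	(&sourceForwarder{}).assignWatermark(m)
	fmt.Println(m.watermark.UnixMilli()) // -1
}
```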

Signed-off-by: Keran Yang <[email protected]>
KeranYang authored Jul 19, 2023
1 parent 6d14998 commit cfdeaa8
Showing 19 changed files with 2,637 additions and 103 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -527,7 +527,7 @@ func (av AbstractVertex) IsReduceUDF() bool {
}

func (av AbstractVertex) OwnedBufferNames(namespace, pipeline string) []string {
- r := []string{}
+ var r []string
if av.IsASource() {
return r
}
@@ -699,7 +699,7 @@ func GenerateBufferName(namespace, pipelineName, vertex string, index int) strin
}

func GenerateBufferNames(namespace, pipelineName, vertex string, numOfPartitions int) []string {
- result := []string{}
+ var result []string
for i := 0; i < numOfPartitions; i++ {
result = append(result, GenerateBufferName(namespace, pipelineName, vertex, i))
}
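
A side note on the `r := []string{}` → `var r []string` change above: the two forms behave the same for `len` and `append`, and the nil form skips an allocation, but they are not interchangeable everywhere (for example when the slice is marshalled to JSON). A quick self-contained illustration, not part of the PR:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var a []string  // nil slice: no backing array until the first append
	b := []string{} // empty but non-nil slice

	fmt.Println(a == nil, b == nil) // true false
	fmt.Println(len(a), len(b))     // 0 0

	// Appending works identically on both.
	a = append(a, "ns-pl-v-0")
	b = append(b, "ns-pl-v-0")

	// The observable difference is in encoding: nil marshals to null,
	// empty marshals to [].
	ja, _ := json.Marshal([]string(nil))
	jb, _ := json.Marshal([]string{})
	fmt.Println(string(ja), string(jb)) // null []
}
```
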
50 changes: 25 additions & 25 deletions pkg/forward/forward.go
@@ -211,9 +211,9 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
readMessages, err := isdf.fromBufferPartition.Read(ctx, isdf.opts.readBatchSize)
if err != nil {
isdf.opts.logger.Warnw("failed to read fromBufferPartition", zap.Error(err))
- readMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
+ ReadMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
}
- readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))
+ ReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// process only if we have any read messages. There is a natural looping here if there is an internal error while
// reading, and we are not able to proceed.
@@ -303,7 +303,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// send to UDF only the data messages
for idx, m := range dataMessages {
// emit message size metric
- readBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(m.Payload)))
+ ReadBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(m.Payload)))
// assign watermark to the message. assign time.UnixMilli(-1) as watermark when we are at source vertex.
m.Watermark = time.Time(processorWM)
// send UDF processing work to the channel
@@ -316,7 +316,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// context.Done() is closed.
wg.Wait()
isdf.opts.logger.Debugw("concurrent applyUDF completed", zap.Int("concurrency", isdf.opts.udfConcurrency), zap.Duration("took", time.Since(concurrentUDFProcessingStart)))
- concurrentUDFProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(concurrentUDFProcessingStart).Microseconds()))
+ ConcurrentUDFProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(concurrentUDFProcessingStart).Microseconds()))
// UDF processing is done.

// if vertex type is source, it means we have finished the source data transformation.
@@ -353,7 +353,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// look for errors in udf processing, if we see even 1 error NoAck all messages
// then return. Handling partial retrying is not worth ATM.
if m.udfError != nil {
- udfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
+ UdfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
isdf.opts.logger.Errorw("failed to applyUDF", zap.Error(m.udfError))
// As there's no partial failure, non-ack all the readOffsets
isdf.fromBufferPartition.NoAck(ctx, readOffsets)
@@ -444,13 +444,13 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// implicit return for posterity :-)
if err != nil {
isdf.opts.logger.Errorw("failed to ack from buffer", zap.Error(err))
- ackMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
+ AckMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
return
}
- ackMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
+ AckMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))

// ProcessingTimes of the entire forwardAChunk
- forwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
+ ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
}

// streamMessage streams the data messages to the next step.
@@ -478,14 +478,14 @@ func (isdf *InterStepDataForward) streamMessage(
// send to UDF only the data messages

// emit message size metric
- readBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).
+ ReadBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).
Add(float64(len(dataMessages[0].Payload)))
// assign watermark to the message. assign time.UnixMilli(-1) as watermark when we are at source vertex.
dataMessages[0].Watermark = time.Time(processorWM)

// process the UDF and get the result
start := time.Now()
- udfReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
+ UdfReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()

writeMessageCh := make(chan isb.WriteMessage)
errs, ctx := errgroup.WithContext(ctx)
@@ -500,7 +500,7 @@ func (isdf *InterStepDataForward) streamMessage(
// add partition to the ID, this is to make sure that the ID is unique across partitions
writeMessage.ID = fmt.Sprintf("%s-%d-%d", dataMessages[0].ReadOffset.String(), isdf.fromBufferPartition.GetPartitionIdx(), msgIndex)
msgIndex += 1
- udfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(1))
+ UdfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(1))

// update toBuffers
if err := isdf.whereToStep(&writeMessage, messageToStep, dataMessages[0]); err != nil {
@@ -523,17 +523,17 @@ func (isdf *InterStepDataForward) streamMessage(
// look for errors in udf processing, if we see even 1 error NoAck all messages
// then return. Handling partial retrying is not worth ATM.
if err := errs.Wait(); err != nil {
- udfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName,
+ UdfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName,
metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
// We do not retry as we are streaming
if ok, _ := isdf.IsShuttingDown(); ok {
isdf.opts.logger.Errorw("UDF.Apply, Stop called while stuck on an internal error", zap.Error(err))
- platformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
+ PlatformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
}
return nil, fmt.Errorf("failed to applyUDF, error: %w", err)
}

- udfProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName,
+ UdfProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName,
metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
} else {
// Even not data messages, forward the message to the edge buffer (could be multiple edges)
@@ -644,10 +644,10 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar
needRetry = true
// we retry only failed messages
failedMessages = append(failedMessages, msg)
- writeMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Inc()
+ WriteMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Inc()
// a shutdown can break the blocking loop caused due to InternalErr
if ok, _ := isdf.IsShuttingDown(); ok {
- platformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
+ PlatformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
return writeOffsets, fmt.Errorf("writeToBuffer failed, Stop called while stuck on an internal error with failed messages:%d, %v", len(failedMessages), errs)
}
}
@@ -677,23 +677,23 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar
}
}

- dropMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount))
- dropBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes)
- writeMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount))
- writeBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes)
+ DropMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount))
+ DropBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes)
+ WriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount))
+ WriteBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes)
return writeOffsets, nil
}

// concurrentApplyUDF applies the UDF based on the request from the channel
func (isdf *InterStepDataForward) concurrentApplyUDF(ctx context.Context, readMessagePair <-chan *readWriteMessagePair) {
for message := range readMessagePair {
start := time.Now()
- udfReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
+ UdfReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
writeMessages, err := isdf.applyUDF(ctx, message.readMessage)
- udfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(writeMessages)))
+ UdfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(writeMessages)))
message.writeMessages = append(message.writeMessages, writeMessages...)
message.udfError = err
- udfProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
+ UdfProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Observe(float64(time.Since(start).Microseconds()))
}
}
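
The `concurrentApplyUDF` loop above is a plain channel-fed worker pattern: `forwardAChunk` appears to start `udfConcurrency` such workers, push read/write message pairs onto a channel, and wait on a `WaitGroup` before collecting results. A stripped-down, self-contained sketch of that pattern follows; the types and the stand-in "UDF" are illustrative, not taken from the code:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type workItem struct {
	in  string
	out string
	err error
}

// worker drains the channel and applies the stand-in UDF to each item,
// mirroring the shape of concurrentApplyUDF.
func worker(items <-chan *workItem, apply func(string) (string, error)) {
	for item := range items {
		item.out, item.err = apply(item.in)
	}
}

func main() {
	const concurrency = 3
	items := make(chan *workItem)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(items, func(s string) (string, error) { return strings.ToUpper(s), nil })
		}()
	}

	batch := []*workItem{{in: "a"}, {in: "b"}, {in: "c"}, {in: "d"}}
	for _, it := range batch {
		items <- it
	}
	close(items) // no more work; workers exit their range loops
	wg.Wait()    // wait for in-flight items, as forwardAChunk does before acking

	for _, it := range batch {
		fmt.Println(it.in, "->", it.out)
	}
}
```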

@@ -712,7 +712,7 @@ func (isdf *InterStepDataForward) applyUDF(ctx context.Context, readMessage *isb
// this does not mean we should prohibit this from a shutdown.
if ok, _ := isdf.IsShuttingDown(); ok {
isdf.opts.logger.Errorw("UDF.Apply, Stop called while stuck on an internal error", zap.Error(err))
- platformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
+ PlatformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
return nil, err
}
continue
@@ -739,7 +739,7 @@ func (isdf *InterStepDataForward) whereToStep(writeMessage *isb.WriteMessage, me
// a shutdown can break the blocking loop caused due to InternalErr
if ok, _ := isdf.IsShuttingDown(); ok {
err := fmt.Errorf("whereToStep, Stop called while stuck on an internal error, %v", err)
- platformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
+ PlatformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName}).Inc()
return err
}
return err
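
The renames in this file (readMessagesCount → ReadMessagesCount, udfError → UdfError, and so on) export the forwarder's metric collectors at package level, presumably so they can be reached from outside pkg/forward, for example by the new source forwarder or its tests. As a rough sketch of what such an exported collector can look like (only the metric name and label keys are taken from the test expectations below; the help text and the use of promauto are assumptions):

```go
package forward

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// ReadMessagesCount is a sketch of an exported counter vec that would back the
// forwarder_read_total samples asserted in forward_test.go below.
var ReadMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
	Subsystem: "forwarder",
	Name:      "read_total",
	Help:      "Total number of messages read by the forwarder.",
}, []string{"vertex", "pipeline", "partition_name"})
```

Call sites then keep the shape shown in the diff above: ReadMessagesCount.With(map[string]string{...}).Add(n).
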
12 changes: 6 additions & 6 deletions pkg/forward/forward_test.go
@@ -1559,7 +1559,7 @@ func validateMetrics(t *testing.T, batchSize int64) {
forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
`

- err := testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "forwarder_read_total")
+ err := testutil.CollectAndCompare(ReadMessagesCount, strings.NewReader(metadata+expected), "forwarder_read_total")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
@@ -1581,7 +1581,7 @@ func validateMetrics(t *testing.T, batchSize int64) {
`
}

- err = testutil.CollectAndCompare(writeMessagesCount, strings.NewReader(writeMetadata+writeExpected), "forwarder_write_total")
+ err = testutil.CollectAndCompare(WriteMessagesCount, strings.NewReader(writeMetadata+writeExpected), "forwarder_write_total")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
@@ -1594,16 +1594,16 @@ func validateMetrics(t *testing.T, batchSize int64) {
forwarder_ack_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%d", batchSize) + `
`

- err = testutil.CollectAndCompare(ackMessagesCount, strings.NewReader(ackMetadata+ackExpected), "forwarder_ack_total")
+ err = testutil.CollectAndCompare(AckMessagesCount, strings.NewReader(ackMetadata+ackExpected), "forwarder_ack_total")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
}

func metricsReset() {
- readMessagesCount.Reset()
- writeMessagesCount.Reset()
- ackMessagesCount.Reset()
+ ReadMessagesCount.Reset()
+ WriteMessagesCount.Reset()
+ AckMessagesCount.Reset()
}

// buildPublisherMap builds OTStore and publisher for each toBuffer
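
For readers unfamiliar with the helper used in validateMetrics, testutil.CollectAndCompare gathers the collector's current samples and diffs them against expected text in the Prometheus exposition format. A minimal, hypothetical example of the same pattern (the help string and the sample value are made up; the metric name and label keys come from the expectations above):

```go
package forward

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestReadMessagesCountSketch is a hypothetical, minimal version of the
// validateMetrics pattern: bump the counter, then compare the collector
// against hand-written exposition-format text.
func TestReadMessagesCountSketch(t *testing.T) {
	defer ReadMessagesCount.Reset()
	ReadMessagesCount.With(map[string]string{
		"vertex": "testVertex", "pipeline": "testPipeline", "partition_name": "from",
	}).Add(5)

	expected := `
# HELP forwarder_read_total Total number of messages read by the forwarder.
# TYPE forwarder_read_total counter
forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} 5
`
	if err := testutil.CollectAndCompare(ReadMessagesCount, strings.NewReader(expected), "forwarder_read_total"); err != nil {
		t.Errorf("unexpected collecting result:\n%s", err)
	}
}
```
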
[Diffs for the remaining changed files are not shown here.]
