Commit

fix: skip publishing watermarks to unexpected vertices. Fixes #235 (#236)

Signed-off-by: Derek Wang <[email protected]>
whynowy authored Oct 18, 2022
1 parent fff05f3 commit 998e398
Showing 13 changed files with 56 additions and 35 deletions.
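
At a glance: the commit replaces the forwarder's boolean isFromSourceVertex option with an explicit VertexType (Source, Sink, or UDF) and makes watermark publishing branch on that type, so a sink is no longer asked to publish against a write offset and a source/UDF no longer publishes for an edge it wrote nothing to. The sketch below is not part of the diff; it condenses the call-site changes that follow, and logger stands in for whatever logger each vertex already constructs.

// Sketch only, using identifiers visible in the diff below:
//   forward "github.com/numaproj/numaflow/pkg/isb/forward"
//   dfv1    "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
//
// Before: the forwarder only knew whether it sat directly after a source vertex.
//   forwardOpts := []forward.Option{forward.FromSourceVertex(), forward.WithLogger(logger)}
//
// After: each vertex declares its concrete type, so the forwarder can publish
// watermarks appropriately for Source, UDF, and Sink vertices.
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(logger)}
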
6 changes: 6 additions & 0 deletions docs/APIs.md
@@ -3723,6 +3723,12 @@ Kubernetes meta/v1.Time </a> </em>
</tr>
</tbody>
</table>
+ <h3 id="numaflow.numaproj.io/v1alpha1.VertexType">
+ VertexType (<code>string</code> alias)
+ </p>
+ </h3>
+ <p>
+ </p>
<h3 id="numaflow.numaproj.io/v1alpha1.Watermark">
Watermark
</h3>
2 changes: 1 addition & 1 deletion go.mod
@@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/golang-lru v0.5.4
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
- github.com/nats-io/nats.go v1.16.0
+ github.com/nats-io/nats.go v1.18.0
github.com/numaproj/numaflow-go v0.2.3
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
4 changes: 2 additions & 2 deletions go.sum
@@ -683,8 +683,8 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703 h1:d8siT+8VQ68hfqPqYZvpMrHIihlMVW3gGy+o2hRDCyg=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0=
- github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
- github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+ github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0=
+ github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
8 changes: 8 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -35,6 +35,14 @@ const (
VertexPhaseFailed VertexPhase = "Failed"
)

+ type VertexType string
+
+ const (
+ VertexTypeSource VertexType = "Source"
+ VertexTypeSink VertexType = "Sink"
+ VertexTypeUDF VertexType = "UDF"
+ )
+
// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:shortName=vtx
31 changes: 17 additions & 14 deletions pkg/isb/forward/forward.go
@@ -181,18 +181,18 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
for _, m := range readMessages {
readBytesCount.With(map[string]string{metricspkg.LabelVertex: isdf.vertexName, metricspkg.LabelPipeline: isdf.pipelineName, "buffer": isdf.fromBuffer.GetName()}).Add(float64(len(m.Payload)))
m.Watermark = time.Time(processorWM)
- if isdf.opts.isFromSourceVertex && processorWM.After(m.EventTime) { // Set late data at source level
+ if isdf.opts.vertexType == dfv1.VertexTypeSource && processorWM.After(m.EventTime) { // Set late data at source level
m.IsLate = true
}
}

// create space for writeMessages specific to each step as we could forward to all the steps too.
var messageToStep = make(map[string][]isb.Message)
- var toBuffers string
- for step := range isdf.toBuffers {
+ var toBuffers string // logging purpose
+ for buffer := range isdf.toBuffers {
// over allocating to have a predictable pattern
- messageToStep[step] = make([]isb.Message, 0, len(readMessages))
- toBuffers += step
+ messageToStep[buffer] = make([]isb.Message, 0, len(readMessages))
+ toBuffers += buffer + ","
}

// udf concurrent processing request channel
@@ -251,12 +251,15 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {

// forward the highest watermark to all the edges to avoid idle edge problem
// TODO: sort and get the highest value
- // TODO: Should also publish to those edges without writing (fall out of conditional forwarding)?
for bufferName, offsets := range writeOffsets {
if publisher, ok := isdf.publishWatermark[bufferName]; ok {
- if len(offsets) > 0 {
- publisher.PublishWatermark(processorWM, offsets[len(offsets)-1])
- } else { // This only happens on sink vertex, and it does not care about the offset during watermark publishing
+ if isdf.opts.vertexType == dfv1.VertexTypeSource || isdf.opts.vertexType == dfv1.VertexTypeUDF {
+ if len(offsets) > 0 {
+ publisher.PublishWatermark(processorWM, offsets[len(offsets)-1])
+ }
+ // This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
+ // TODO: Should also publish to those edges without writing (fall out of conditional forwarding)
+ } else { // For Sink vertex, and it does not care about the offset during watermark publishing
publisher.PublishWatermark(processorWM, nil)
}
}
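
For readability, the new publishing decision above, restated as one block with fuller comments. This is an illustrative restatement of the hunk, not additional code in the commit; publisher, processorWM, and offsets are the loop variables shown above.

// Restatement of the new logic (illustration only, assuming the three VertexType values added in this commit).
if isdf.opts.vertexType == dfv1.VertexTypeSource || isdf.opts.vertexType == dfv1.VertexTypeUDF {
	// Source and UDF vertices publish against the last offset they wrote; when
	// conditional forwarding wrote nothing to this edge, nothing is published for it yet.
	if len(offsets) > 0 {
		publisher.PublishWatermark(processorWM, offsets[len(offsets)-1])
	}
} else {
	// A sink vertex has no downstream buffer offset to attach the watermark to,
	// so it publishes with a nil offset.
	publisher.PublishWatermark(processorWM, nil)
}
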
@@ -305,15 +308,15 @@ func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []i
// writeToBuffers is a blocking call until all the messages have be forwarded to all the toBuffers, or a shutdown
// has been initiated while we are stuck looping on an InternalError.
func (isdf *InterStepDataForward) writeToBuffers(ctx context.Context, messageToStep map[string][]isb.Message) (writeOffsetsEdge map[string][]isb.Offset, err error) {
+ // messageToStep contains all the to buffers, so the messages could be empty (conditional forwarding).
+ // So writeOffsetsEdge also contains all the to buffers, but the returned offsets might be empty.
writeOffsetsEdge = make(map[string][]isb.Offset, len(messageToStep))
- // TODO: rename key to edgeName
- for key, toBuffer := range isdf.toBuffers {
- writeOffsetsEdge[key], err = isdf.writeToBuffer(ctx, toBuffer, messageToStep[key])
+ for bufferName, toBuffer := range isdf.toBuffers {
+ writeOffsetsEdge[bufferName], err = isdf.writeToBuffer(ctx, toBuffer, messageToStep[bufferName])
if err != nil {
- return writeOffsetsEdge, err
+ return nil, err
}
}

return writeOffsetsEdge, nil
}

12 changes: 7 additions & 5 deletions pkg/isb/forward/options.go
@@ -4,6 +4,8 @@ import (
"time"

"go.uber.org/zap"

+ dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

// options for forwarding the message
@@ -14,8 +16,8 @@ type options struct {
udfConcurrency int
// retryInterval is the time.Duration to sleep before retrying
retryInterval time.Duration
- // isFromSourceVertex indicates if the fromStep is a source
- isFromSourceVertex bool
+ // vertexType indicates the type of the vertex
+ vertexType dfv1.VertexType
// logger is used to pass the logger variable
logger *zap.SugaredLogger
}
@@ -54,10 +56,10 @@ func WithLogger(l *zap.SugaredLogger) Option {
}
}

- // FromSourceVertex indicates it reads from a buffer written by a source vertex
- func FromSourceVertex() Option {
+ // WithVertexType sets the type of the vertex
+ func WithVertexType(t dfv1.VertexType) Option {
return func(o *options) error {
- o.isFromSourceVertex = true
+ o.vertexType = t
return nil
}
}
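
The new option follows the functional-options pattern implied by the Option signature above (a func(o *options) error). The helper below is a hypothetical illustration of how such options are applied; the actual forwarder constructor is not part of this diff, and buildOptions is an invented name for the sketch.

// Hypothetical sketch: applying forward options to the internal options struct.
func buildOptions(opts ...Option) (*options, error) {
	o := &options{} // zero values; vertexType is set via WithVertexType
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

// Usage, mirroring the call sites changed below:
//   o, err := buildOptions(WithVertexType(dfv1.VertexTypeSink), WithLogger(log))
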
5 changes: 3 additions & 2 deletions pkg/sinks/kafka/kafka.go
@@ -3,12 +3,13 @@ package kafka
import (
"context"
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
@@ -60,7 +61,7 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark
toKafka.topic = kafkaSink.Topic
toKafka.kafkaSink = kafkaSink

- forwardOpts := []forward.Option{forward.WithLogger(toKafka.log)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toKafka.log)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
5 changes: 3 additions & 2 deletions pkg/sinks/logger/log.go
@@ -2,10 +2,11 @@ package logger

import (
"context"
"log"

"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"log"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"

@@ -50,7 +51,7 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark f
toLog.logger = logging.NewLogger()
}

- forwardOpts := []forward.Option{forward.WithLogger(toLog.logger)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toLog.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
4 changes: 2 additions & 2 deletions pkg/sinks/udsink/sink.go
@@ -3,14 +3,14 @@ package udsink
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/udf/applier"
"time"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
@@ -49,7 +49,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchW
s.logger = logging.NewLogger()
}

- forwardOpts := []forward.Option{forward.WithLogger(s.logger)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(s.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
4 changes: 2 additions & 2 deletions pkg/sources/generator/tickgen.go
@@ -6,7 +6,6 @@ import (
"context"
"encoding/binary"
"encoding/json"
"github.com/numaproj/numaflow/pkg/udf/applier"
"strconv"
"sync/atomic"
"time"
@@ -18,6 +17,7 @@ import (
"github.com/numaproj/numaflow/pkg/isb/forward"
metricspkg "github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
@@ -154,7 +154,7 @@ func NewMemGen(vertexInstance *dfv1.VertexInstance,
destinations[w.GetName()] = w
}

- forwardOpts := []forward.Option{forward.FromSourceVertex(), forward.WithLogger(gensrc.logger)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(gensrc.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
4 changes: 2 additions & 2 deletions pkg/sources/http/http.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/numaproj/numaflow/pkg/udf/applier"
"io"
"net/http"
"strconv"
@@ -20,6 +19,7 @@
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedtls "github.com/numaproj/numaflow/pkg/shared/tls"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
@@ -175,7 +175,7 @@ func New(vertexInstance *dfv1.VertexInstance, writers []isb.BufferWriter, fetchW
destinations[w.GetName()] = w
}

- forwardOpts := []forward.Option{forward.FromSourceVertex(), forward.WithLogger(h.logger)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(h.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
4 changes: 2 additions & 2 deletions pkg/sources/kafka/reader.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/numaproj/numaflow/pkg/udf/applier"
"strconv"
"strings"
"sync"
@@ -19,6 +18,7 @@
metricspkg "github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
@@ -303,7 +303,7 @@ func NewKafkaSource(vertexInstance *dfv1.VertexInstance, writers []isb.BufferWri
destinations[w.GetName()] = w
}

- forwardOpts := []forward.Option{forward.FromSourceVertex(), forward.WithLogger(kafkasource.logger)}
+ forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(kafkasource.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
2 changes: 1 addition & 1 deletion pkg/udf/udf.go
@@ -138,7 +138,7 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
}()
log.Infow("Start processing udf messages", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("from", fromBufferName), zap.Any("to", toBuffers))

- opts := []forward.Option{forward.WithLogger(log)}
+ opts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeUDF), forward.WithLogger(log)}
if x := u.VertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
opts = append(opts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))