diff --git a/pkg/udf/udf.go b/pkg/udf/udf.go index 7dca6ab822..47d38d5025 100644 --- a/pkg/udf/udf.go +++ b/pkg/udf/udf.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/numaproj/numaflow/pkg/watermark/generic" "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -19,9 +20,7 @@ import ( "github.com/numaproj/numaflow/pkg/shared/logging" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/udf/applier" - "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/generic/jetstream" - "github.com/numaproj/numaflow/pkg/watermark/publish" ) type UDFProcessor struct { @@ -40,8 +39,7 @@ func (u *UDFProcessor) Start(ctx context.Context) error { writers := make(map[string]isb.BufferWriter) // watermark variables - var fetchWatermark fetch.Fetcher = nil - var publishWatermark map[string]publish.Publisher = nil + fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(u.VertexInstance.Vertex.GetToBuffers())) switch u.ISBSvcType { case dfv1.ISBSvcTypeRedis: