Skip to content

Commit

Permalink
fix: update udf fetchWatermark and publishWatermark initial values (#193
Browse files Browse the repository at this point in the history
)

Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 and jyu6 authored Sep 29, 2022
1 parent 04da619 commit b1b78fa
Showing 1 changed file with 2 additions and 4 deletions.
6 changes: 2 additions & 4 deletions pkg/udf/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/generic"
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand All @@ -19,9 +20,7 @@ import (
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"github.com/numaproj/numaflow/pkg/watermark/publish"
)

type UDFProcessor struct {
Expand All @@ -40,8 +39,7 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
writers := make(map[string]isb.BufferWriter)

// watermark variables
var fetchWatermark fetch.Fetcher = nil
var publishWatermark map[string]publish.Publisher = nil
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(u.VertexInstance.Vertex.GetToBuffers()))

switch u.ISBSvcType {
case dfv1.ISBSvcTypeRedis:
Expand Down

0 comments on commit b1b78fa

Please sign in to comment.