diff --git a/pkg/shared/idlehandler/idlehandler.go b/pkg/shared/idlehandler/idlehandler.go index 29a0f9247e..90381d97bc 100644 --- a/pkg/shared/idlehandler/idlehandler.go +++ b/pkg/shared/idlehandler/idlehandler.go @@ -59,7 +59,14 @@ func PublishIdleWatermark(ctx context.Context, toBufferPartition isb.BufferWrite // publish WMB (this will naturally incr or set the timestamp of rl.wmbOffset) if vertexType == dfv1.VertexTypeSource || vertexType == dfv1.VertexTypeMapUDF || vertexType == dfv1.VertexTypeReduceUDF { - wmPublisher.PublishIdleWatermark(wm, idleManager.Get(toPartitionName), toVertexPartition) + // We create one forwarder for each fromPartition, and all the forwarders share one idleManager. + // Therefore, it's possible that one forwarder marks the toPartition as "idling" and tries to + // publish a valid idle watermark while another forwarder marks the same toPartition as "active" + // right after. In that case, the offset we get here will be nil, so we ignore the "idling" state + // and treat the toPartition as "active". + if offset := idleManager.Get(toPartitionName); offset != nil { + wmPublisher.PublishIdleWatermark(wm, offset, toVertexPartition) + } } else { // for Sink vertex, and it does not care about the offset during watermark publishing wmPublisher.PublishIdleWatermark(wm, nil, toVertexPartition) diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index cff33005d3..0e7d4809b1 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -240,7 +240,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { defer finalWg.Done() log.Infow("Start processing udf messages", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("from", fromBufferPartitionName), zap.Any("to", u.VertexInstance.Vertex.GetToBuffers())) - stopped := forwarder.Start() + stopped := isdf.Start() wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -254,7 +254,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { <-ctx.Done() log.Info("SIGTERM, exiting 
inside partition...", zap.String("partition", fromBufferPartitionName)) - forwarder.Stop() + isdf.Stop() wg.Wait() log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName)) }(bufferPartition, forwarder)