Skip to content

Commit

Permalink
fix: Watermark close fix and removed the nil check (#238)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 and jyu6 authored Oct 19, 2022
1 parent 998e398 commit 6870d2a
Show file tree
Hide file tree
Showing 12 changed files with 28 additions and 26 deletions.
3 changes: 1 addition & 2 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1440,8 +1440,7 @@
}
},
"required": [
"replicas",
"availableReplicas"
"replicas"
],
"type": "object"
},
Expand Down
3 changes: 1 addition & 2 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1391,8 +1391,7 @@
"description": "StatefulSetStatus represents the current state of a StatefulSet.",
"type": "object",
"required": [
"replicas",
"availableReplicas"
"replicas"
],
"properties": {
"availableReplicas": {
Expand Down
4 changes: 3 additions & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
fmt "fmt"
"fmt"
"os"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -283,6 +283,8 @@ func (v Vertex) GetFromBuffers() []Buffer {
if v.IsASource() {
r = append(r, Buffer{GenerateSourceBufferName(v.Namespace, v.Spec.PipelineName, v.Spec.Name), SourceBuffer})
} else {
// TODO: current design always has len(v.Spec.FromEdges) equals 1
// need to update once we support multiple fromEdges
for _, vt := range v.Spec.FromEdges {
r = append(r, Buffer{GenerateEdgeBufferName(v.Namespace, v.Spec.PipelineName, vt.From, v.Spec.Name), EdgeBuffer})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func WithReadTimeOut(timeout time.Duration) Option {
}
}

// NewMemGen fuction creates an instance of generator.
// NewMemGen function creates an instance of generator.
// ctx - context passed by the cmd/start.go a new context with cancel
//
// is created for use by this vertex.
Expand Down
6 changes: 2 additions & 4 deletions pkg/watermark/fetch/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
opts: opts,
}
go v.startRefreshingProcessors()
// we do not care about heartbeat watcher if this is a source vertex
if v.hbWatcher != nil {
go v.startHeatBeatWatcher()
}
go v.startHeatBeatWatcher()
return v
}

Expand Down Expand Up @@ -140,6 +137,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
for {
select {
case <-v.ctx.Done():
v.hbWatcher.Close()
return
case value := <-watchCh:
if value == nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/watermark/fetch/processor_to_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ func NewProcessorToFetch(ctx context.Context, processor processor.ProcessorEntit
otWatcher: watcher,
log: logging.FromContext(ctx),
}

if watcher != nil {
go p.startTimeLineWatcher()
}
go p.startTimeLineWatcher()
return p
}

Expand Down Expand Up @@ -103,6 +100,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
for {
select {
case <-p.ctx.Done():
p.otWatcher.Close()
return
case value := <-watchCh:
// TODO: why will value will be nil?
Expand Down
3 changes: 2 additions & 1 deletion pkg/watermark/fetch/processor_to_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"context"
"testing"

"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/watermark/processor"
)

func TestFromProcessor_setStatus(t *testing.T) {
var ctx = context.Background()
p := NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, nil)
p := NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, noop.NewKVOpWatch())
p.setStatus(_inactive)
assert.Equal(t, _inactive, p.status)
}
3 changes: 1 addition & 2 deletions pkg/watermark/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
)

// NewGenericEdgeFetch returns a Fetcher, where bufferName is the from buffer of the vertex that is currently processing.
// fetchWM is a struct for retrieving both the heartbeat
// and the offset watermark timeline (Vn-1 vertex).
// fetcher is an interface for retrieving both the heartbeat and the offset watermark timeline (Vn-1 vertex).
func NewGenericEdgeFetch(ctx context.Context, bufferName string, storeWatcher store.WatermarkStoreWatcher) fetch.Fetcher {
processorManager := fetch.NewProcessorManager(ctx, storeWatcher)
return fetch.NewEdgeFetcher(ctx, bufferName, processorManager)
Expand Down
9 changes: 5 additions & 4 deletions pkg/watermark/generic/jetstream/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver
return fetchWatermark, publishWatermark, nil
}

publishWatermark := make(map[string]publish.Publisher)
// Fetcher creation
pipelineName := vertexInstance.Vertex.Spec.PipelineName

// Fetcher creation, we have only 1 in buffer ATM
var fetchWatermark fetch.Fetcher
fromBuffer := vertexInstance.Vertex.GetFromBuffers()[0]
hbBucketName := isbsvc.JetStreamProcessorBucket(pipelineName, fromBuffer.Name)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucketName, jsclient.NewInClusterJetStreamClient())
Expand All @@ -48,14 +49,14 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver
return nil, nil, fmt.Errorf("failed at new OT KVJetStreamKVWatch, OTBucket: %s, %w", otBucketName, err)
}

var fetchWatermark fetch.Fetcher
if fromBuffer.Type == v1alpha1.SourceBuffer {
fetchWatermark = generic.NewGenericSourceFetch(ctx, fromBuffer.Name, store.BuildWatermarkStoreWatcher(hbWatch, otWatch))
} else {
fetchWatermark = generic.NewGenericEdgeFetch(ctx, fromBuffer.Name, store.BuildWatermarkStoreWatcher(hbWatch, otWatch))
}

// Publisher map creation, we need a publisher per edge.
// Publisher map creation, we need a publisher per out edge.
var publishWatermark = make(map[string]publish.Publisher)
for _, buffer := range vertexInstance.Vertex.GetToBuffers() {
hbPublisherBucketName := isbsvc.JetStreamProcessorBucket(pipelineName, buffer.Name)
// We create a separate Heartbeat bucket for each edge though it can be reused. We can reuse because heartbeat is at
Expand Down
5 changes: 3 additions & 2 deletions pkg/watermark/generic/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ func BuildNoOpWatermarkProgressorsFromEdgeList(bufferList []string) (fetch.Fetch
return fetchWatermark, publishWatermark
}

func BuildNoOpWatermarkProgressorsFromBufferMap(bufferList map[string]isb.BufferWriter) (fetch.Fetcher, map[string]publish.Publisher) {
func BuildNoOpWatermarkProgressorsFromBufferMap(bufferMap map[string]isb.BufferWriter) (fetch.Fetcher, map[string]publish.Publisher) {
fetchWatermark := NewNoOpWMProgressor()
publishWatermark := make(map[string]publish.Publisher)
for buffName := range bufferList {
for buffName := range bufferMap {
publishWatermark[buffName] = NewNoOpWMProgressor()
}
return fetchWatermark, publishWatermark
}

func GetBufferNameList(bufferList []v1alpha1.Buffer) []string {
bufferName := make([]string, len(bufferList))
for idx, buffer := range bufferList {
Expand Down
6 changes: 5 additions & 1 deletion pkg/watermark/publish/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
// build value (offset)
value := make([]byte, 8)
var seq int64
if p.opts.isSource || p.opts.isSink { // For source and sink publisher, we don't care about the offset, also the sequence of the offset might not be integer.
if p.opts.isSource || p.opts.isSink {
// For source and sink publisher, we don't care about the offset, also the sequence of the offset might not be integer.
seq = time.Now().UnixNano()
} else {
seq, _ = offset.Sequence()
Expand Down Expand Up @@ -178,6 +179,9 @@ func (p *publish) StopPublisher() {
if err != nil {
p.log.Errorw("Failed to delete the key in the heartbeat bucket", zap.String("bucket", p.heartbeatStore.GetStoreName()), zap.String("key", p.entity.GetID()), zap.Error(err))
}

p.otStore.Close()
p.heartbeatStore.Close()
}

func (p *publish) getAllOTKeysFromBucket() []string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/watermark/publish/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestPublisherWithSeparateOTBuckets(t *testing.T) {
p.StopPublisher()

_, err = p.heartbeatStore.GetValue(ctx, publishEntity.GetID())
assert.Equal(t, nats.ErrKeyNotFound, err)
assert.Equal(t, nats.ErrConnectionClosed, err)

}

Expand Down Expand Up @@ -133,5 +133,5 @@ func TestPublisherWithSharedOTBucket(t *testing.T) {
p.StopPublisher()

_, err = p.heartbeatStore.GetValue(ctx, publishEntity.GetID())
assert.Equal(t, nats.ErrKeyNotFound, err)
assert.Equal(t, nats.ErrConnectionClosed, err)
}

0 comments on commit 6870d2a

Please sign in to comment.