fix: avoid unwanted watcher creation and reduce being stuck with udf is restarted (#999)

Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Aug 30, 2023
1 parent 54c8ccf commit 92fbf7f
Showing 4 changed files with 128 additions and 91 deletions.
4 changes: 2 additions & 2 deletions pkg/shared/kvs/jetstream/kv_watch.go
@@ -167,9 +167,9 @@ func (jsw *jetStreamWatch) Watch(ctx context.Context) (<-chan kvs.KVEntry, <-cha
kvLastUpdatedTime := jsw.lastUpdateKVTime()

// if the last update time is zero, it means that there are no key-value pairs in the store yet or the ctx was canceled; in both cases we should not recreate the watcher
// if the last update time is before the previous fetch time, it means that the store is not getting any updates
// if the last update time is not after the previous fetch time, it means that the store is not getting any updates (watermark is not getting updated)
// therefore, we don't have to recreate the watcher
if kvLastUpdatedTime.IsZero() || kvLastUpdatedTime.Before(jsw.previousFetchTime) {
if kvLastUpdatedTime.IsZero() || !kvLastUpdatedTime.After(jsw.previousFetchTime) {
jsw.log.Debug("The watcher is not receiving any updates, but the store is not getting any updates either", zap.String("watcher", jsw.GetKVName()), zap.Time("lastUpdateKVTime", kvLastUpdatedTime), zap.Time("previousFetchTime", jsw.previousFetchTime))
} else {
// if the last update time is after the previous fetch time, it means that the store is getting updates but the watcher is not receiving any
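The comparison change above is the whole fix for this file: an equal timestamp now also counts as "no new updates", so the watcher is not torn down and recreated when the KV store simply has not moved since the previous fetch. A minimal standalone sketch of the two guards, using hypothetical timestamps rather than the jetStreamWatch fields:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical timestamps: the store was last written at exactly the
	// previous fetch time, i.e. nothing new has arrived since the last fetch.
	previousFetchTime := time.Date(2023, 8, 30, 12, 0, 0, 0, time.UTC)
	kvLastUpdatedTime := previousFetchTime

	// Old guard: an equal timestamp is not "before", so the guard is false
	// and the watcher would be recreated for no reason.
	oldGuard := kvLastUpdatedTime.IsZero() || kvLastUpdatedTime.Before(previousFetchTime)

	// New guard: "not after" also covers the equal case, so the guard is true
	// and the existing watcher is kept.
	newGuard := kvLastUpdatedTime.IsZero() || !kvLastUpdatedTime.After(previousFetchTime)

	fmt.Println("old guard skips recreation:", oldGuard) // false
	fmt.Println("new guard skips recreation:", newGuard) // true
}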
18 changes: 12 additions & 6 deletions pkg/sources/generator/tickgen.go
@@ -314,15 +314,21 @@ func (mg *memgen) Start() <-chan struct{} {
func (mg *memgen) NewWorker(ctx context.Context, rate int) func(chan time.Time, chan struct{}) {

return func(tickChan chan time.Time, done chan struct{}) {
for {
select {
case <-ctx.Done():
log.Info("Context.Done is called. emptying any pending ticks")
defer func() {
// empty any pending ticks
if len(tickChan) > 0 {
log.Info("emptying any pending ticks")
for len(tickChan) > 0 {
<-tickChan
}
close(done)
close(mg.srcChan)
}
}()
defer close(done)
defer close(mg.srcChan)

for {
select {
case <-ctx.Done():
return
case ts := <-tickChan:
tickgenSourceCount.With(map[string]string{metrics.LabelVertex: mg.vertexName, metrics.LabelPipeline: mg.pipelineName})
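The generator change moves the drain-and-close cleanup into defers so it runs on every exit path, not only inside the old ctx.Done() branch. A rough standalone sketch of that shape, with stand-in channels instead of the real memgen fields; defers run last-in-first-out, so the drain registered first is the last to execute, matching the order in the diff:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	tickChan := make(chan time.Time, 8)
	done := make(chan struct{})
	srcChan := make(chan time.Time, 8)

	go func() {
		// cleanup lives in defers so it also runs when ctx is canceled,
		// instead of only inside an explicit ctx.Done() branch
		defer func() {
			for len(tickChan) > 0 { // empty any pending ticks
				<-tickChan
			}
		}()
		defer close(done)
		defer close(srcChan)
		for {
			select {
			case <-ctx.Done():
				return
			case ts := <-tickChan:
				srcChan <- ts // forward the tick downstream
			}
		}
	}()

	tickChan <- time.Now() // a tick that may still be pending at shutdown
	cancel()
	<-done
	fmt.Println("worker stopped, pending ticks drained")
}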
190 changes: 108 additions & 82 deletions pkg/udf/rpc/grpc_reduce.go
@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
@@ -75,111 +74,101 @@ func (u *GRPCBasedReduce) WaitUntilReady(ctx context.Context) error {

// ApplyReduce accepts a channel of isbMessages and returns the aggregated result
func (u *GRPCBasedReduce) ApplyReduce(ctx context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.WriteMessage, error) {
datumCh := make(chan *reducepb.ReduceRequest)
var wg sync.WaitGroup
var result *reducepb.ReduceResponse
var err error
var (
result *reducepb.ReduceResponse
err error
errCh = make(chan error, 1)
responseCh = make(chan *reducepb.ReduceResponse, 1)
datumCh = make(chan *reducepb.ReduceRequest)
)

// pass key and window information inside the context
mdMap := map[string]string{
shared.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10),
shared.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10),
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

grpcCtx := metadata.NewOutgoingContext(ctx, metadata.New(mdMap))

// There can be two error scenarios:
// 1. The u.client.ReduceFn method returns an error before reading all the messages from the messageStream
// 2. The u.client.ReduceFn method returns an error after reading all the messages from the messageStream

// invoke the reduceFn method with datumCh channel
wg.Add(1)
go func() {
defer wg.Done()
// TODO handle this error here itself
result, err = u.client.ReduceFn(ctx, datumCh)
result, err = u.client.ReduceFn(grpcCtx, datumCh)
if err != nil {
errCh <- err
} else {
responseCh <- result
}
close(errCh)
close(responseCh)
}()

readLoop:
for {
select {
case msg, ok := <-messageStream:
if msg != nil {
// create datum from isbMessage and send it to datumCh channel for reduceFn
go func() {
// after reading all the messages from the messageStream or if ctx was canceled close the datumCh channel
defer close(datumCh)
for {
select {
case msg, ok := <-messageStream:
// if the messageStream is closed or if the message is nil, return
if !ok || msg == nil {
return
}

d := createDatum(msg)

// send the datum to datumCh channel, handle the case when the context is canceled
select {
case datumCh <- d:
case <-ctx.Done():
close(datumCh)
return nil, ctx.Err()
return
}
}
if !ok {
break readLoop
}
case <-ctx.Done():
close(datumCh)
return nil, ctx.Err()
}
}

// close the datumCh, let the reduceFn know that there are no more messages
close(datumCh)

wg.Wait()

if err != nil {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
default:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
case <-ctx.Done(): // if the context is done, return
return
}
}
}
}()

taggedMessages := make([]*isb.WriteMessage, 0)
for _, response := range result.GetResults() {
keys := response.Keys
taggedMessage := &isb.WriteMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: partitionID.End.Add(-1 * time.Millisecond),
IsLate: false,
// wait for the reduceFn to finish
for {
select {
case err = <-errCh:
if err != nil {
return nil, convertToUdfError(err)
}
case result = <-responseCh:
taggedMessages := make([]*isb.WriteMessage, 0)
for _, response := range result.GetResults() {
keys := response.Keys
taggedMessage := &isb.WriteMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: partitionID.End.Add(-1 * time.Millisecond),
IsLate: false,
},
Keys: keys,
},
Body: isb.Body{
Payload: response.Value,
},
},
Keys: keys,
},
Body: isb.Body{
Payload: response.Value,
},
},
Tags: response.Tags,
Tags: response.Tags,
}
taggedMessages = append(taggedMessages, taggedMessage)
}
return taggedMessages, nil
case <-ctx.Done():
return nil, convertToUdfError(ctx.Err())
}
taggedMessages = append(taggedMessages, taggedMessage)
}
return taggedMessages, nil
}

func createDatum(readMessage *isb.ReadMessage) *reducepb.ReduceRequest {
@@ -194,3 +183,40 @@ func createDatum(readMessage *isb.ReadMessage) *reducepb.ReduceRequest {
}
return d
}

// convertToUdfError converts the error returned by the reduceFn to ApplyUDFErr
func convertToUdfError(err error) ApplyUDFErr {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
case sdkerr.NonRetryable:
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
default:
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
}
}
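
The rewritten ApplyReduce drops the WaitGroup in favor of two result channels: one goroutine streams requests into datumCh and closes it when the input ends or the context is canceled, another runs ReduceFn and reports into errCh or responseCh, and the caller selects on the outcome or on ctx.Done(). A compressed sketch of that shape; request, response and reduceFn here are placeholders, not the Numaflow SDK types:

package main

import (
	"context"
	"fmt"
)

// request and response are stand-in types for this sketch.
type request struct{ value int }
type response struct{ sum int }

// reduceFn is a placeholder for u.client.ReduceFn: it consumes the request
// stream and returns a single aggregated response.
func reduceFn(ctx context.Context, in <-chan request) (*response, error) {
	sum := 0
	for r := range in {
		sum += r.value
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return &response{sum: sum}, nil
}

func applyReduce(ctx context.Context, messages <-chan int) (*response, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	datumCh := make(chan request)
	errCh := make(chan error, 1)          // buffered so the goroutine never blocks
	responseCh := make(chan *response, 1) // buffered for the same reason

	// run the reduce call and report exactly one outcome
	go func() {
		result, err := reduceFn(ctx, datumCh)
		if err != nil {
			errCh <- err
			return
		}
		responseCh <- result
	}()

	// stream the input into datumCh; closing it tells reduceFn "no more messages"
	go func() {
		defer close(datumCh)
		for {
			select {
			case m, ok := <-messages:
				if !ok {
					return
				}
				select {
				case datumCh <- request{value: m}:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// wait for whichever happens first: an error, the result, or cancellation
	select {
	case err := <-errCh:
		return nil, fmt.Errorf("reduce failed: %w", err)
	case res := <-responseCh:
		return res, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	res, err := applyReduce(context.Background(), in)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("sum:", res.sum) // prints: sum: 6
}

Giving errCh and responseCh a capacity of one, as the diff does, lets the ReduceFn goroutine finish even if the caller has already returned on ctx.Done().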
7 changes: 6 additions & 1 deletion pkg/udf/rpc/grpc_reduce_test.go
@@ -145,6 +145,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

go func() {
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -159,7 +160,11 @@

go func() {
for index := range messages {
messageCh <- &messages[index]
select {
case <-ctx.Done():
return
case messageCh <- &messages[index]:
}
}
close(messageCh)
}()
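The test change applies the same guard to the feeding goroutine: without the select, a send on the unbuffered message channel could block forever once the context times out and the consumer stops reading. A small sketch of that pattern, with placeholder names rather than the test's channels:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// a short timeout standing in for the test's one-second context
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	messageCh := make(chan int) // unbuffered, like the test's message channel

	go func() {
		defer close(messageCh)
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				return // stop feeding instead of blocking forever on a send
			case messageCh <- i:
			}
		}
	}()

	// the consumer stops early; without the select above, the sender would
	// be stuck on the send after the receiver goes away
	fmt.Println("first message:", <-messageCh)
	<-ctx.Done()
}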
