Skip to content

Commit

Permalink
feat: Add side input sdkclient and grpc (#953)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Aug 25, 2023
1 parent f3deb16 commit f90d4fe
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nats.go v1.27.1
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5s
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727 h1:m+2sl0pbBvhiiLEXyyslBv0GeWXm/1wpR4PUg0C2xY8=
github.com/numaproj/numaflow-go v0.4.6-0.20230822054239-88190e94e727/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54 h1:nx77VKeseDKPFHhY4AMecvzhJw8oSEVeisAROufT5dU=
github.com/numaproj/numaflow-go v0.4.6-0.20230824220200-630a5eba1f54/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
95 changes: 95 additions & 0 deletions pkg/sdkclient/sideinput/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package sideinput

import (
"context"
"fmt"
"time"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/shared"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

// client contains the grpc connection and the grpc client.
type client struct {
conn *grpc.ClientConn
grpcClt sideinputpb.SideInputClient
}

var _ Client = (*client)(nil)

// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {
var opts = &options{
sockAddr: shared.SideInputAddr,
maxMessageSize: 1024 * 1024 * 64, // 64 MB
}
for _, inputOption := range inputOptions {
inputOption(opts)
}
_, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
c := new(client)
sockAddr := fmt.Sprintf("%s:%s", shared.UDS, opts.sockAddr)
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxMessageSize), grpc.MaxCallSendMsgSize(opts.maxMessageSize)))
if err != nil {
return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err)
}
c.conn = conn
c.grpcClt = sideinputpb.NewSideInputClient(conn)
return c, nil
}

// NewFromClient creates a new client object from a grpc client. This is used for testing.
func NewFromClient(c sideinputpb.SideInputClient) (Client, error) {
return &client{
grpcClt: c,
}, nil
}

// CloseConn closes the grpc connection.
func (c client) CloseConn(ctx context.Context) error {
return c.conn.Close()
}

// IsReady checks if the grpc connection is ready to use.
func (c client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
resp, err := c.grpcClt.IsReady(ctx, in)
if err != nil {
return false, err
}
return resp.GetReady(), nil
}

// RetrieveSideInput retrieves the side input value and returns the updated payload.
func (c client) RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
retrieveResponse, err := c.grpcClt.RetrieveSideInput(ctx, in)
// TODO check which error to use
if err != nil {
return nil, fmt.Errorf("failed to execute c.grpcClt.RetrieveSideInput(): %w", err)
}
return retrieveResponse, nil
}

// IsHealthy checks if the client is healthy.
func (c client) IsHealthy(ctx context.Context) error {
return c.WaitUntilReady(ctx)
}

// WaitUntilReady waits until the client is connected.
func (c client) WaitUntilReady(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("failed on readiness check: %w", ctx.Err())
default:
if _, err := c.IsReady(ctx, &emptypb.Empty{}); err == nil {
return nil
}
time.Sleep(1 * time.Second)
}
}
}
86 changes: 86 additions & 0 deletions pkg/sdkclient/sideinput/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sideinput

import (
"bytes"
"context"
"fmt"
"reflect"
"testing"

"github.com/golang/mock/gomock"
sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1/sideinputmock"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

type rpcMsg struct {
msg proto.Message
}

func (r *rpcMsg) Matches(msg interface{}) bool {
m, ok := msg.(proto.Message)
if !ok {
return false
}
return proto.Equal(m, r.msg)
}

func (r *rpcMsg) String() string {
return fmt.Sprintf("is %s", r.msg)
}

func TestIsReady(t *testing.T) {
var ctx = context.Background()
LintCleanCall()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := sideinputmock.NewMockSideInputClient(ctrl)
mockClient.EXPECT().IsReady(gomock.Any(), gomock.Any()).Return(&sideinputpb.ReadyResponse{Ready: true}, nil)
mockClient.EXPECT().IsReady(gomock.Any(), gomock.Any()).Return(&sideinputpb.ReadyResponse{Ready: false}, fmt.Errorf("mock connection refused"))

testClient, err := NewFromClient(mockClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockClient,
})

ready, err := testClient.IsReady(ctx, &emptypb.Empty{})
assert.True(t, ready)
assert.NoError(t, err)

ready, err = testClient.IsReady(ctx, &emptypb.Empty{})
assert.False(t, ready)
assert.EqualError(t, err, "mock connection refused")
}

func TestRetrieveFn(t *testing.T) {
var ctx = context.Background()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockSideInputClient := sideinputmock.NewMockSideInputClient(ctrl)
response := sideinputpb.SideInputResponse{Value: []byte("mock side input message")}
mockSideInputClient.EXPECT().RetrieveSideInput(gomock.Any(), gomock.Any()).Return(&sideinputpb.SideInputResponse{Value: []byte("mock side input message")}, nil)

testClient, err := NewFromClient(mockSideInputClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockSideInputClient,
})

got, err := testClient.RetrieveSideInput(ctx, &emptypb.Empty{})
assert.True(t, bytes.Equal(got.Value, response.Value))
assert.NoError(t, err)
}

// Check if there is a better way to resolve
func LintCleanCall() {
var m = rpcMsg{}
fmt.Println(m.Matches(m))
fmt.Println(m)
}
15 changes: 15 additions & 0 deletions pkg/sdkclient/sideinput/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package sideinput

import (
"context"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

// Client contains methods to call a gRPC client for side input.
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
RetrieveSideInput(ctx context.Context, in *emptypb.Empty) (*sideinputpb.SideInputResponse, error)
}
49 changes: 49 additions & 0 deletions pkg/sdkclient/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sideinput

import "time"

type options struct {
sockAddr string
maxMessageSize int
sideInputTimeout time.Duration
}

// Option is the interface to apply options.
type Option func(*options)

// WithSockAddr start the client with the given sock addr. This is mainly used for testing purpose.
func WithSockAddr(addr string) Option {
return func(opts *options) {
opts.sockAddr = addr
}
}

// WithMaxMessageSize sets the max message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(o *options) {
o.maxMessageSize = size
}
}

// WithSideInputTimeout sets the side input timeout to the given timeout.
func WithSideInputTimeout(t time.Duration) Option {
return func(o *options) {
o.sideInputTimeout = t
}
}
17 changes: 9 additions & 8 deletions pkg/sideinputs/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func NewSideInputsInitializer(isbSvcType dfv1.ISBSvcType, pipelineName, sideInpu
// and update the values on the disk. This would exit once all the side inputs are initialized.
func (sii *sideInputsInitializer) Run(ctx context.Context) error {
var (
natsClient *jsclient.NATSClient
err error
natsClient *jsclient.NATSClient
err error
sideInputWatcher kvs.KVWatcher
)

log := logging.FromContext(ctx)
Expand All @@ -75,15 +76,15 @@ func (sii *sideInputsInitializer) Run(ctx context.Context) error {
return err
}
defer natsClient.Close()
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err = jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
default:
return fmt.Errorf("unrecognized isbsvc type %q", sii.isbSvcType)
}
// Load the required KV bucket and create a sideInputWatcher for it
kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore)
sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient)
if err != nil {
return fmt.Errorf("failed to create a sideInputWatcher, %w", err)
}
return startSideInputInitializer(ctx, sideInputWatcher, dfv1.PathSideInputsMount, sii.sideInputs)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinputs/initializer/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestSideInputsInitializer_Success(t *testing.T) {

for x, sideInput := range sideInputs {
p := path.Join(mountPath, sideInput)
fileData, err := utils.FetchSideInputFile(p)
fileData, err := utils.FetchSideInputFileValue(p)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Context timeout")
default:
time.Sleep(10 * time.Millisecond)
fileData, err = utils.FetchSideInputFile(p)
fileData, err = utils.FetchSideInputFileValue(p)
}
}
assert.Equal(t, dataTest[x], string(fileData))
Expand Down
Loading

0 comments on commit f90d4fe

Please sign in to comment.