Skip to content

Commit

Permalink
feat: implement user-defined source (#980)
Browse files Browse the repository at this point in the history
* Implement gRPC on the platform side to enable calling user-defined source APIs.
* Enable the simple E2E test, which verifies that the simple source image can generate data and that the data reaches the sink.
This PR completes the functionality of the user-defined source.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Aug 24, 2023
1 parent b4b997d commit d99480a
Show file tree
Hide file tree
Showing 28 changed files with 772 additions and 267 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jobs:
max-parallel: 10
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, redis-streams-e2e, sdks-e2e, reduce-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, redis-streams-e2e, sdks-e2e, reduce-e2e, udsource-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19278,7 +19278,7 @@
"transformer": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDTransformer"
},
"udSource": {
"udsource": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDSource"
}
},
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19265,7 +19265,7 @@
"transformer": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDTransformer"
},
"udSource": {
"udsource": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDSource"
}
}
Expand Down
2 changes: 1 addition & 1 deletion config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7511,7 +7511,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down
2 changes: 1 addition & 1 deletion config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3354,7 +3354,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down
4 changes: 2 additions & 2 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10040,7 +10040,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down Expand Up @@ -14695,7 +14695,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down
4 changes: 2 additions & 2 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10040,7 +10040,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down Expand Up @@ -14695,7 +14695,7 @@ spec:
type: array
type: object
type: object
udSource:
udsource:
properties:
container:
properties:
Expand Down
2 changes: 1 addition & 1 deletion docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -4462,7 +4462,7 @@ RedisStreamsSource </a> </em>
</tr>
<tr>
<td>
<code>udSource</code></br> <em>
<code>udsource</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.UDSource"> UDSource </a> </em>
</td>
<td>
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Source struct {
// +optional
UDTransformer *UDTransformer `json:"transformer,omitempty" protobuf:"bytes,6,opt,name=transformer"`
// +optional
UDSource *UDSource `json:"udSource,omitempty" protobuf:"bytes,7,opt,name=udSource"`
UDSource *UDSource `json:"udsource,omitempty" protobuf:"bytes,7,opt,name=udSource"`
}

func (s Source) getContainers(req getContainerReq) ([]corev1.Container, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/sdkclient/source/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (c *client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {

// ReadFn reads data from the source.
func (c *client) ReadFn(ctx context.Context, req *sourcepb.ReadRequest, datumCh chan<- *sourcepb.ReadResponse) error {
defer close(datumCh)
stream, err := c.grpcClt.ReadFn(ctx, req)
if err != nil {
return fmt.Errorf("failed to execute c.grpcClt.ReadFn(): %w", err)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sinks/udsink/udsink_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"time"

sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker"
"github.com/numaproj/numaflow/pkg/shared/logging"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

// SinkApplier applies the ssink on the read message and gives back a response. Any UserError will be retried here, while
// SinkApplier applies the sink on the read message and gives back a response. Any UserError will be retried here, while
// InternalErr can be returned and could be retried by the callee.
type SinkApplier interface {
ApplySink(ctx context.Context, requests []*sinkpb.SinkRequest) []error
Expand Down Expand Up @@ -55,15 +56,18 @@ func (u *UDSgRPCBasedUDSink) IsHealthy(ctx context.Context) error {

// WaitUntilReady waits until the udsink is connected.
func (u *UDSgRPCBasedUDSink) WaitUntilReady(ctx context.Context) error {
log := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
return fmt.Errorf("failed on readiness check: %w", ctx.Err())
default:
if _, err := u.client.IsReady(ctx, &emptypb.Empty{}); err == nil {
return nil
} else {
log.Infof("waiting for udsink to be ready: %v", err)
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (isdf *DataForward) Stop() {
// ForceStop sets up the force shutdown flag.
func (isdf *DataForward) ForceStop() {
// call stop (what if we have an enthusiastic shutdown that forces first)
// e.g. I know I have written a wrong source transformer, so shutdown ASAP
// e.g., I know I have written a wrong source transformer, so shutdown ASAP
isdf.Stop()
isdf.Shutdown.rwlock.Lock()
defer isdf.Shutdown.rwlock.Unlock()
Expand Down
8 changes: 7 additions & 1 deletion pkg/sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,13 @@ func (sp *SourceProcessor) getSourcer(

src := sp.VertexInstance.Vertex.Spec.Source
if x := src.UDSource; x != nil && udsGRPCClient != nil {
udsource, err := udsource.New(udsGRPCClient)
readOptions := []udsource.Option{
udsource.WithLogger(logger),
}
if l := sp.VertexInstance.Vertex.Spec.Limits; l != nil && l.ReadTimeout != nil {
readOptions = append(readOptions, udsource.WithReadTimeout(l.ReadTimeout.Duration))
}
udsource, err := udsource.New(sp.VertexInstance, writers, fsd, transformerApplier, udsGRPCClient, fetchWM, toVertexPublisherStores, publishWMStores, readOptions...)
return udsource, err
} else if x := src.Generator; x != nil {
readOptions := []generator.Option{
Expand Down
6 changes: 5 additions & 1 deletion pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/numaproj/numaflow/pkg/isb"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
"github.com/numaproj/numaflow/pkg/sdkclient/sourcetransformer"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/udf/rpc"
)

Expand All @@ -49,15 +50,18 @@ func (u *GRPCBasedTransformer) IsHealthy(ctx context.Context) error {

// WaitUntilReady waits until the client is connected.
func (u *GRPCBasedTransformer) WaitUntilReady(ctx context.Context) error {
log := logging.FromContext(ctx)
for {
select {
case <-ctx.Done():
return fmt.Errorf("failed on readiness check: %w", ctx.Err())
default:
if _, err := u.client.IsReady(ctx, &emptypb.Empty{}); err == nil {
return nil
} else {
log.Infof("waiting for transformer to be ready: %v", err)
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/sources/udsource/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package udsource implements user-defined source,
// enabling the platform to communicate with user-defined data sources through gRPC.
package udsource
165 changes: 165 additions & 0 deletions pkg/sources/udsource/grpc_udsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package udsource

import (
"context"
"fmt"
"sync"
"time"

sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1"

"google.golang.org/protobuf/types/known/emptypb"

"github.com/numaproj/numaflow/pkg/isb"
sourceclient "github.com/numaproj/numaflow/pkg/sdkclient/source/client"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sources/udsource/utils"
)

// GRPCBasedUDSource is the platform-side handle to a user-defined source.
// Every method delegates to the wrapped gRPC client, for which the
// user-defined source acts as the server.
type GRPCBasedUDSource struct {
// client is the gRPC client used for all calls to the user-defined source.
client sourceclient.Client
}

// NewUDSgRPCBasedUDSource wraps the given gRPC client in a GRPCBasedUDSource.
// The returned error is always nil; it is part of the signature so callers
// can treat construction uniformly.
func NewUDSgRPCBasedUDSource(c sourceclient.Client) (*GRPCBasedUDSource, error) {
	src := &GRPCBasedUDSource{client: c}
	return src, nil
}

// CloseConn closes the gRPC client connection to the user-defined source.
// It returns whatever error the underlying client's CloseConn reports.
func (u *GRPCBasedUDSource) CloseConn(ctx context.Context) error {
return u.client.CloseConn(ctx)
}

// IsHealthy checks if the udsource is healthy.
// It is implemented by calling WaitUntilReady, so it does not return until the
// udsource reports ready or ctx is done; bound ctx with a deadline if a quick
// answer is needed.
func (u *GRPCBasedUDSource) IsHealthy(ctx context.Context) error {
return u.WaitUntilReady(ctx)
}

// WaitUntilReady blocks until the udsource reports ready or ctx is done.
//
// It probes the client's IsReady, and after each failed probe logs the error
// and waits one second before trying again. It returns nil as soon as a probe
// succeeds, or an error wrapping ctx.Err() once ctx is cancelled or expires.
// The wait between probes is interrupted by ctx, so cancellation is honoured
// immediately instead of after the remainder of the one-second pause.
func (u *GRPCBasedUDSource) WaitUntilReady(ctx context.Context) error {
	log := logging.FromContext(ctx)
	for {
		// Do not issue another probe once the caller has given up.
		select {
		case <-ctx.Done():
			return fmt.Errorf("failed on readiness check: %w", ctx.Err())
		default:
		}

		_, err := u.client.IsReady(ctx, &emptypb.Empty{})
		if err == nil {
			return nil
		}
		log.Infof("waiting for udsource to be ready: %v", err)

		// Pause before the next probe, but wake up right away on cancellation.
		retry := time.NewTimer(1 * time.Second)
		select {
		case <-ctx.Done():
			retry.Stop()
			return fmt.Errorf("failed on readiness check: %w", ctx.Err())
		case <-retry.C:
		}
	}
}

// ApplyPendingFn asks the user-defined source how many messages are pending.
// On success it returns the count reported in the response; if the gRPC call
// fails it returns isb.PendingNotAvailable together with that error.
func (u *GRPCBasedUDSource) ApplyPendingFn(ctx context.Context) (int64, error) {
	resp, err := u.client.PendingFn(ctx, &emptypb.Empty{})
	if err != nil {
		return isb.PendingNotAvailable, err
	}
	pending := int64(resp.Result.Count)
	return pending, nil
}

// ApplyReadFn reads messages from the user-defined source.
//
// It sends a single read request asking for count records with the given
// timeout (passed on in milliseconds), then converts every streamed response
// into an isb.ReadMessage.
//
// It returns:
//   - the collected messages and nil once the stream ends without error;
//   - the messages collected so far and the wrapped read error if the client's
//     ReadFn fails;
//   - the messages collected so far and an error wrapping ctx.Err() if ctx is
//     done before the stream ends.
func (u *GRPCBasedUDSource) ApplyReadFn(ctx context.Context, count int64, timeout time.Duration) ([]*isb.ReadMessage, error) {
	var readMessages []*isb.ReadMessage

	// Construct the gRPC request.
	req := &sourcepb.ReadRequest{
		Request: &sourcepb.ReadRequest_Request{
			NumRecords:  uint64(count),
			TimeoutInMs: uint32(timeout.Milliseconds()),
		},
	}

	// datumCh carries the streamed responses; the producer goroutine below is
	// its only sender and closes it when the stream ends.
	datumCh := make(chan *sourcepb.ReadResponse)
	// errCh has room for the single error the producer can report, so the
	// producer never blocks on it. It is deliberately never closed: the
	// producer may still be running after this function has returned (see the
	// ctx.Done branch), and a send on a closed channel would panic.
	errCh := make(chan error, 1)
	var wg sync.WaitGroup
	wg.Add(1)

	// Start the goroutine that reads from the source and feeds datumCh.
	go func() {
		defer wg.Done()
		defer close(datumCh)
		if err := u.client.ReadFn(ctx, req, datumCh); err != nil {
			errCh <- fmt.Errorf("failed to read messages from udsource: %w", err)
		}
	}()

	// Collect the messages from the channel and return.
	for {
		select {
		case <-ctx.Done():
			// Nobody will receive from datumCh after we return, so keep
			// draining it in the background; otherwise a producer blocked on
			// a send would never finish. The drain ends when the producer
			// closes datumCh.
			go func() {
				for range datumCh {
				}
			}()
			// Return the messages collected so far.
			return readMessages, fmt.Errorf("context is done, %w", ctx.Err())
		case err := <-errCh:
			// The producer failed; return the messages collected so far.
			return readMessages, err
		case datum, ok := <-datumCh:
			if !ok {
				// The stream is finished; wait for the producer to exit.
				wg.Wait()
				// The producer reports its error before closing datumCh, so
				// both branches of the outer select may have been ready at
				// once. Check errCh here so a failure is not reported as a
				// successful read.
				select {
				case err := <-errCh:
					return readMessages, err
				default:
					return readMessages, nil
				}
			}
			// Convert the datum to a ReadMessage and append it to the list.
			result := datum.GetResult()
			readMessages = append(readMessages, &isb.ReadMessage{
				Message: isb.Message{
					Header: isb.Header{
						MessageInfo: isb.MessageInfo{EventTime: result.GetEventTime().AsTime()},
						ID:          constructMessageID(result),
						Keys:        result.GetKeys(),
					},
					Body: isb.Body{
						Payload: result.GetPayload(),
					},
				},
				ReadOffset: utils.ConvertToIsbOffset(result.GetOffset()),
			})
		}
	}
}

// ApplyAckFn acknowledges the given offsets with the user-defined source.
// Each isb.Offset is converted to its source-side form, all of them are sent
// in one AckRequest, and the error from the gRPC call (if any) is returned.
func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset) error {
	converted := make([]*sourcepb.Offset, 0, len(offsets))
	for _, o := range offsets {
		converted = append(converted, utils.ConvertToSourceOffset(o))
	}

	req := &sourcepb.AckRequest{
		Request: &sourcepb.AckRequest_Request{Offsets: converted},
	}
	_, err := u.client.AckFn(ctx, req)
	return err
}

// constructMessageID builds the message ID for a record read from a
// user-defined source as "<partition ID>-<offset>".
//
// For a user-defined source, the partition ID plus the offset should be able
// to uniquely identify a message. Only nil-tolerant getters are used, so a
// result with no offset (or a nil result) yields "-" instead of panicking.
func constructMessageID(r *sourcepb.ReadResponse_Result) string {
	offset := r.GetOffset()
	return offset.GetPartitionId() + "-" + string(offset.GetOffset())
}
Loading

0 comments on commit d99480a

Please sign in to comment.