From f6dbe86138aadc82586c4b6b1c35c7217e91bd98 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 31 Oct 2024 20:58:48 +0530 Subject: [PATCH] chore: Batch ack requests and sink responses for better performance #163 (#2194) Signed-off-by: Yashash H L --- Dockerfile | 2 +- examples/21-simple-mono-vertex.yaml | 6 +- go.mod | 2 +- go.sum | 4 +- pkg/apis/proto/sink/v1/sink.proto | 2 +- pkg/apis/proto/source/v1/source.proto | 2 +- pkg/sdkclient/sinker/client.go | 10 +- pkg/sdkclient/sinker/client_test.go | 16 ++- pkg/sdkclient/source/client.go | 26 ++--- pkg/sdkclient/source/client_test.go | 4 +- pkg/sdkclient/source/interface.go | 2 +- pkg/sinks/udsink/udsink_grpc.go | 14 ++- pkg/sinks/udsink/udsink_grpc_test.go | 5 +- pkg/sources/udsource/grpc_udsource.go | 14 +-- pkg/sources/udsource/grpc_udsource_test.go | 17 +-- rust/Cargo.lock | 4 +- rust/numaflow-core/Cargo.toml | 2 +- rust/numaflow-core/src/message.rs | 105 ++++++------------ rust/numaflow-core/src/metrics.rs | 2 +- rust/numaflow-core/src/monovertex.rs | 2 +- .../numaflow-core/src/monovertex/forwarder.rs | 12 +- rust/numaflow-core/src/pipeline.rs | 2 +- rust/numaflow-core/src/shared/utils.rs | 2 +- rust/numaflow-core/src/sink/user_defined.rs | 28 +++-- rust/numaflow-core/src/source/user_defined.rs | 47 ++++---- rust/numaflow-pb/src/clients/sink.v1.rs | 4 +- rust/numaflow-pb/src/clients/source.v1.rs | 4 +- 27 files changed, 152 insertions(+), 188 deletions(-) diff --git a/Dockerfile b/Dockerfile index c234eb30ed..57feea2da7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -89,4 +89,4 @@ RUN chmod +x /bin/e2eapi #################################################################################################### FROM scratch AS e2eapi COPY --from=testbase /bin/e2eapi . -ENTRYPOINT ["/e2eapi"] +ENTRYPOINT ["/e2eapi"] \ No newline at end of file diff --git a/examples/21-simple-mono-vertex.yaml b/examples/21-simple-mono-vertex.yaml index 98192aa8fd..a47dbe3123 100644 --- a/examples/21-simple-mono-vertex.yaml +++ b/examples/21-simple-mono-vertex.yaml @@ -1,12 +1,10 @@ -apiVersion: numaflow.numaproj.io/v1alpha1 -kind: MonoVertex metadata: name: simple-mono-vertex spec: source: udsource: container: - image: quay.io/numaio/numaflow-java/source-simple-source:stable + image: quay.io/numaio/numaflow-rs/simple-source:stable # transformer is an optional container to do any transformation to the incoming data before passing to the sink transformer: container: @@ -14,4 +12,4 @@ spec: sink: udsink: container: - image: quay.io/numaio/numaflow-java/simple-sink:stable \ No newline at end of file + image: quay.io/numaio/numaflow-rs/sink-log:stable \ No newline at end of file diff --git a/go.mod b/go.mod index a40001aeb8..c9048c770d 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe github.com/nats-io/nats-server/v2 v2.10.20 github.com/nats-io/nats.go v1.37.0 - github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd + github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.55.0 diff --git a/go.sum b/go.sum index c5f73afc67..b60aad18ee 100644 --- a/go.sum +++ b/go.sum @@ -483,8 +483,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd h1:yL7sbAaeCw2rWar1CF19N69KEHmcJpL1YjtqOWEG41c= -github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc= +github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b h1:UEhFHfBwe2DwtnYzdFteTZ2tKwMX739llzfebfEMGg4= +github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/apis/proto/sink/v1/sink.proto b/pkg/apis/proto/sink/v1/sink.proto index 8f42720b5d..e7a85523d7 100644 --- a/pkg/apis/proto/sink/v1/sink.proto +++ b/pkg/apis/proto/sink/v1/sink.proto @@ -97,7 +97,7 @@ message SinkResponse { // err_msg is the error message, set it if success is set to false. string err_msg = 3; } - Result result = 1; + repeated Result results = 1; optional Handshake handshake = 2; optional TransmissionStatus status = 3; } \ No newline at end of file diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index 7dc1a67412..f1d2a6eb55 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -130,7 +130,7 @@ message ReadResponse { message AckRequest { message Request { // Required field holding the offset to be acked - Offset offset = 1; + repeated Offset offsets = 1; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. Request request = 1; diff --git a/pkg/sdkclient/sinker/client.go b/pkg/sdkclient/sinker/client.go index 3c7bc4b23d..9ab3fc2858 100644 --- a/pkg/sdkclient/sinker/client.go +++ b/pkg/sdkclient/sinker/client.go @@ -167,17 +167,21 @@ func (c *client) SinkFn(ctx context.Context, requests []*sinkpb.SinkRequest) ([] // Wait for the corresponding responses var responses []*sinkpb.SinkResponse - for i := 0; i < len(requests)+1; i++ { + responsesCount := 0 + for { resp, err := c.sinkStream.Recv() if err != nil { return nil, fmt.Errorf("failed to receive sink response: %v", err) } if resp.GetStatus() != nil && resp.GetStatus().GetEot() { - if i != len(requests) { - c.log.Errorw("Received EOT message before all responses are received, we will wait indefinitely for the remaining responses", zap.Int("received", i), zap.Int("expected", len(requests))) + if responsesCount != len(requests) { + c.log.Errorw("Received EOT message before all responses are received, we will wait indefinitely for the remaining responses", zap.Int("received", responsesCount), zap.Int("expected", len(requests))) + } else { + break } continue } + responsesCount += len(resp.GetResults()) responses = append(responses, resp) } diff --git a/pkg/sdkclient/sinker/client_test.go b/pkg/sdkclient/sinker/client_test.go index e95b39dba4..01cebcc590 100644 --- a/pkg/sdkclient/sinker/client_test.go +++ b/pkg/sdkclient/sinker/client_test.go @@ -65,9 +65,11 @@ func TestClient_SinkFn(t *testing.T) { mockSinkClient := sinkmock.NewMockSink_SinkFnClient(ctrl) mockSinkClient.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes() mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{ - Result: &sinkpb.SinkResponse_Result{ - Id: "temp-id", - Status: sinkpb.Status_SUCCESS, + Results: []*sinkpb.SinkResponse_Result{ + { + Id: "temp-id", + Status: sinkpb.Status_SUCCESS, + }, }, }, nil) mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{ @@ -94,9 +96,11 @@ func TestClient_SinkFn(t *testing.T) { }) assert.Equal(t, []*sinkpb.SinkResponse{ { - Result: &sinkpb.SinkResponse_Result{ - Id: "temp-id", - Status: sinkpb.Status_SUCCESS, + Results: []*sinkpb.SinkResponse_Result{ + { + Id: "temp-id", + Status: sinkpb.Status_SUCCESS, + }, }, }, }, response) diff --git a/pkg/sdkclient/source/client.go b/pkg/sdkclient/source/client.go index 550c888d66..feff9ddc59 100644 --- a/pkg/sdkclient/source/client.go +++ b/pkg/sdkclient/source/client.go @@ -190,27 +190,19 @@ func (c *client) ReadFn(_ context.Context, req *sourcepb.ReadRequest, datumCh ch } // AckFn acknowledges the data from the source. -func (c *client) AckFn(_ context.Context, reqs []*sourcepb.AckRequest) ([]*sourcepb.AckResponse, error) { +func (c *client) AckFn(_ context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error) { // Send the ack request - for _, req := range reqs { - err := c.ackStream.Send(req) - if err != nil { - return nil, fmt.Errorf("failed to send ack request: %v", err) - } + err := c.ackStream.Send(req) + if err != nil { + return nil, fmt.Errorf("failed to send ack request: %v", err) } - responses := make([]*sourcepb.AckResponse, len(reqs)) - for i := 0; i < len(reqs); i++ { - // Wait for the ack response - resp, err := c.ackStream.Recv() - // we don't need an EOF check because we only close the stream during shutdown. - if err != nil { - return nil, fmt.Errorf("failed to receive ack response: %v", err) - } - responses[i] = resp + // Wait for the ack response + resp, err := c.ackStream.Recv() + if err != nil { + return nil, fmt.Errorf("failed to receive ack response: %v", err) } - - return responses, nil + return resp, nil } // PendingFn returns the number of pending data from the source. diff --git a/pkg/sdkclient/source/client_test.go b/pkg/sdkclient/source/client_test.go index d19e3e8737..818c3c3430 100644 --- a/pkg/sdkclient/source/client_test.go +++ b/pkg/sdkclient/source/client_test.go @@ -188,9 +188,9 @@ func TestAckFn(t *testing.T) { assert.True(t, ackHandshakeResponse.GetHandshake().GetSot()) // Test AckFn - ack, err := testClient.AckFn(ctx, []*sourcepb.AckRequest{{}}) + ack, err := testClient.AckFn(ctx, &sourcepb.AckRequest{}) assert.NoError(t, err) - assert.Equal(t, []*sourcepb.AckResponse{{}}, ack) + assert.Equal(t, &sourcepb.AckResponse{}, ack) } func TestPendingFn(t *testing.T) { diff --git a/pkg/sdkclient/source/interface.go b/pkg/sdkclient/source/interface.go index cc26f2cd95..ea897b8207 100644 --- a/pkg/sdkclient/source/interface.go +++ b/pkg/sdkclient/source/interface.go @@ -32,7 +32,7 @@ type Client interface { // ReadFn reads messages from the udsource. ReadFn(ctx context.Context, req *sourcepb.ReadRequest, datumCh chan<- *sourcepb.ReadResponse) error // AckFn acknowledges messages from the udsource. - AckFn(ctx context.Context, req []*sourcepb.AckRequest) ([]*sourcepb.AckResponse, error) + AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error) // PendingFn returns the number of pending messages from the udsource. PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error) // PartitionsFn returns the list of partitions from the udsource. diff --git a/pkg/sinks/udsink/udsink_grpc.go b/pkg/sinks/udsink/udsink_grpc.go index 80f47d2675..6b9a1a77eb 100644 --- a/pkg/sinks/udsink/udsink_grpc.go +++ b/pkg/sinks/udsink/udsink_grpc.go @@ -106,24 +106,26 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S return errs } // Use ID to map the response messages, so that there's no strict requirement for the user-defined sink to return the response in order. - resMap := make(map[string]*sinkpb.SinkResponse) + resMap := make(map[string]*sinkpb.SinkResponse_Result) for _, res := range responses { - resMap[res.Result.GetId()] = res + for _, result := range res.Results { + resMap[result.GetId()] = result + } } for i, m := range requests { if r, existing := resMap[m.Request.GetId()]; !existing { errs[i] = &NotFoundErr } else { - if r.Result.GetStatus() == sinkpb.Status_FAILURE { - if r.Result.GetErrMsg() != "" { + if r.GetStatus() == sinkpb.Status_FAILURE { + if r.GetErrMsg() != "" { errs[i] = &ApplyUDSinkErr{ UserUDSinkErr: true, - Message: r.Result.GetErrMsg(), + Message: r.GetErrMsg(), } } else { errs[i] = &UnknownUDSinkErr } - } else if r.Result.GetStatus() == sinkpb.Status_FALLBACK { + } else if r.GetStatus() == sinkpb.Status_FALLBACK { errs[i] = &WriteToFallbackErr } else { errs[i] = nil diff --git a/pkg/sinks/udsink/udsink_grpc_test.go b/pkg/sinks/udsink/udsink_grpc_test.go index 2af72530fc..a3733b6c6f 100644 --- a/pkg/sinks/udsink/udsink_grpc_test.go +++ b/pkg/sinks/udsink/udsink_grpc_test.go @@ -99,10 +99,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) { mockSinkClient := sinkmock.NewMockSink_SinkFnClient(ctrl) mockSinkClient.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes() mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{ - Result: testResponseList[0], - }, nil) - mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{ - Result: testResponseList[1], + Results: testResponseList, }, nil) mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{Status: &sinkpb.TransmissionStatus{ Eot: true, diff --git a/pkg/sources/udsource/grpc_udsource.go b/pkg/sources/udsource/grpc_udsource.go index 8d0389a2ee..32efdbc51d 100644 --- a/pkg/sources/udsource/grpc_udsource.go +++ b/pkg/sources/udsource/grpc_udsource.go @@ -175,16 +175,12 @@ func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset for i, offset := range offsets { rOffsets[i] = ConvertToUserDefinedSourceOffset(offset) } - ackRequests := make([]*sourcepb.AckRequest, len(rOffsets)) - for i, offset := range rOffsets { - var r = &sourcepb.AckRequest{ - Request: &sourcepb.AckRequest_Request{ - Offset: offset, - }, - } - ackRequests[i] = r + var ackRequest = &sourcepb.AckRequest{ + Request: &sourcepb.AckRequest_Request{ + Offsets: rOffsets, + }, } - _, err := u.client.AckFn(ctx, ackRequests) + _, err := u.client.AckFn(ctx, ackRequest) return err } diff --git a/pkg/sources/udsource/grpc_udsource_test.go b/pkg/sources/udsource/grpc_udsource_test.go index e0a0ab4ca5..4d3d78f919 100644 --- a/pkg/sources/udsource/grpc_udsource_test.go +++ b/pkg/sources/udsource/grpc_udsource_test.go @@ -274,21 +274,14 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) { mockClient.EXPECT().ReadFn(gomock.Any(), gomock.Any()).Return(nil, nil) mockClient.EXPECT().AckFn(gomock.Any(), gomock.Any()).Return(mockAckClient, nil) - req1 := &sourcepb.AckRequest{ - Request: &sourcepb.AckRequest_Request{ - Offset: offset1, - }, - } - - req2 := &sourcepb.AckRequest{ + req := &sourcepb.AckRequest{ Request: &sourcepb.AckRequest_Request{ - Offset: offset2, + Offsets: []*sourcepb.Offset{offset1, offset2}, }, } - mockAckClient.EXPECT().Send(req1).Return(nil).Times(1) - mockAckClient.EXPECT().Send(req2).Return(nil).Times(1) - mockAckClient.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil).Times(2) + mockAckClient.EXPECT().Send(req).Return(nil).Times(1) + mockAckClient.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil).Times(1) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -321,7 +314,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) { req1 := &sourcepb.AckRequest{ Request: &sourcepb.AckRequest_Request{ - Offset: offset1, + Offsets: []*sourcepb.Offset{offset1, offset2}, }, } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c069cd81f0..ee664937a7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1614,7 +1614,7 @@ dependencies = [ [[package]] name = "numaflow" version = "0.1.1" -source = "git+https://github.com/numaproj/numaflow-rs.git?rev=9fb3c0ad0f5f43cc42b4919f849b7dcce9a91387#9fb3c0ad0f5f43cc42b4919f849b7dcce9a91387" +source = "git+https://github.com/numaproj/numaflow-rs.git?rev=ddd879588e11455921f1ca958ea2b3c076689293#ddd879588e11455921f1ca958ea2b3c076689293" dependencies = [ "chrono", "futures-util", @@ -1648,7 +1648,7 @@ dependencies = [ "hyper-util", "kube", "log", - "numaflow 0.1.1 (git+https://github.com/numaproj/numaflow-rs.git?rev=9fb3c0ad0f5f43cc42b4919f849b7dcce9a91387)", + "numaflow 0.1.1 (git+https://github.com/numaproj/numaflow-rs.git?rev=ddd879588e11455921f1ca958ea2b3c076689293)", "numaflow-models", "numaflow-pb", "parking_lot", diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index 9a7678ff67..179da28dd8 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -46,7 +46,7 @@ async-nats = "0.37.0" [dev-dependencies] tempfile = "3.11.0" -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9fb3c0ad0f5f43cc42b4919f849b7dcce9a91387" } +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "ddd879588e11455921f1ca958ea2b3c076689293" } [build-dependencies] diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index ef650a0df1..f24212967f 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -10,8 +10,8 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use numaflow_pb::clients::sink::sink_request::Request; use numaflow_pb::clients::sink::Status::{Failure, Fallback, Success}; -use numaflow_pb::clients::sink::{sink_response, SinkRequest, SinkResponse}; -use numaflow_pb::clients::source::{read_response, AckRequest}; +use numaflow_pb::clients::sink::{sink_response, SinkRequest}; +use numaflow_pb::clients::source::read_response; use numaflow_pb::clients::sourcetransformer::SourceTransformRequest; use prost::Message as ProtoMessage; use serde::{Deserialize, Serialize}; @@ -208,22 +208,17 @@ impl fmt::Display for MessageID { } } -impl TryFrom for AckRequest { +impl TryFrom for numaflow_pb::clients::source::Offset { type Error = Error; - fn try_from(value: Offset) -> std::result::Result { - match value { + fn try_from(offset: Offset) -> std::result::Result { + match offset { Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())), - Offset::String(o) => Ok(Self { - request: Some(numaflow_pb::clients::source::ack_request::Request { - offset: Some(numaflow_pb::clients::source::Offset { - offset: BASE64_STANDARD - .decode(o.offset) - .expect("we control the encoding, so this should never fail"), - partition_id: o.partition_idx as i32, - }), - }), - handshake: None, + Offset::String(o) => Ok(numaflow_pb::clients::source::Offset { + offset: BASE64_STANDARD + .decode(o.offset) + .expect("we control the encoding, so this should never fail"), + partition_id: o.partition_idx as i32, }), } } @@ -371,7 +366,7 @@ pub(crate) struct ResponseFromSink { pub(crate) status: ResponseStatusFromSink, } -impl From for SinkResponse { +impl From for sink_response::Result { fn from(value: ResponseFromSink) -> Self { let (status, err_msg) = match value.status { ResponseStatusFromSink::Success => (Success, "".to_string()), @@ -380,35 +375,24 @@ impl From for SinkResponse { }; Self { - result: Some(sink_response::Result { - id: value.id, - status: status as i32, - err_msg, - }), - handshake: None, - status: None, + id: value.id, + status: status as i32, + err_msg, } } } -impl TryFrom for ResponseFromSink { - type Error = Error; - - fn try_from(value: SinkResponse) -> Result { - let value = value - .result - .ok_or(Error::Sink("result is empty".to_string()))?; - +impl From for ResponseFromSink { + fn from(value: sink_response::Result) -> Self { let status = match value.status() { Success => ResponseStatusFromSink::Success, Failure => ResponseStatusFromSink::Failed(value.err_msg), Fallback => ResponseStatusFromSink::Fallback, }; - - Ok(Self { + Self { id: value.id, status, - }) + } } } @@ -418,6 +402,7 @@ mod tests { use chrono::TimeZone; use numaflow_pb::clients::sink::sink_response::Result as SinkResult; + use numaflow_pb::clients::sink::SinkResponse; use numaflow_pb::clients::source::Offset as SourceOffset; use numaflow_pb::objects::isb::{ Body, Header, Message as ProtoMessage, MessageId, MessageInfo, @@ -444,26 +429,6 @@ mod tests { assert_eq!(format!("{}", message_id), "vertex-123-0"); } - #[test] - fn test_offset_to_ack_request() { - let offset = Offset::String(StringOffset { - offset: BASE64_STANDARD.encode("123"), - partition_idx: 1, - }); - let ack_request: AckRequest = offset.try_into().unwrap(); - assert_eq!(ack_request.request.unwrap().offset.unwrap().partition_id, 1); - - let offset = Offset::Int(IntOffset::new(42, 1)); - let result: Result = offset.try_into(); - - // Assert that the conversion results in an error - assert!(result.is_err()); - - if let Err(e) = result { - assert_eq!(e.to_string(), "Source Error - IntOffset not supported"); - } - } - #[test] fn test_message_to_vec_u8() { let message = Message { @@ -622,28 +587,34 @@ mod tests { status: ResponseStatusFromSink::Success, }; - let sink_response: SinkResponse = response.into(); - assert_eq!(sink_response.result.unwrap().status, Success as i32); + let sink_result: sink_response::Result = response.into(); + assert_eq!(sink_result.status, Success as i32); } #[test] fn test_sink_response_to_response_from_sink() { let sink_response = SinkResponse { - result: Some(SinkResult { + results: vec![SinkResult { id: "123".to_string(), status: Success as i32, err_msg: "".to_string(), - }), + }], handshake: None, status: None, }; - let response: Result = sink_response.try_into(); - assert!(response.is_ok()); + let results: Vec = sink_response + .results + .into_iter() + .map(Into::into) + .collect::>(); + assert!(!results.is_empty()); - let response = response.unwrap(); - assert_eq!(response.id, "123"); - assert_eq!(response.status, ResponseStatusFromSink::Success); + assert_eq!(results.get(0).unwrap().id, "123"); + assert_eq!( + results.get(0).unwrap().status, + ResponseStatusFromSink::Success + ); } #[test] @@ -692,14 +663,12 @@ mod tests { // Test conversion from Offset to AckRequest for StringOffset let offset = Offset::String(StringOffset::new(BASE64_STANDARD.encode("42"), 1)); - let result: Result = offset.try_into(); - assert!(result.is_ok()); - let ack_request = result.unwrap(); - assert_eq!(ack_request.request.unwrap().offset.unwrap().partition_id, 1); + let offset: Result = offset.try_into(); + assert_eq!(offset.unwrap().partition_id, 1); // Test conversion from Offset to AckRequest for IntOffset (should fail) let offset = Offset::Int(IntOffset::new(42, 1)); - let result: Result = offset.try_into(); + let result: Result = offset.try_into(); assert!(result.is_err()); } } diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index c81c16231c..3aaf97ab78 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -757,7 +757,7 @@ mod tests { impl source::Sourcer for SimpleSource { async fn read(&self, _: SourceReadRequest, _: Sender) {} - async fn ack(&self, _: Offset) {} + async fn ack(&self, _: Vec) {} async fn pending(&self) -> usize { 0 diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 598a3d9e83..bbdeaf3a9d 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -300,7 +300,7 @@ mod tests { impl source::Sourcer for SimpleSource { async fn read(&self, _: SourceReadRequest, _: Sender) {} - async fn ack(&self, _: Offset) {} + async fn ack(&self, _: Vec) {} async fn pending(&self) -> usize { 0 diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index 793eae4526..f84cade170 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -625,11 +625,13 @@ mod tests { .extend(message_offsets) } - async fn ack(&self, offset: Offset) { - self.yet_to_be_acked - .write() - .unwrap() - .remove(&String::from_utf8(offset.offset).unwrap()); + async fn ack(&self, offsets: Vec) { + for offset in offsets { + self.yet_to_be_acked + .write() + .unwrap() + .remove(&String::from_utf8(offset.offset).unwrap()); + } } async fn pending(&self) -> usize { diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index ffe1c06944..a9724780ea 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -328,7 +328,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] - async fn test_forwarder_for_source_vetex() { + async fn test_forwarder_for_source_vertex() { // Unique names for the streams we use in this test let streams = vec![ "default-test-forwarder-for-source-vertex-out-0", diff --git a/rust/numaflow-core/src/shared/utils.rs b/rust/numaflow-core/src/shared/utils.rs index 576f78499f..84fb5a0c3b 100644 --- a/rust/numaflow-core/src/shared/utils.rs +++ b/rust/numaflow-core/src/shared/utils.rs @@ -225,7 +225,7 @@ mod tests { impl source::Sourcer for SimpleSource { async fn read(&self, _request: SourceReadRequest, _transmitter: Sender) {} - async fn ack(&self, _offset: Offset) {} + async fn ack(&self, _offset: Vec) {} async fn pending(&self) -> usize { 0 diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index 92d05230ad..5799291eaf 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -1,3 +1,7 @@ +use crate::message::{Message, ResponseFromSink}; +use crate::sink::Sink; +use crate::Error; +use crate::Result; use numaflow_pb::clients::sink::sink_client::SinkClient; use numaflow_pb::clients::sink::{Handshake, SinkRequest, SinkResponse, TransmissionStatus}; use tokio::sync::mpsc; @@ -5,11 +9,6 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Request, Streaming}; -use crate::error; -use crate::message::{Message, ResponseFromSink}; -use crate::sink::Sink; -use crate::Error; - const DEFAULT_CHANNEL_SIZE: usize = 1000; /// User-Defined Sink code writes messages to a custom [Sink]. @@ -19,7 +18,7 @@ pub struct UserDefinedSink { } impl UserDefinedSink { - pub(crate) async fn new(mut client: SinkClient) -> error::Result { + pub(crate) async fn new(mut client: SinkClient) -> Result { let (sink_tx, sink_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let sink_stream = ReceiverStream::new(sink_rx); @@ -59,7 +58,7 @@ impl UserDefinedSink { impl Sink for UserDefinedSink { /// writes a set of messages to the sink. - async fn sink(&mut self, messages: Vec) -> error::Result> { + async fn sink(&mut self, messages: Vec) -> Result> { let requests: Vec = messages.into_iter().map(|message| message.into()).collect(); let num_requests = requests.len(); @@ -88,7 +87,7 @@ impl Sink for UserDefinedSink { // response only once it has read all the requests. // We wait for num_requests + 1 responses because the last response will be the EOT response. let mut responses = Vec::new(); - for i in 0..num_requests + 1 { + loop { let response = self .resp_stream .message() @@ -96,12 +95,20 @@ impl Sink for UserDefinedSink { .ok_or(Error::Sink("failed to receive response".to_string()))?; if response.status.map_or(false, |s| s.eot) { - if i != num_requests { + if responses.len() != num_requests { log::error!("received EOT message before all responses are received, we will wait indefinitely for the remaining responses"); + } else { + break; } continue; } - responses.push(response.try_into()?); + responses.extend( + response + .results + .into_iter() + .map(Into::into) + .collect::>(), + ); } Ok(responses) @@ -112,7 +119,6 @@ impl Sink for UserDefinedSink { mod tests { use chrono::offset::Utc; use numaflow::sink; - use numaflow_pb::clients::sink::sink_client::SinkClient; use tokio::sync::mpsc; use tracing::info; diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 00a3dd47a3..03162b53ac 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -185,25 +185,24 @@ impl UserDefinedSourceAck { impl SourceAcker for UserDefinedSourceAck { async fn ack(&mut self, offsets: Vec) -> Result<()> { - let n = offsets.len(); - - // send n ack requests - for offset in offsets { - let request = offset.try_into()?; - self.ack_tx - .send(request) - .await - .map_err(|e| Error::Source(e.to_string()))?; - } + let ack_offsets: Result> = + offsets.into_iter().map(TryInto::try_into).collect(); + + self.ack_tx + .send(AckRequest { + request: Some(source::ack_request::Request { + offsets: ack_offsets?, + }), + handshake: None, + }) + .await + .map_err(|e| Error::Source(e.to_string()))?; - // make sure we get n responses for the n requests. - for _ in 0..n { - let _ = self - .ack_resp_stream - .message() - .await? - .ok_or(Error::Source("failed to receive ack response".to_string()))?; - } + let _ = self + .ack_resp_stream + .message() + .await? + .ok_or(Error::Source("failed to receive ack response".to_string()))?; Ok(()) } @@ -284,11 +283,13 @@ mod tests { self.yet_to_ack.write().unwrap().extend(message_offsets) } - async fn ack(&self, offset: Offset) { - self.yet_to_ack - .write() - .unwrap() - .remove(&String::from_utf8(offset.offset).unwrap()); + async fn ack(&self, offsets: Vec) { + for offset in offsets { + self.yet_to_ack + .write() + .unwrap() + .remove(&String::from_utf8(offset.offset).unwrap()); + } } async fn pending(&self) -> usize { diff --git a/rust/numaflow-pb/src/clients/sink.v1.rs b/rust/numaflow-pb/src/clients/sink.v1.rs index 612e5693c3..3fd8289d10 100644 --- a/rust/numaflow-pb/src/clients/sink.v1.rs +++ b/rust/numaflow-pb/src/clients/sink.v1.rs @@ -61,8 +61,8 @@ pub struct TransmissionStatus { /// SinkResponse is the individual response of each message written to the sink. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SinkResponse { - #[prost(message, optional, tag = "1")] - pub result: ::core::option::Option, + #[prost(message, repeated, tag = "1")] + pub results: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "2")] pub handshake: ::core::option::Option, #[prost(message, optional, tag = "3")] diff --git a/rust/numaflow-pb/src/clients/source.v1.rs b/rust/numaflow-pb/src/clients/source.v1.rs index f60a48315c..1b96a0e77f 100644 --- a/rust/numaflow-pb/src/clients/source.v1.rs +++ b/rust/numaflow-pb/src/clients/source.v1.rs @@ -179,8 +179,8 @@ pub mod ack_request { #[derive(Clone, PartialEq, ::prost::Message)] pub struct Request { /// Required field holding the offset to be acked - #[prost(message, optional, tag = "1")] - pub offset: ::core::option::Option, + #[prost(message, repeated, tag = "1")] + pub offsets: ::prost::alloc::vec::Vec, } } ///