Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ask clarification about streaming implementation for rpc service #784

Open
sulliwane opened this issue May 5, 2017 · 14 comments
Open

Ask clarification about streaming implementation for rpc service #784

sulliwane opened this issue May 5, 2017 · 14 comments

Comments

@sulliwane
Copy link

sulliwane commented May 5, 2017

protobuf.js version: 6.7.1

I'm trying to implement streaming RPC function, to create services. But looking at the streaming example, I don't understand how I can emit these events:

greeter.on("data", function(response, method) { // data function
    console.log("data in " + method.name + ":", response.message);
});

greeter.on("end", function() { // end function
    console.log("end");
});

greeter.on("error", function(err, method) { // error function
    console.log("error in " + method.name + ":", err);
});

greeter.on("status", function(code, text) { // status function
    console.log("custom status:", code, text);
});

from the rpcImpl function:

// I only have a callback function. It's OK for unary method, but how to use for streaming method (data, end, error, status events)
var greeter = Greeter.create(function myRPCImpl(method, requestData, callback) { 
        responseData = fetch();
        callback(null, responseData);
    };
});

My guess is:

  • data event: multiple call to callback(null, data).
  • end event: calling callback(null, null).
  • error event: calling callback(error).
  • status event: ???

If that's correct, does it also mean that rpcImpl will automatically fire events when receiving a streaming method, and fire a promise when receiving a unary method? Or fire both whatever the method?

Thank you for the clarification!

@dcodeIO
Copy link
Member

dcodeIO commented May 5, 2017

rpcImpl is just the low level method that implements a service request on the network level. It doesn't emit any events but solely uses a callback.

greeter is an instances of rpc.Service here (uses rpcImpl and extends an event emitter). That's where the events come from. MyService.create instantiates an rpc.Service.

The status event is a custom event present in this specific example only, to, well, demonstrate that one can also emit own events.

@jnordberg
Copy link

I just came here to create a similar issue (hi @dcodeIO 👋 😄)

I'm also confused by the streaming API. Shouldn't the rpc.Service do something different for streaming methods versus regular ones? Looks to me that a service can only have one method that is streaming and you can call the method only once before having to create a new rpc.Service instance.

Also I don't think the current implementation will work at all with promises, since once resolved the resolve callback is a no-op.

Something like this would make more sense to me:

let stream = myService.streamingMethod()
stream.on('data', (obj) => {
   console.log(obj) // decoded message
})
stream.on('end', () => console.log('ended'))
stream.on('error', (error) => throw error)

Could be done with a simple pseudo-stream interface (like a event emitter with write/end methods) or by using the readable-stream package which is fairly lightweight and very well tested.

@dcodeIO
Copy link
Member

dcodeIO commented May 5, 2017

Nice to meet you again ;)

I'm also confused by the streaming API. Shouldn't the rpc.Service do something different for streaming methods versus regular ones? Looks to me that a service can only have one method that is streaming and you can call the method only once before having to create a new rpc.Service instance.

Individual methods do not return a stream, but the service instance's data event also references the method a message belongs to:

greeter.on("data", function(response, method) {
    console.log("data in " + method.name + ":", response.message);
});

Likewise, rpcImpl also knows which method a request is coming from: function myRPCImpl(method, requestData, callback).

This is what happens:

  1. Call a streaming method
  2. rpcImpl is called with method and the request message and sends it to the server (can be anything, but most likely over a websocket or something like that)
  3. Server processes the request and responds
  4. rpcImpl receives the response from the server and calls its callback
  5. Service instance emits a data event with method and response message

Additionally, when rpcImpl's callback is called with a null message, the service instance assumes that the stream has ended and emits the end event.

This should also work for multiple methods. I must admit, though, that the current implementation is built around the desire of not adding another dependency besides a minimal event emitter and it might not be the greatest API of all time to work with.

Edit: I believe that ideally, protobuf.js itself wouldn't offer a service wrapper on its own. An additional package doing solely services might be better suited, and could contain more dependencies because it wouldn't affect the size of the core library.

@jnordberg
Copy link

Hmm, I still don't see how the API allows for a proper implementation of streams. Take this example:

service ImageService {
    rpc GetImage (ImageRequest) returns (stream ImageChunk) {}
}

message ImageRequest {
    string name = 1;
}

message ImageChunk {
    bytes data = 1;
}
let service = ImageService(function myRPCImpl(method, request, callback) { 
    let image = sometransport.createReadStream(request.name)
    image.on('data', (chunk) => { callback(null, {chunk}) })
    image.on('end', () => { callback(null) })
    image.on('error', (error) => { callback(error) })
})

let data = await myService.getImage({name: 'bar.jpg'}) // here I have only the first chunk

myService.on('data', (chunk, method) => {
   // here I don't know which image I asked so if I sent multiple getImage requests the chunks would be mixed up
})

// this would throw since myService.rpcImpl is set to null after a stream has finished
// but imagining I create a new service instance for every call I want to make this works:
myService.getImage({name: 'bar.jpg'}, (error, chunk) => {
   // even though calling a callback multiple times is considered bad practice
})

IMHO the rpc.Service class needs a bit of a refactor before working with streams become possible. Unless I'm completely misunderstanding how streaming services should work :)

@dcodeIO
Copy link
Member

dcodeIO commented May 5, 2017

so if I sent multiple getImage requests the chunks would be mixed up

Ah, you are right, parallel requests using the same method don't work with the current design.

@jnordberg
Copy link

If you're interested I'd be happy to send a PR with a proposal for a better streaming API

@sulliwane
Copy link
Author

sulliwane commented May 10, 2017

this makes sense to me as well:

let stream = myService.streamingMethod(request)
stream.on('data', (obj) => {
   console.log(obj) // decoded message
})
stream.on('end', () => console.log('ended'))
stream.on('error', (error) => throw error)

I'd be happy to test/review any proposal 👍

@sulliwane
Copy link
Author

@jnordberg Did you come up with a solution ?

@jnordberg
Copy link

@sulliwane nope, just using non-streaming services for now...

@sulliwane
Copy link
Author

Would be nice to have streaming working for rpc.Service :)

Especially since protobuf.js API is so much nicer to work with than google-protobuf

@aep
Copy link

aep commented Sep 2, 2018

any chance to get this going again?

@anjmao
Copy link

anjmao commented Sep 20, 2018

Hi, is anyone working on this issue? If not I could work on this. Let me know since I need server streaming support. Current RPCImpl is suitable only for simple unary calls. It is even more clear when using with Typescript

Example:

proto

service Debug {
  rpc GetServerStream(StreamReques) returns (stream StreamReply) {}
}

typescript generated method

public getServerStream(request: debug.IStreamRequest): Promise<debug.IStreamReply>;

Instead it should return Observable object or EventEmitter as @sulliwane pointed out.

interface Stream {
    on(eventName: 'data' | 'end' | 'error', cb: (data) => void): void;
}

public getServerStream(request: debug.IStreamRequest): Stream<debug.IStreamReply>;

Also we need separate RCPStreaImp

const rcp: RPCStreamImpl = (method, requestData, stream) => {
    underlingStream.start(method.name, requestData);
    underlingStream.on('data', (data) => stream.emit('data', data));
    underlingStream.on('end', () => stream.emit('end'));
    underlingStream.on('error', (err) => stream.emit('error', err));
};

PS.

RPCImpl doesn't allow to get service and pacakge name. I'm also thinking to change method to return name, service and package names.

@anjmao
Copy link

anjmao commented Sep 26, 2018

Hi, here is PR for streaming support, let me know what do you think #1115

@christian-ehmig
Copy link

Since I don't know when #1115 will be accepted, here two workarounds, assuming you generated a protobufjs static file and wish to use a bidirectional stream:

Variant 1: inject custom RPC method definition into makeGenericClientConstructor, bidi stream

const grpc = require('grpc');
const messages = require('./pbjs-static.js').my.package; 

const MyService = exports.MyService = {  
  myCall: {
    path: '/my.package/myCall',
    requestStream: true,
    responseStream: true,
    requestType: messages.MyCallRequest,
    responseType: messages.MyCallResponse,
    requestSerialize: arg => messages.MyCallRequest.encode(arg).finish(), 
    responseDeserialize: arg => messages.MyCallResponse.decode(arg)
  }
};

const Client = grpc.makeGenericClientConstructor(MyService)
const client = new Client(
  grpcServerUrl,
  grpc.credentials.createInsecure()
)

// create bidi-stream
var stream = client.myCall();

// listen on incoming data (decoded via protobufjs)
stream.on('data', function(response){      
})

// send data (json is encoded via protobufjs)
stream.write({my: "message"});

Variant 2: directly create a bidi stream from makeBidiStreamRequest

const grpc = require('grpc');
const messages = require('./pbjs-static.js').my.package; 

const Client = grpc.makeGenericClientConstructor()
const client = new Client(
  grpcServerUrl,
  grpc.credentials.createInsecure()
)

var stream = client.makeBidiStreamRequest(
  '/my.package/myCall',      
  arg => messages.MyCallRequest.encode(arg).finish(),
  arg => messages.MyCallResponse.decode(arg)  
); 

// listen on incoming data (decoded via protobufjs)
stream.on('data', function(response){      
})

// send data (json is encoded via protobufjs)
stream.write({my: "message"});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants