Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: RPC server, client, bidirectional streaming #1115

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

anjmao
Copy link

@anjmao anjmao commented Sep 26, 2018

This PR will add possibility to implement server, client, bidirectional streams.

Server stream:

serverStream = () => {
    const { grpcService } = this.props;
    const stream = grpcService.serverStream({ message: `Ping ${utcNow()}` });
    stream.on('recv', (data) => {
        console.log('data', data);
    });
    stream.on('error', (err) => {
        console.log('error', err);
    });
    stream.on('end', () => {
        console.log('end');
    });
}

Client stream:

clientStream = () => {
    const { grpcService } = this.props;
    const stream = grpcService.clientStream();
    stream.emit('send', { message: `Ping ${utcNow()}` });
    stream.on('error', (err) => {
        console.log('error', err);
    });
    stream.on('end', () => {
        console.log('end');
    });
}

Bidirectional stream:

bidiStream = () => {
    const { grpcService } = this.props;
    const stream = grpcService.bidiStream();
    stream.on('recv', (data) => {
        console.log('recv data', data);
        // let's acknowledge message
        stream.emit('send', { message: 'OK'});
    });

    stream.on('error', (err) => {
        console.log('error', err);
    });
    stream.on('end', () => {
        console.log('end');
    });

    setTimeout(() => {
        console.log('close bidi stream');
        stream.emit('close');
    }, 5000);
}

It is possible to pass new RPCHandler implementation.

createService<T extends rpc.Service>(name: string, service: typeof rpc.Service): T {
    const rpcHandler: RPCHandler = {
        unaryCall: this.createRcpUnaryImpl(name),
        serverStreamCall: this.createServerStreamImpl(name),
        clientStreamCall: this.createClientStreamImpl(name),
        bidiStreamCall: this.createBidiStreamImpl(name)
    };
    return new service(rpcHandler) as T;
}

Some examples from my actual implementation (React Native + SwiftGrpc) using underlying EvenEmitter.

// Unary implementation returns RPCUnaryCall type
createRcpUnaryImpl(serviceName: string): RPCUnaryCall {
    return (method, requestData, callback) => {
        if (!requestData) {
            return;
        }
        const methodName = `/${serviceName}/${method.name}`;
        const reqBase64String = Buffer.from(requestData).toString('base64');
        this.grpcNative.makeUnaryCall(methodName, reqBase64String).then((rspBase64Sring) => {
            const rspBytes = Buffer.from(rspBase64Sring, 'base64');
            callback(null, rspBytes);
        }, (err) => {
            callback(err, null);
        });
    };
}

// Streaming implementation should return RPCServerStreamCall, RPCClientStreamCall or RPCBidiStreamCall
createServerStreamImpl(serviceName: string): RPCServerStreamCall {
    return (method, requestData, decodeFn) => {
        const methodName = `/${serviceName}/${method.name}`;
        let h = this._streamingHandlers.get(methodName);
        if (h) {
            return h.emitter;
        }
        h = { emitter: new util.EventEmitter(), decodeFn, type: StreamType.Server };
        this._streamingHandlers.set(methodName, h);

        const reqBase64String = Buffer.from(requestData).toString('base64');
        this.grpcNative.makeServerStreamCall(methodName, reqBase64String).catch((err) => {
            h.emitter.emit('error', err);
            h.emitter.emit('end', true);
        });
        return h.emitter;
    };
}

// client, bidi examples comming soon

@dcodeIO
Copy link
Member

dcodeIO commented Sep 27, 2018

Looking good to me from a first glimpse, I like how straight forward this is and that it doesn't add too much code to the library :). Still, would love to have some opinions by people who'd like to use streaming.

@anjmao
Copy link
Author

anjmao commented Oct 1, 2018

If somebody is interested I created working example with React Native + SwiftGrpc streaming https://github.com/trackforce/react-native-grpc-starter using my fork Protobuf.js version.

@alexstrat
Copy link

alexstrat commented Nov 14, 2018

Adding my 2 cents here after bumping into the same questions reported in #784

@anjmao's suggested API is definitely an improvement.

To be familiar with existing interfaces and to match a probable underlying wire API, I'd rather see an API that is closer to nodejs's Stream — or even use directly Stream's interfaces (in object mode).

// server stream
serverStream = () => stream.Readable<ServerStreamMessage>

const stream = service.serverStream();
stream.on('data', (serverStreamMessage) => {..})
stream.on('end', () => {})


// bi-directional stream
bidiStream = () => stream.Duplex<ServerStreamMessage, ClientStreamMessage>

const stream = service.bidiStream();
stream.on('data', (serverStreamMessage) => {..})
stream.on('end', () => {})
stream.write({ message: 'foo'})
stream.end()

Alternatively, we could use APIs that are closer to web's streams but i know less these APIs.

export interface RPCServerStream<T> {
on(evt: 'recv', fn: (data: T) => any, ctx?: any): util.EventEmitter;
on(evt: 'error', fn: (data: any) => any, ctx?: any): util.EventEmitter;
on(evt: 'end', fn: (data: any) => any, ctx?: any): util.EventEmitter;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a client end a server stream before the stream is actually ended by the server itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, client which sends end event is indicating to server that it don't want to continue.

@anjmao
Copy link
Author

anjmao commented Nov 27, 2018

@alexstrat Ideally it would be great to have Duplex, Readable, Writable interfaces from node.js, but the problem I see is that it would be hard to use it for browsers and mobile (react native, native script) since stream module works only on node.js (correct me if I'm wrong).

@cameronbraid
Copy link

cameronbraid commented Sep 2, 2019

I discovered this PR when looking for a soloution to implement a typescript async client for grpc

The protobuf.js generated api is really nice, and this PR adds stream support - which is fantastic.

I've built a prototype that uses your fork https://github.com/trackforce/protobuf.js#rpc-streaming and implements a $protobuf.RPCHandler using a grpc.Client directly. I've only implemented unaryCall and serverStreamCall but the other two should be trivial.

Here's how it looks :

import * as $protobuf from "protobufjs";
import { Client, ClientReadableStream } from "grpc";

interface StreamHandler {
    emitter: $protobuf.util.EventEmitter;
    decodeFn: (rsp: Uint8Array) => any;
    method: string;
}

export function grpcImpl(serviceName: string, client : Client) : $protobuf.RPCHandler {
  
  let _streamHandlers = new Map()
  function _removeStreamHandler(h: StreamHandler) {
    _streamHandlers.delete(h.method);
  }
  
  function _emitStreamError(h: StreamHandler, err: any) {
    h.emitter.emit('error', err);
  }
  function _emitStreamRecv(h: StreamHandler, data: any) {
    h.emitter.emit('recv', data);
  }
  
  function _emitStreamEnd(h: StreamHandler) {
    h.emitter.emit('end', true);
  }

  function _methodName(method : $protobuf.Method|$protobuf.rpc.ServiceMethod<$protobuf.Message<{}>, $protobuf.Message<{}>>) {
    return `/${serviceName}/${method.name}`;
  }
  
  return {
  
    unaryCall: (method: ($protobuf.Method|$protobuf.rpc.ServiceMethod<$protobuf.Message<{}>, $protobuf.Message<{}>>), requestData: Uint8Array, callback: $protobuf.RPCImplCallback) => {      
      client.makeUnaryRequest(_methodName(method), s=>Buffer.from(s), s=>s, requestData, null, null, callback)
    },
  
    serverStreamCall : (method: ($protobuf.Method|$protobuf.rpc.ServiceMethod<$protobuf.Message<{}>, $protobuf.Message<{}>>), requestData: Uint8Array, decodeFn: (responseData: Uint8Array) => protobuf.Message) => {
      
      const methodName = _methodName(method)

      let h = _streamHandlers.get(methodName);
      if (h) {
          return h.emitter;
      }
  
      h = { emitter: new $protobuf.util.EventEmitter(), decodeFn, method: methodName };
      _streamHandlers.set(methodName, h);
  
      let stream : ClientReadableStream<any> = client.makeServerStreamRequest(methodName, s=>Buffer.from(s), s=>s, requestData, null, null)
      stream.on("data", buffer=>{
        _emitStreamRecv(h,decodeFn(buffer))
      })
      stream.on("error", err=>{
        _emitStreamError(h, err);
        _removeStreamHandler(h);
      })
      stream.on("end", data=>{
        _emitStreamEnd(h);
        _removeStreamHandler(h);
      })
  
      h.emitter.on('close', () => {
        stream.cancel()
      });
  
      return h.emitter;
    },
  
    clientStreamCall: (method: ($protobuf.Method|$protobuf.rpc.ServiceMethod<$protobuf.Message<{}>, $protobuf.Message<{}>>), encodeFn: (requestData: any) => Uint8Array, decodeFn: (responseData: Uint8Array) => $protobuf.Message) => {
      if (true) throw Error("NOT YET IMPLEMNETED")
    },
  
    bidiStreamCall: (method: ($protobuf.Method|$protobuf.rpc.ServiceMethod<$protobuf.Message<{}>, $protobuf.Message<{}>>), encodeFn: (requestData: any) => Uint8Array, decodeFn: (responseData: Uint8Array) => $protobuf.Message) => {
      if (true) throw Error("NOT YET IMPLEMNETED")
    },
    
  }
}
  

Its pretty straight forward since its based on https://github.com/trackforce/react-native-grpc-starter/blob/master/mobile/src/services/grpc/grpcCore.ts

To use you can do the following :

async function main() {
  const client = new grpc.Client("localhost:50051", grpc.credentials.createInsecure()) 
  const serviceImpl = demo.Demo.create(grpcImpl("demo.Demo", client) as any);
  console.log("received", await serviceImpl.ping(demo.PingRequest.create({})))

  let resStream : $protobuf.RPCServerStream<demo.StartStreamMessage> = serviceImpl.startStream(demo.Empty.create())
  resStream.on("recv", (msg:demo.StreamMessage) => {
    console.log("received stream", msg)
  })
  resStream.on("end", () => {
    console.log("stream ended")
  })
  resStream.on("error", err =>{
    console.log("stream errored", err)
  })  

}
main()

Is there any chance of this PR and an addition like the above being accepted into protobuf.js ?

@cameronbraid
Copy link

cameronbraid commented Sep 2, 2019

One issue I found with the PR is the typescript signiture of rpc.Service.create, and constructor needs to be changed to accept rpcImpl: $protobuf.RPCImpl | $protobuf.RPCHandler

@cameronbraid
Copy link

Or another option could be to change RPCImpl as per : type RPCImpl = RPCUnaryCall | RPCHandler;

@anjmao
Copy link
Author

anjmao commented Sep 2, 2019

@cameronbraid Are you using nodejs grpc client? In such case I would rather stick to it since it already generates streaming method stubs for you :)

@awildeep
Copy link

awildeep commented Oct 9, 2019

Is there any traction on this?

@kincaidoneil
Copy link

We've been using this PR to add types to @grpc/grpc-js and it's worked pretty well, save for some minor issues (thanks for your work @anjmao!). I'd echo some of the comments @cameronbraid made: we had to change RpcImpl to $protobuf.RPCImpl | $protobuf.RPCHandler, and had manually change some other types to make them compatible with grpc-js.

For reference, here's some links to our code:

@xuio
Copy link

xuio commented Feb 25, 2021

@anjmao @dcodeIO any update on this? What would it take to get this merged? I am willing to help.

@achingbrain
Copy link

It would be great if this could get merged. I've had to roll my own version of this for https://github.com/ipfs/js-ipfs but I'd much rather support was in protobuf.js for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants