Skip to content

Commit

Permalink
Merge pull request #34 from cyshi/master
Browse files Browse the repository at this point in the history
add interface to posting protobuf data
  • Loading branch information
qinzuoyan committed Nov 13, 2015
2 parents cd10b38 + c70ef1b commit a8e294f
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 79 deletions.
50 changes: 50 additions & 0 deletions python/sample/client_http_protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding:UTF-8 -*-

# Copyright (c) 2014 Baidu.com, Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This is a sample code to show how to use python client of sofa-pbrpc.
#
# Preconditions:
# * The protobuf python lib has been installed.
# * The sofa-pbrpc python lib has been installed.
# * The server in ../../sample/echo has been started.
#
# For more, please refer to `./README'.

from sofa.pbrpc import client
import echo_service_pb2
import sys
import urllib2

# Prepare post data
echo_request = echo_service_pb2.EchoRequest()
echo_request.message = 'Hello World'
send_data = echo_request.SerializeToString()

# Prepare http request
url = 'http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo'
accept = 'application/protobuf'
headers = { 'Accept' : accept }
request = urllib2.Request(url, send_data, headers)

# Send request
try:
response = urllib2.urlopen(request)
except Exception as e:
print "ERROR: Send fail: %s" % e.reason
sys.exit(1)

# Read http body
recv_data = response.read()

# check failure
if response.info().getheader('Content-Type') != accept:
print "ERROR: %s" % recv_data
sys.exit(1)

# print response
echo_response = echo_service_pb2.EchoResponse()
echo_response.ParseFromString(recv_data)
print "Response:\n%s" % echo_response.message
152 changes: 84 additions & 68 deletions src/sofa/pbrpc/http_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,37 +123,6 @@ void HTTPRpcRequest::ProcessRequest(
}
}

std::string json_str;
if (_type == POST)
{
json_str = _req_body->ToString();
}
else
{
json_str = _query_params["request"];
}
if (json_str.empty())
{
// if null json str, set as null object
json_str = "{}";
}

std::string err;
_req_json = ParseJson(json_str.c_str(), err);
if (_req_json == NULL)
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << SequenceId() << "}: parse json failed: " << err;
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: parse json failed: %s",
RpcEndpointToString(_remote_endpoint).c_str(), SequenceId(), err.c_str());
#endif
SendFailedResponse(server_stream,
RPC_ERROR_PARSE_REQUEST_MESSAGE, "parse json failed: " + err);
return;
}

MethodBoard* method_board = FindMethodBoard(service_pool, service_name, method_name);
if (method_board == NULL)
{
Expand All @@ -173,18 +142,57 @@ void HTTPRpcRequest::ProcessRequest(
const google::protobuf::MethodDescriptor* method_desc = method_board->Descriptor();

google::protobuf::Message* request = service->GetRequestPrototype(method_desc).New();
if (jsonobject2pb(_req_json, request, err) < 0)
if (_type == POST_PB)
{
bool parse_request_return = request->ParseFromZeroCopyStream(_req_body.get());
if (!parse_request_return)
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << SequenceId() << "}: parse json to pb failed: " << err;
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << SequenceId() << "}: parse pb body failed";
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: parse json to pb failed: %s",
RpcEndpointToString(_remote_endpoint).c_str(), SequenceId(), err.c_str());
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: parse pb body failed",
RpcEndpointToString(_remote_endpoint).c_str(), SequenceId());
#endif
SendFailedResponse(server_stream,
RPC_ERROR_PARSE_REQUEST_MESSAGE, "parse json to pb failed: " + err);
return;
SendFailedResponse(server_stream,
RPC_ERROR_PARSE_REQUEST_MESSAGE, "parse pb body failed");
delete request;
return;
}
}
else
{
std::string json_str;
if (_type == POST)
{
json_str = _req_body->ToString();
}
else
{
json_str = _query_params["request"];
}
if (json_str.empty())
{
// if null json str, set as null object
json_str = "{}";
}

std::string err;
_req_json = ParseJson(json_str.c_str(), err);
if (_req_json == NULL || jsonobject2pb(_req_json, request, err) < 0)
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << SequenceId() << "}: parse json failed: " << err;
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: parse json failed: %s",
RpcEndpointToString(_remote_endpoint).c_str(), SequenceId(), err.c_str());
#endif
SendFailedResponse(server_stream,
RPC_ERROR_PARSE_REQUEST_MESSAGE, "parse json failed: " + err);
delete request;
return;
}
}

google::protobuf::Message* response = service->GetResponsePrototype(method_desc).New();
Expand All @@ -208,14 +216,24 @@ ReadBufferPtr HTTPRpcRequest::AssembleSucceedResponse(
const google::protobuf::Message* response,
std::string& err)
{
std::string json_str;
pb2json(response, json_str);

WriteBuffer write_buffer;
if (!RenderJsonResponse(&write_buffer, json_str))
if (_type == POST_PB)
{
err = "render json response failed";
return ReadBufferPtr();
if (!RenderResponse(&write_buffer, PROTOBUF, response->SerializeAsString()))
{
err = "render protobuf response failed";
return ReadBufferPtr();
}
}
else
{
std::string json_str;
pb2json(response, json_str);
if (!RenderResponse(&write_buffer, JSON, json_str))
{
err = "render json response failed";
return ReadBufferPtr();
}
}

ReadBufferPtr read_buffer(new ReadBuffer());
Expand All @@ -233,7 +251,7 @@ ReadBufferPtr HTTPRpcRequest::AssembleFailedResponse(
<< StringUtils::replace_all(reason, "\"", "\\\"") << "\"";

WriteBuffer write_buffer;
if (!RenderJsonResponse(&write_buffer, oss.str()))
if (!RenderResponse(&write_buffer, JSON, oss.str()))
{
err = "render json response failed";
return ReadBufferPtr();
Expand Down Expand Up @@ -385,7 +403,7 @@ void HTTPRpcRequest::SendPage(
const std::string& page)
{
WriteBuffer write_buffer;
if (!RenderHtmlResponse(&write_buffer, page))
if (!RenderResponse(&write_buffer, HTML, page))
{
#if defined( LOG )
LOG(ERROR) << "SendPage(): " << RpcEndpointToString(_remote_endpoint)
Expand Down Expand Up @@ -415,35 +433,33 @@ void HTTPRpcRequest::SendError(
SendPage(server_stream, oss.str());
}

bool HTTPRpcRequest::RenderJsonResponse(
bool HTTPRpcRequest::RenderResponse(
google::protobuf::io::ZeroCopyOutputStream* output,
const std::string& json)
const RenderType type,
const std::string& body)
{
std::ostringstream oss;
oss << json.size();
oss << body.size();
google::protobuf::io::Printer printer(output, '$');
printer.Print("HTTP/1.1 200 OK\r\n");
printer.Print("Content-Type: application/json\r\n");
printer.Print("Access-Control-Allow-Origin: *\r\n");
printer.Print("Content-Length: $LENGTH$\r\n", "LENGTH", oss.str());
printer.Print("\r\n");
printer.PrintRaw(json);
return !printer.failed();
}

bool HTTPRpcRequest::RenderHtmlResponse(
google::protobuf::io::ZeroCopyOutputStream* output,
const std::string& html)
{
std::ostringstream oss;
oss << html.size();
google::protobuf::io::Printer printer(output, '$');
printer.Print("HTTP/1.1 200 OK\r\n");
printer.Print("Content-Type: text/html; charset=UTF-8\r\n");
switch (type)
{
case JSON:
printer.Print("Content-Type: application/json\r\n");
break;
case PROTOBUF:
printer.Print("Content-Type: application/protobuf\r\n");
break;
case HTML:
printer.Print("Content-Type: text/html; charset=UTF-8\r\n");
break;
default:
break;
}
printer.Print("Access-Control-Allow-Origin: *\r\n");
printer.Print("Content-Length: $LENGTH$\r\n", "LENGTH", oss.str());
printer.Print("\r\n");
printer.PrintRaw(html);
printer.PrintRaw(body);
return !printer.failed();
}

Expand Down
20 changes: 13 additions & 7 deletions src/sofa/pbrpc/http_rpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ class HTTPRpcRequest : public RpcRequest
const std::string& reason,
std::string& err);

private:
enum RenderType
{
JSON = 1,
PROTOBUF = 2,
HTML = 3
};

private:
// Parse http path.
// @return false if parse failed.
Expand All @@ -64,13 +72,10 @@ class HTTPRpcRequest : public RpcRequest
const RpcServerStreamWPtr& server_stream,
const std::string& error);

static bool RenderJsonResponse(
google::protobuf::io::ZeroCopyOutputStream* output,
const std::string& json);

static bool RenderHtmlResponse(
static bool RenderResponse(
google::protobuf::io::ZeroCopyOutputStream* output,
const std::string& html);
const RenderType type,
const std::string& body);

static rapidjson::Document* ParseJson(
const char* str,
Expand Down Expand Up @@ -106,7 +111,8 @@ class HTTPRpcRequest : public RpcRequest
enum Type
{
GET = 0,
POST = 1
POST = 1,
POST_PB = 2
};
Type _type;
std::string _path;
Expand Down
16 changes: 15 additions & 1 deletion src/sofa/pbrpc/http_rpc_request_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
namespace sofa {
namespace pbrpc {

const std::string HTTPRpcRequestParser::CONTENT_LENGTH = "Content-Length";
const std::string HTTPRpcRequestParser::ACCEPT = "Accept";
const std::string HTTPRpcRequestParser::ACCEPT_PROTOBUF = "application/protobuf";

HTTPRpcRequestParser::HTTPRpcRequestParser() :
_state(PS_METHOD),
_content_length(0),
Expand Down Expand Up @@ -199,7 +203,7 @@ int HTTPRpcRequestParser::ParseInternal(char c, std::string& err)
if (c == '\n')
{
std::map<std::string, std::string>::const_iterator it =
_req->_headers.find("Content-Length");
_req->_headers.find(CONTENT_LENGTH);
if (it != _req->_headers.end())
{
char* endptr = NULL;
Expand All @@ -210,6 +214,16 @@ int HTTPRpcRequestParser::ParseInternal(char c, std::string& err)
return -1;
}
}

it = _req->_headers.find(ACCEPT);
if (it != _req->_headers.end())
{
if (it->second == ACCEPT_PROTOBUF)
{
_req->_type = HTTPRpcRequest::POST_PB;
}
}

_state = PS_BODY;
return 1;
}
Expand Down
3 changes: 3 additions & 0 deletions src/sofa/pbrpc/http_rpc_request_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class HTTPRpcRequestParser : public RpcRequestParser
std::string _header_value; // currrent parsing header value
int64 _content_length; // body content length
HTTPRpcRequestPtr _req;
static const std::string CONTENT_LENGTH;
static const std::string ACCEPT;
static const std::string ACCEPT_PROTOBUF;
}; // class HTTPRpcRequestParser

} // namespace pbrpc
Expand Down
2 changes: 1 addition & 1 deletion test/perf_test/test_delay.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash

#############################################
HOST=$HOSTNAME
Expand Down
2 changes: 1 addition & 1 deletion test/perf_test/test_multi_server.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash

#############################################
HOST=$HOSTNAME
Expand Down
2 changes: 1 addition & 1 deletion test/perf_test/test_qps.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash

#############################################
HOST=$HOSTNAME
Expand Down

0 comments on commit a8e294f

Please sign in to comment.