Skip to content

Commit

Permalink
Prepare for development of new http client API.
Browse files Browse the repository at this point in the history
Copy the old client to the new “test_client” package and update affected unit-tests. There is no functional change to the existing API or interfaces.
  • Loading branch information
jeremiahmao authored and Jeremiah Mao committed Jan 12, 2022
1 parent 9aed1a6 commit 78e94e9
Show file tree
Hide file tree
Showing 13 changed files with 705 additions and 60 deletions.
27 changes: 27 additions & 0 deletions tensorflow_serving/util/net_http/client/test_client/internal/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Description: a lightweight http client

package(
default_visibility = [
"//tensorflow_serving/util/net_http:__subpackages__",
],
)

licenses(["notice"])

cc_library(
name = "evhttp_client",
srcs = [
"evhttp_connection.cc",
],
hdrs = [
"evhttp_connection.h",
],
deps = [
"//tensorflow_serving/util/net_http/client/test_client/public:http_client_api",
"//tensorflow_serving/util/net_http/internal:net_logging",
"//tensorflow_serving/util/net_http/server/public:http_server_api",
"@com_github_libevent_libevent//:libevent",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The client library is still under development, and has yet to be finalized.

It should be primarily used for writing tests for users of the
ServerRequestInterface and HTTPServerInterface APIs to verify basic
functionality, and the current state should be considered experimental.
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/* Copyright 2018 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// libevent based client implementation

#include "tensorflow_serving/util/net_http/client/test_client/internal/evhttp_connection.h"

#include "absl/strings/str_cat.h"
#include "tensorflow_serving/util/net_http/internal/net_logging.h"
#include "tensorflow_serving/util/net_http/server/public/response_code_enum.h"

namespace tensorflow {
namespace serving {
namespace net_http {

TestEvHTTPConnection::~TestEvHTTPConnection() {
if (evcon_ != nullptr) {
evhttp_connection_free(evcon_);
}
if (http_uri_ != nullptr) {
evhttp_uri_free(http_uri_);
}

event_base_free(ev_base_);
}

// This needs be called with any async SendRequest()
void TestEvHTTPConnection::Terminate() {
event_base_loopexit(ev_base_, nullptr);
if (loop_exit_ != nullptr) {
loop_exit_->WaitForNotification();
}
}

std::unique_ptr<TestEvHTTPConnection> TestEvHTTPConnection::Connect(
absl::string_view url) {
std::string url_str(url.data(), url.size());
struct evhttp_uri* http_uri = evhttp_uri_parse(url_str.c_str());
if (http_uri == nullptr) {
NET_LOG(ERROR, "Failed to connect : event_base_new()");
return nullptr;
}

const char* host = evhttp_uri_get_host(http_uri);
if (host == nullptr) {
NET_LOG(ERROR, "url must have a host %.*s", static_cast<int>(url.size()),
url.data());
return nullptr;
}

int port = evhttp_uri_get_port(http_uri);
if (port == -1) {
port = 80;
}

auto result = Connect(host, port);
evhttp_uri_free(http_uri);

return result;
}

std::unique_ptr<TestEvHTTPConnection> TestEvHTTPConnection::Connect(
absl::string_view host, int port) {
std::unique_ptr<TestEvHTTPConnection> result(new TestEvHTTPConnection());

result->ev_base_ = event_base_new();
if (result->ev_base_ == nullptr) {
NET_LOG(ERROR, "Failed to connect : event_base_new()");
return nullptr;
}

// blocking call (DNS resolution)
std::string host_str(host.data(), host.size());
result->evcon_ = evhttp_connection_base_bufferevent_new(
result->ev_base_, nullptr, nullptr, host_str.c_str(),
static_cast<uint16_t>(port));
if (result->evcon_ == nullptr) {
NET_LOG(ERROR,
"Failed to connect : evhttp_connection_base_bufferevent_new()");
return nullptr;
}

evhttp_connection_set_retries(result->evcon_, 0);

// TODO(wenboz): make this an option (default to 5s)
evhttp_connection_set_timeout(result->evcon_, 5);

return result;
}

namespace {

// Copy ev response data to ClientResponse.
void PopulateResponse(evhttp_request* req, TestClientResponse* response) {
response->status =
static_cast<HTTPStatusCode>(evhttp_request_get_response_code(req));

struct evkeyvalq* headers = evhttp_request_get_input_headers(req);
struct evkeyval* header;
for (header = headers->tqh_first; header; header = header->next.tqe_next) {
response->headers.emplace_back(header->key, header->value);
}

char buffer[1024];
int nread;

while ((nread = evbuffer_remove(evhttp_request_get_input_buffer(req), buffer,
sizeof(buffer))) > 0) {
absl::StrAppend(&response->body,
absl::string_view(buffer, static_cast<size_t>(nread)));
}
}

evhttp_cmd_type GetMethodEnum(absl::string_view method, bool with_body) {
if (method.compare("GET") == 0) {
return EVHTTP_REQ_GET;
} else if (method.compare("POST") == 0) {
return EVHTTP_REQ_POST;
} else if (method.compare("HEAD") == 0) {
return EVHTTP_REQ_HEAD;
} else if (method.compare("PUT") == 0) {
return EVHTTP_REQ_PUT;
} else if (method.compare("DELETE") == 0) {
return EVHTTP_REQ_DELETE;
} else if (method.compare("OPTIONS") == 0) {
return EVHTTP_REQ_OPTIONS;
} else if (method.compare("TRACE") == 0) {
return EVHTTP_REQ_TRACE;
} else if (method.compare("CONNECT") == 0) {
return EVHTTP_REQ_CONNECT;
} else if (method.compare("PATCH") == 0) {
return EVHTTP_REQ_PATCH;
} else {
if (with_body) {
return EVHTTP_REQ_POST;
} else {
return EVHTTP_REQ_GET;
}
}
}

void ResponseDone(evhttp_request* req, void* ctx) {
TestClientResponse* response = reinterpret_cast<TestClientResponse*>(ctx);

if (req == nullptr) {
// TODO(wenboz): make this a util and check safety
int errcode = EVUTIL_SOCKET_ERROR();
NET_LOG(ERROR, "socket error = %s (%d)",
evutil_socket_error_to_string(errcode), errcode);
return;
}

PopulateResponse(req, response);

if (response->done != nullptr) {
response->done();
}
}

// Returns false if there is any error.
bool GenerateEvRequest(evhttp_connection* evcon, const TestClientRequest& request,
TestClientResponse* response) {
evhttp_request* evreq = evhttp_request_new(ResponseDone, response);
if (evreq == nullptr) {
NET_LOG(ERROR, "Failed to send request : evhttp_request_new()");
return false;
}

evkeyvalq* output_headers = evhttp_request_get_output_headers(evreq);
for (auto header : request.headers) {
std::string key(header.first.data(), header.first.size());
std::string value(header.second.data(), header.second.size());
evhttp_add_header(output_headers, key.c_str(), value.c_str());
}

evhttp_add_header(output_headers, "Connection", "close");

if (!request.body.empty()) {
evbuffer* output_buffer = evhttp_request_get_output_buffer(evreq);

std::string body(request.body.data(), request.body.size());
evbuffer_add(output_buffer, body.c_str(), request.body.size());

char length_header[16];
evutil_snprintf(length_header, sizeof(length_header) - 1, "%lu",
request.body.size());
evhttp_add_header(output_headers, "Content-Length", length_header);
}

std::string uri(request.uri_path.data(), request.uri_path.size());
int r = evhttp_make_request(
evcon, evreq, GetMethodEnum(request.method, !request.body.empty()),
uri.c_str());
if (r != 0) {
NET_LOG(ERROR, "evhttp_make_request() failed");
return false;
}

return true;
}

} // namespace

// Sends the request and has the connection closed
bool TestEvHTTPConnection::BlockingSendRequest(const TestClientRequest& request,
TestClientResponse* response) {
if (!GenerateEvRequest(evcon_, request, response)) {
NET_LOG(ERROR, "Failed to generate the ev_request");
return false;
}

// inline loop blocking
event_base_dispatch(ev_base_);
return true;
}

bool TestEvHTTPConnection::SendRequest(const TestClientRequest& request,
TestClientResponse* response) {
if (this->executor_ == nullptr) {
NET_LOG(ERROR, "EventExecutor is not configured.");
return false;
}

if (!GenerateEvRequest(evcon_, request, response)) {
NET_LOG(ERROR, "Failed to generate the ev_request");
return false;
}

executor_->Schedule([this]() {
loop_exit_.reset(new absl::Notification());
event_base_dispatch(ev_base_);
loop_exit_->Notify();
});

return true;
}

void TestEvHTTPConnection::SetExecutor(std::unique_ptr<EventExecutor> executor) {
this->executor_ = std::move(executor);
}

} // namespace net_http
} // namespace serving
} // namespace tensorflow
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/* Copyright 2018 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_SERVING_UTIL_NET_HTTP_CLIENT_TEST_CLIENT_INTERNAL_EVHTTP_CONNECTION_H_
#define TENSORFLOW_SERVING_UTIL_NET_HTTP_CLIENT_TEST_CLIENT_INTERNAL_EVHTTP_CONNECTION_H_

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"

#include "libevent/include/event2/buffer.h"
#include "libevent/include/event2/bufferevent.h"
#include "libevent/include/event2/event.h"
#include "libevent/include/event2/http.h"
#include "libevent/include/event2/keyvalq_struct.h"
#include "libevent/include/event2/util.h"

// TODO(wenboz): move EventExecutor to net_http/common
#include "tensorflow_serving/util/net_http/client/test_client/public/httpclient_interface.h"
#include "tensorflow_serving/util/net_http/server/public/httpserver_interface.h"

namespace tensorflow {
namespace serving {
namespace net_http {

// The following types may be moved to an API interface in future.

class TestEvHTTPConnection final : public TestHTTPClientInterface {
public:
TestEvHTTPConnection() = default;

~TestEvHTTPConnection() override;

TestEvHTTPConnection(const TestEvHTTPConnection& other) = delete;
TestEvHTTPConnection& operator=(const TestEvHTTPConnection& other) = delete;

// Terminates the connection.
void Terminate() override;

// Returns a new connection given an absolute URL.
// Always treat the URL scheme as "http" for now.
// Returns nullptr if any error
static std::unique_ptr<TestEvHTTPConnection> Connect(absl::string_view url);

// Returns a new connection to the specified host:port.
// Returns nullptr if any error
static std::unique_ptr<TestEvHTTPConnection> Connect(absl::string_view host,
int port);

// Returns a new connection to the specified port of localhost.
// Returns nullptr if any error
static std::unique_ptr<TestEvHTTPConnection> ConnectLocal(int port) {
return Connect("localhost", port);
}

// Sends a request and blocks the caller till a response is received
// or any error has happened.
// Returns false if any error.
bool BlockingSendRequest(const TestClientRequest& request,
TestClientResponse* response) override;

// Sends a request and returns immediately. The response will be handled
// asynchronously via the response->done callback.
// Returns false if any error in sending the request, or if the executor
// has not been configured.
bool SendRequest(const TestClientRequest& request,
TestClientResponse* response) override;

// Sets the executor for processing requests asynchronously.
void SetExecutor(std::unique_ptr<EventExecutor> executor) override;

private:
struct event_base* ev_base_;
struct evhttp_uri* http_uri_;
struct evhttp_connection* evcon_;

std::unique_ptr<EventExecutor> executor_;

std::unique_ptr<absl::Notification> loop_exit_;
};

} // namespace net_http
} // namespace serving
} // namespace tensorflow

#endif // TENSORFLOW_SERVING_UTIL_NET_HTTP_CLIENT_TEST_CLIENT_INTERNAL_EVHTTP_CONNECTION_H_
Loading

0 comments on commit 78e94e9

Please sign in to comment.