Skip to content

Commit

Permalink
Websocket support using mod_websocket.c
Browse files Browse the repository at this point in the history
  • Loading branch information
Thalhammer committed Oct 18, 2017
1 parent de2dda1 commit c5e2136
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 39 deletions.
78 changes: 78 additions & 0 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{
"configurations": [
{
"name": "Mac",
"includePath": [
"/usr/include",
"/usr/local/include",
"${workspaceRoot}"
],
"defines": [],
"intelliSenseMode": "clang-x64",
"browse": {
"path": [
"/usr/include",
"/usr/local/include",
"${workspaceRoot}"
],
"limitSymbolsToIncludedHeaders": true,
"databaseFilename": ""
},
"macFrameworkPath": [
"/System/Library/Frameworks",
"/Library/Frameworks"
]
},
{
"name": "Linux",
"includePath": [
"/usr/include/x86_64-linux-gnu/c++/5",
"/usr/include/c++/5",
"/usr/local/include",
"/usr/lib/clang/3.8.0/include",
"/usr/include/x86_64-linux-gnu",
"/usr/include",
"${workspaceRoot}",
"${workspaceRoot}/../apache-websocket/",
"/usr/include/apache2",
"/usr/include/apr-1.0"
],
"defines": [],
"intelliSenseMode": "clang-x64",
"browse": {
"path": [
"/usr/include/x86_64-linux-gnu/c++/5",
"/usr/include/c++/5",
"/usr/local/include",
"/usr/lib/clang/3.8.0/include",
"/usr/include/x86_64-linux-gnu",
"/usr/include",
"${workspaceRoot}"
],
"limitSymbolsToIncludedHeaders": true,
"databaseFilename": ""
}
},
{
"name": "Win32",
"includePath": [
"C:/Program Files (x86)/Microsoft Visual Studio 14.0/VC/include",
"${workspaceRoot}"
],
"defines": [
"_DEBUG",
"UNICODE"
],
"intelliSenseMode": "msvc-x64",
"browse": {
"path": [
"C:/Program Files (x86)/Microsoft Visual Studio 14.0/VC/include/*",
"${workspaceRoot}"
],
"limitSymbolsToIncludedHeaders": true,
"databaseFilename": ""
}
}
],
"version": 3
}
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"files.associations": {
"thread": "cpp"
}
}
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ OBJ = $(FSRC:=.o)

DEP_DIR = .deps

FLAGS = -fPIC -DPIC -Wall -Wno-unknown-pragmas -fstack-protector-strong -flto `pkg-config --cflags apr-1` -I`apxs -q INCLUDEDIR`
FLAGS = -fPIC -DPIC -Wall -Wno-unknown-pragmas -fstack-protector-strong -flto `pkg-config --cflags apr-1` -I`apxs -q INCLUDEDIR` -I ../apache-websocket/
CXXFLAGS = -std=c++14
CFLAGS =
LINKFLAGS = -lprotobuf -lgrpc++
Expand Down
163 changes: 137 additions & 26 deletions handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ size_t read_body(Func func, request_rec* r) {
return total_size;
}

#define PTABLE(s, table) print_table(s, #table, table)
#define PSTR(str) ap_log_error(APLOG_MARK, APLOG_INFO, 0, r->server, "%s = %s", #str, str);

void print_table(server_rec* s, const char* name, apr_table_t* table) {
auto *fields = apr_table_elts(table);
auto *e = (apr_table_entry_t *)fields->elts;
Expand All @@ -94,10 +91,8 @@ class grpc_connection_provider {
std::map<std::string, con_entry> channels;
public:
grpc_connection_provider() {
printf("ConProvider create\n");
}
~grpc_connection_provider() {
printf("ConProvider destroy\n");
}

std::unique_ptr<::thalhammer::http::Handler::Stub> getStub(const char* host, int64_t timeout) {
Expand Down Expand Up @@ -137,7 +132,12 @@ class grpc_connection_provider {

static grpc_connection_provider con_provider;

int handle_request(request_rec* r, const grpcbackend_config_t* config) {
http_handler::http_handler(request_rec* r) {
this->r = r;
this->config = static_cast<grpcbackend_config_t*>(ap_get_module_config(r->per_dir_config, &grpcbackend_module));
}

int http_handler::handle_request() {
if(config->host == nullptr)
return HTTP_INTERNAL_SERVER_ERROR;
auto stub = con_provider.getStub(config->host, config->connect_timeout_ms);
Expand Down Expand Up @@ -183,26 +183,6 @@ int handle_request(request_rec* r, const grpcbackend_config_t* config) {
}
}

/*
// Debug helpers
PSTR(r->uri);
PSTR(r->unparsed_uri);
PSTR(r->parsed_uri.scheme);
PSTR(r->parsed_uri.hostinfo);
PSTR(r->parsed_uri.user);
PSTR(r->parsed_uri.password);
PSTR(r->parsed_uri.hostname);
PSTR(r->parsed_uri.port_str);
PSTR(r->parsed_uri.path);
PSTR(r->parsed_uri.query);
PSTR(r->parsed_uri.fragment);
PSTR(r->filename);
PSTR(r->canonical_filename);
PSTR(r->path_info);
PSTR(r->args);
PSTR(r->hostname);
*/

bool failed = false;
read_body([&stream, &failed](const char* data, size_t len){
::thalhammer::http::HandleRequest req;
Expand Down Expand Up @@ -254,3 +234,134 @@ int handle_request(request_rec* r, const grpcbackend_config_t* config) {

return DONE;
}

void websocket_handler::send(int type, const uint8_t* buffer, size_t buffer_size)
{
_server->send(_server, type, buffer, buffer_size);
}

websocket_handler::websocket_handler(const WebSocketServer* server)
: _server(server)
{
auto* r = server->request(server);
auto* config = static_cast<grpcbackend_config_t*>(ap_get_module_config(r->per_dir_config, &grpcbackend_module));
if(!config->host)
throw std::runtime_error("Missing grpc host");
_stub = con_provider.getStub(config->host, config->connect_timeout_ms);

if(!_stub) {
throw std::runtime_error("GRPC Backend timeout");
}

_stream = _stub->HandleWebSocket(&_call_context);

{
::thalhammer::http::HandleWebSocketRequest req;
auto* client = req.mutable_request()->mutable_client();
auto* con = r->connection;
client->set_local_port(con->local_addr->port);
client->set_local_ip(apr_addr_to_string(con->local_addr));
client->set_remote_port(con->client_addr->port);
client->set_remote_ip(apr_addr_to_string(con->client_addr));
client->set_encrypted(!strcmp(ap_http_scheme(r), "https"));
auto* request = req.mutable_request();
request->set_method(r->method);
request->set_protocol(r->protocol);
request->set_resource(r->unparsed_uri);

auto *fields = apr_table_elts(r->headers_in);
auto *e = (apr_table_entry_t *)fields->elts;
for(int i = 0; i < fields->nelts; i++) {
auto* header = request->add_headers();
header->set_key(e[i].key);
header->set_value(e[i].val);
}

if(!_stream->Write(req)) {
con_provider.reset_cache(config->host);
throw std::runtime_error("Failed to write initial grpc request");
}
}

{
::thalhammer::http::HandleWebSocketResponse resp;
if(!_stream->Read(&resp)) {
con_provider.reset_cache(config->host);
throw std::runtime_error("Failed to read initial grpc response");
} else {
for(auto& header : resp.response().headers()) {
apr_table_setn(r->headers_out, apr_pstrdup(r->pool, header.key().c_str()), apr_pstrdup(r->pool, header.value().c_str()));
std::string key = header.key();
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
}
}
}

_recv_thread = std::thread([this,r](){
try {
::thalhammer::http::HandleWebSocketResponse resp;
while(!_recv_shutdown && _stream->Read(&resp)) {
auto& msg = resp.message();
int mtype = MESSAGE_TYPE_INVALID;
switch(msg.type()) {
case ::thalhammer::http::WebSocketMessage::TEXT:
mtype = MESSAGE_TYPE_TEXT; break;
case ::thalhammer::http::WebSocketMessage::BINARY:
mtype = MESSAGE_TYPE_BINARY; break;
case ::thalhammer::http::WebSocketMessage::CLOSE:
_server->close(_server);
_recv_shutdown = true;
break;
default:
break;
}
if(mtype != MESSAGE_TYPE_INVALID) {
auto& content = msg.content();
this->send(mtype, (const uint8_t*)content.data(), content.size());
}
}
} catch(...) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server, "Exception in read thread");
}
});
}

websocket_handler::~websocket_handler()
{
}

void websocket_handler::on_message(int type, const uint8_t* buffer, size_t buffer_size)
{
::thalhammer::http::HandleWebSocketRequest req;
auto* msg = req.mutable_message();
switch(type) {
case MESSAGE_TYPE_TEXT:
msg->set_type(::thalhammer::http::WebSocketMessage::TEXT); break;
case MESSAGE_TYPE_BINARY:
msg->set_type(::thalhammer::http::WebSocketMessage::BINARY); break;
}
msg->set_content((const char*)buffer, buffer_size);

if(!_stream->Write(req))
{
ap_log_error(APLOG_MARK, APLOG_ERR, 0, _server->request(_server)->server, "Failed to send websocket message to backend");
}
}

void websocket_handler::on_disconnect()
{
::thalhammer::http::HandleWebSocketRequest req;
auto* msg = req.mutable_message();
msg->set_type(::thalhammer::http::WebSocketMessage::CLOSE);
msg->set_content("");
_stream->WriteLast(req, ::grpc::WriteOptions());

_recv_shutdown = true;
::grpc::Status s = _stream->Finish();
if(!s.ok()) {
ap_log_error(APLOG_MARK, APLOG_ERR, 0, _server->request(_server)->server, "Failed to execute rpc: %s", s.error_message().c_str());
}
if(_recv_thread.joinable())
_recv_thread.join();
}

52 changes: 51 additions & 1 deletion handler.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,59 @@
#pragma once
#include "handler.grpc.pb.h"
extern "C" {
#include "httpd.h"
#include "http_log.h"
#include "http_protocol.h"
#include "apr_strings.h"
}
#include "config.h"
extern int handle_request(request_rec* r, const grpcbackend_config_t* config);
#include "websocket_plugin.h"
#include <thread>
#include <atomic>
template<typename T>
class pool_class {
static apr_status_t cleanup(void* ptr) {
if(ptr != nullptr) {
T* instance = (T*)ptr;
instance->~T();
}
return APR_SUCCESS;
}
apr_pool_t* _pool;
public:
template<typename... Args>
static T* create(apr_pool_t* pool, Args&&... args) {
auto* mem = apr_palloc(pool, sizeof(T));
new(mem) T(std::forward<Args>(args)...);
apr_pool_cleanup_register(pool, mem, cleanup, apr_pool_cleanup_null) ;
return (T*)mem;
}

virtual ~pool_class() {
}
};

class http_handler: public pool_class<http_handler> {
request_rec* r;
const grpcbackend_config_t* config;
public:
http_handler(request_rec* r);
const grpcbackend_config_t* get_config() const { return config; }
int handle_request();
};

class websocket_handler: public pool_class<websocket_handler> {
std::thread _recv_thread;
std::atomic<bool> _recv_shutdown;
const WebSocketServer* _server;
std::unique_ptr<::thalhammer::http::Handler::Stub> _stub;
::grpc::ClientContext _call_context;
std::unique_ptr<::grpc::ClientReaderWriterInterface<::thalhammer::http::HandleWebSocketRequest, ::thalhammer::http::HandleWebSocketResponse>> _stream;
protected:
void send(int type, const uint8_t* buffer, size_t buffer_size);
public:
websocket_handler(const WebSocketServer* server);
virtual ~websocket_handler();
void on_message(int type, const uint8_t* buffer, size_t buffer_size);
void on_disconnect();
};
33 changes: 33 additions & 0 deletions handler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@ message HandleResponse {
Response response = 1;
}

message WebSocketMessage {
enum Type {
TEXT = 0;
BINARY = 1;
CLOSE = 2;
}
Type type = 1;
bytes content = 2;
}

message WebSocketRequest {
string method = 1;
string resource = 2;
string protocol = 3;
repeated Header headers = 4;
ClientInfo client = 5;
}

message WebSocketResponse {
repeated Header headers = 3;
}

message HandleWebSocketRequest {
WebSocketRequest request = 1;
WebSocketMessage message = 2;
}

message HandleWebSocketResponse {
WebSocketResponse response = 1;
WebSocketMessage message = 2;
}

service Handler {
rpc Handle(stream HandleRequest) returns(stream HandleResponse) {}
rpc HandleWebSocket(stream HandleWebSocketRequest) returns(stream HandleWebSocketResponse) {}
}
Loading

0 comments on commit c5e2136

Please sign in to comment.