Skip to content

Commit

Permalink
feat: implementation for /external handler for RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
knst committed Jun 6, 2024
1 parent 3612b8a commit f1c1fd8
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 19 deletions.
17 changes: 14 additions & 3 deletions src/httprpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna
return multiUserAuthorized(strUserPass);
}

static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req)
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req, bool external = false)
{
// JSONRPC handles only POST
if (req->GetRequestMethod() != HTTPRequest::POST) {
Expand Down Expand Up @@ -176,6 +176,14 @@ static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req)
return false;
}

if (jreq.authUser == gArgs.GetArg("-rpcexternaluser", "") && !jreq.authUser.empty()) {
if (!external) {
LogPrintf("RPC User '%s' is allowed to call rpc only by path /external\n", jreq.authUser);
req->WriteReply(HTTP_FORBIDDEN);
return false;
}
LogPrintf("RPC user '%s' is external\n", jreq.authUser);
}
try {
// Parse request
UniValue valRequest;
Expand Down Expand Up @@ -298,10 +306,12 @@ bool StartHTTPRPC(const CoreContext& context)
return false;

auto handle_rpc = [&context](HTTPRequest* req, const std::string&) { return HTTPReq_JSONRPC(context, req); };
RegisterHTTPHandler("/", true, handle_rpc);
auto handle_rpc_external = [&context](HTTPRequest* req, const std::string&) { return HTTPReq_JSONRPC(context, req, true); };
RegisterHTTPHandler("/", true, false, handle_rpc);
if (g_wallet_init_interface.HasWalletSupport()) {
RegisterHTTPHandler("/wallet/", false, handle_rpc);
RegisterHTTPHandler("/wallet/", false, false, handle_rpc);
}
RegisterHTTPHandler("/external", true, true, handle_rpc_external);
struct event_base* eventBase = EventBase();
assert(eventBase);
httpRPCTimerInterface = std::make_unique<HTTPRPCTimerInterface>(eventBase);
Expand All @@ -317,6 +327,7 @@ void InterruptHTTPRPC()
void StopHTTPRPC()
{
LogPrint(BCLog::RPC, "Stopping HTTP RPC server\n");
UnregisterHTTPHandler("/external", true);
UnregisterHTTPHandler("/", true);
if (g_wallet_init_interface.HasWalletSupport()) {
UnregisterHTTPHandler("/wallet/", false);
Expand Down
54 changes: 44 additions & 10 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ class WorkQueue
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
bool running GUARDED_BY(cs);
const size_t maxDepth;
const bool m_is_external;

public:
explicit WorkQueue(size_t _maxDepth) : running(true),
maxDepth(_maxDepth)
explicit WorkQueue(size_t _maxDepth, bool is_external) : running(true),
maxDepth(_maxDepth),
m_is_external(is_external)
{
}
/** Precondition: worker threads have all stopped (they have been joined).
Expand Down Expand Up @@ -107,6 +109,9 @@ class WorkQueue
i = std::move(queue.front());
queue.pop_front();
}
if (m_is_external) {
LogPrintf("HTTP: Calling handler for external user...\n");
}
(*i)();
}
}
Expand All @@ -121,12 +126,13 @@ class WorkQueue

struct HTTPPathHandler
{
HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
prefix(_prefix), exactMatch(_exactMatch), handler(_handler)
HTTPPathHandler(std::string _prefix, bool _exactMatch, bool external, HTTPRequestHandler _handler):
prefix(_prefix), exactMatch(_exactMatch), m_external(external), handler(_handler)
{
}
std::string prefix;
bool exactMatch;
bool m_external;
HTTPRequestHandler handler;
};

Expand All @@ -140,6 +146,7 @@ static struct evhttp* eventHTTP = nullptr;
static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue_external{nullptr};
//! Handlers for (sub)paths
static std::vector<HTTPPathHandler> pathHandlers;
//! Bound listening sockets
Expand Down Expand Up @@ -258,9 +265,18 @@ static void http_request_cb(struct evhttp_request* req, void* arg)

// Dispatch to worker thread
if (i != iend) {
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)}; /// this handler!
assert(g_work_queue);
if (g_work_queue->Enqueue(item.get())) {

// We have queue created only if RPC arg 'rpcexternaluser' is specified
if (i->m_external && g_work_queue_external) {
if (g_work_queue_external->Enqueue(item.get())) {
item.release();
} else {
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
}
} else if (g_work_queue->Enqueue(item.get())) {
item.release(); /* if true, queue took ownership */
} else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
Expand Down Expand Up @@ -393,9 +409,14 @@ bool InitHTTPServer()

LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
int workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);

g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, false);
if (!gArgs.GetArg("-rpcexternaluser", "").empty()) {
LogPrintf("HTTP: creating external work queue of depth %d\n", workQueueDepthExternal);
g_work_queue_external = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepthExternal, true);
}
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
Expand Down Expand Up @@ -423,12 +444,18 @@ void StartHTTPServer()
{
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
int rpcThreadsExternals = std::max((long)gArgs.GetArg("-rpcexternalthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
g_thread_http = std::thread(ThreadHTTP, eventBase);

for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
}
if (g_work_queue_external) {
for (int i = 0; i < rpcThreadsExternals; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue_external.get(), i);
}
}
}

void InterruptHTTPServer()
Expand All @@ -438,6 +465,9 @@ void InterruptHTTPServer()
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
if (g_work_queue_external) {
g_work_queue_external->Interrupt();
}
if (g_work_queue) {
g_work_queue->Interrupt();
}
Expand All @@ -446,6 +476,9 @@ void InterruptHTTPServer()
void StopHTTPServer()
{
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (g_work_queue_external) {
g_work_queue_external->Interrupt();
}
if (g_work_queue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
for (auto& thread : g_thread_http_workers) {
Expand All @@ -471,6 +504,7 @@ void StopHTTPServer()
event_base_free(eventBase);
eventBase = nullptr;
}
g_work_queue_external.reset();
g_work_queue.reset();
LogPrint(BCLog::HTTP, "Stopped HTTP server\n");
}
Expand Down Expand Up @@ -639,10 +673,10 @@ HTTPRequest::RequestMethod HTTPRequest::GetRequestMethod() const
}
}

void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler)
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, bool external, const HTTPRequestHandler &handler)
{
LogPrint(BCLog::HTTP, "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
LogPrint(BCLog::HTTP, "Registering HTTP handler for %s (exactmatch %d external %d)\n", prefix, exactMatch, external);
pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, external, handler));
}

void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch)
Expand Down
2 changes: 1 addition & 1 deletion src/httpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ typedef std::function<bool(HTTPRequest* req, const std::string &)> HTTPRequestHa
* If multiple handlers match a prefix, the first-registered one will
* be invoked.
*/
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler);
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, bool external, const HTTPRequestHandler &handler);
/** Unregister handler for prefix */
void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch);

Expand Down
3 changes: 3 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,13 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcthreads=<n>", strprintf("Set the number of threads to service RPC calls (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcexternalthreads=<n>", strprintf("Set the number of threads to service RPC calls from external consumers (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcuser=<user>", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcexternaluser=<user>", "Username for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcwhitelist=<whitelist>", "Set a whitelist to filter incoming RPC calls for a specific user. The field <whitelist> comes in the format: <USERNAME>:<rpc 1>,<rpc 2>,...,<rpc n>. If multiple whitelists are set for a given user, they are set-intersected. See -rpcwhitelistdefault documentation for information on default whitelist behavior.", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcwhitelistdefault", "Sets default behavior for rpc whitelisting. Unless rpcwhitelistdefault is set to 0, if any -rpcwhitelist is set, the rpc server acts as if all rpc users are subject to empty-unless-otherwise-specified whitelists. If rpcwhitelistdefault is set to 1 and no -rpcwhitelist is set, rpc server acts as if all rpc users are subject to empty whitelists.", ArgsManager::ALLOW_BOOL, OptionsCategory::RPC);
argsman.AddArg("-rpcworkqueue=<n>", strprintf("Set the depth of the work queue to service RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcexternalworkqueue=<n>", strprintf("Set the depth of the work queue to service external RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);

argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
Expand Down
2 changes: 1 addition & 1 deletion src/rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ void StartREST(const CoreContext& context)
{
for (const auto& up : uri_prefixes) {
auto handler = [&context, up](HTTPRequest* req, const std::string& prefix) { return up.handler(context, req, prefix); };
RegisterHTTPHandler(up.prefix, false, handler);
RegisterHTTPHandler(up.prefix, false, false, handler);
}
}

Expand Down
26 changes: 22 additions & 4 deletions test/functional/rpc_platform_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,24 @@ def setup_chain(self):
def run_test(self):
url = urllib.parse.urlparse(self.nodes[0].url)

def test_command(method, params, auth, expexted_status, should_not_match=False):
def test_command(method, params, auth, expected_status, should_not_match=False):
test_command_helper(method, params, '/', auth, expected_status, should_not_match)

def test_external_command(method, params, auth, expected_status, should_not_match=False):
test_command_helper(method, params, '/external', auth, expected_status, should_not_match)

def test_command_helper(method, params, path, auth, expected_status, should_not_match):
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
body = {"method": method}
if len(params):
body["params"] = params
conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
conn.request('POST', path, json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
resp = conn.getresponse()
if should_not_match:
assert resp.status != expexted_status
assert resp.status != expected_status
else:
assert_equal(resp.status, expexted_status)
assert_equal(resp.status, expected_status)
conn.close()

whitelisted = ["getassetunlockstatuses",
Expand Down Expand Up @@ -114,5 +120,17 @@ def test_command(method, params, auth, expexted_status, should_not_match=False):
test_command("debug", ["1"], rpcuser_authpair_operator, 200)


self.log.info("Restart node with /external handler...")
test_external_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
test_external_command("getblockchaininfo", [], rpcuser_authpair_platform, 403)

self.restart_node(0, extra_args=["-rpcexternaluser=platform-user"])
test_command("getbestblockhash", [], rpcuser_authpair_platform, 403)
test_external_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
test_external_command("getblockchaininfo", [], rpcuser_authpair_platform, 403)
test_external_command("getbestblockhash", [], rpcuser_authpair_operator, 200)



if __name__ == '__main__':
HTTPBasicsTest().main()

0 comments on commit f1c1fd8

Please sign in to comment.