Skip to content

Commit

Permalink
Fix ws and socket client error handling and add test to #212 (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
jangko committed Feb 19, 2024
1 parent 47cfc89 commit 514049a
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 15 deletions.
2 changes: 1 addition & 1 deletion json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ method call*(client: RpcHttpClient, name: string,
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
error "Failed to process POST Response for JSON-RPC", msg = msgRes.error
let exc = newException(JsonRpcError, msgRes.error)
newFut.fail(exc)
client.awaiting.del(id)
Expand Down
46 changes: 37 additions & 9 deletions json_rpc/clients/socketclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import

export client

logScope:
topics = "JSONRPC-SOCKET-CLIENT"

type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
Expand Down Expand Up @@ -74,22 +77,47 @@ method callBatch*(client: RpcSocketClient,

return await client.batchFut

proc processData(client: RpcSocketClient) {.async.} =
proc processData(client: RpcSocketClient) {.async: (raises: []).} =
while true:
var localException: ref JsonRpcError
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
try:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
await client.transport.closeWait()
break

let res = client.processMessage(value)
if res.isErr:
error "Error when processing RPC message", msg=res.error
localException = newException(JsonRpcError, res.error)
break
except TransportError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break
except CancelledError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break

let res = client.processMessage(value)
if res.isErr:
error "error when processing message", msg=res.error
raise newException(JsonRpcError, res.error)
if localException.isNil.not:
for _,fut in client.awaiting:
fut.fail(localException)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(localException)

# async loop reconnection and waiting
client.transport = await connect(client.address)
try:
info "Reconnect to server", address=client.address
client.transport = await connect(client.address)
except TransportError as exc:
error "Error when reconnecting to server", msg=exc.msg
break
except CancelledError as exc:
error "Error when reconnecting to server", msg=exc.msg
break

proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
Expand Down
15 changes: 11 additions & 4 deletions json_rpc/clients/websocketclientimpl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ method callBatch*(client: RpcWebSocketClient,
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref CatchableError

template processError() =
for k, v in client.awaiting:
v.fail(error)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(error)
client.awaiting.clear()

let ws = client.transport
try:
while ws.readyState != ReadyState.Closed:
Expand All @@ -85,7 +92,9 @@ proc processData(client: RpcWebSocketClient) {.async.} =

let res = client.processMessage(string.fromBytes(value))
if res.isErr:
raise newException(JsonRpcError, res.error)
error "Error when processing RPC message", msg=res.error
error = newException(JsonRpcError, res.error)
processError()

except CatchableError as e:
error = e
Expand All @@ -97,9 +106,7 @@ proc processData(client: RpcWebSocketClient) {.async.} =
if client.awaiting.len != 0:
if error.isNil:
error = newException(IOError, "Transport was closed while waiting for response")
for k, v in client.awaiting:
v.fail(error)
client.awaiting.clear()
processError()
if not client.onDisconnect.isNil:
client.onDisconnect()

Expand Down
228 changes: 227 additions & 1 deletion tests/test_client_hook.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
# those terms.

import
std/importutils,
unittest2,
chronicles,
websock/websock,
../json_rpc/rpcclient,
../json_rpc/rpcserver

Expand Down Expand Up @@ -48,7 +51,7 @@ proc setupClientHook(client: RpcClient): Shadow =
return err(exc.msg)
shadow

suite "test callsigs":
suite "test client features":
var server = newRpcHttpServer(["127.0.0.1:0"])
server.installHandlers()
var client = newRpcHttpClient()
Expand All @@ -74,3 +77,226 @@ suite "test callsigs":

waitFor server.stop()
waitFor server.closeWait()


type
TestSocketServer = ref object of RpcSocketServer
getData: proc(): string {.gcsafe, raises: [].}

proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
## Process transport data to the RPC server
try:
var rpc = getUserData[TestSocketServer](server)
while true:
var
value = await transport.readLine(router.defaultMaxRequestLength)
if value == "":
await transport.closeWait()
break

let res = rpc.getData()
discard await transport.write(res & "\r\n")
except TransportError as ex:
error "Transport closed during processing client", msg=ex.msg
except CatchableError as ex:
error "Error occured during processing client", msg=ex.msg

proc addStreamServer(server: TestSocketServer, address: TransportAddress) =
privateAccess(RpcSocketServer)
try:
info "Starting JSON-RPC socket server", address = $address
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except CatchableError as exc:
error "Failed to create server", address = $address, message = exc.msg

if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")

proc new(T: type TestSocketServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
getData: getData,
)


suite "test rpc socket client":
let server = TestSocketServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addStreamServer(serverAddress)

var client = newRpcSocketClient()
server.start()
waitFor client.connect(server.localAddress()[0])

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

server.stop()
waitFor server.closeWait()


type
TestHttpServer = ref object of RpcHttpServer
getData: proc(): string {.gcsafe, raises: [].}

proc processClientRpc(rpcServer: TestHttpServer): HttpProcessCallback2 =
return proc (req: RequestFence): Future[HttpResponseRef]
{.async: (raises: [CancelledError]).} =
if not req.isOk():
return defaultResponse()

let
request = req.get()
headers = HttpTable.init([("Content-Type",
"application/json; charset=utf-8")])
try:
let data = rpcServer.getData()
let res = await request.respond(Http200, data, headers)
trace "JSON-RPC result has been sent"
return res
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Internal error while processing JSON-RPC call"
return defaultResponse(exc)

proc addHttpServer(
rpcServer: TestHttpServer,
address: TransportAddress,
socketFlags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr},
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
backlogSize: int = 100,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576) =
let server = HttpServerRef.new(
address,
processClientRpc(rpcServer),
{},
socketFlags,
serverUri, "nim-json-rpc", maxConnections, backlogSize,
bufferSize, httpHeadersTimeout, maxHeadersSize, maxRequestBodySize
).valueOr:
error "Failed to create server", address = $address,
message = error
raise newException(RpcBindError, "Unable to create server: " & $error)
info "Starting JSON-RPC HTTP server", url = "http://" & $address

privateAccess(RpcHttpServer)
rpcServer.httpServers.add server

proc new(T: type TestHttpServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
maxChunkSize: 8192,
getData: getData,
)

suite "test rpc http client":
let server = TestHttpServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addHttpServer(serverAddress)

var client = newRpcHttpClient()
server.start()
waitFor client.connect("http://" & $server.localAddress()[0])

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

waitFor server.stop()
waitFor server.closeWait()


type
TestWsServer = ref object of RpcWebSocketServer
getData: proc(): string {.gcsafe, raises: [].}

proc handleRequest(rpc: TestWsServer, request: websock.HttpRequest)
{.async: (raises: [CancelledError]).} =
try:
let server = rpc.wsserver
let ws = await server.handleRequest(request)
if ws.readyState != ReadyState.Open:
error "Failed to open websocket connection"
return

trace "Websocket handshake completed"
while ws.readyState != ReadyState.Closed:
let recvData = await ws.recvMsg()
trace "Client message: ", size = recvData.len, binary = ws.binary

if ws.readyState == ReadyState.Closed:
# if session already terminated by peer,
# no need to send response
break

if recvData.len == 0:
await ws.close(
reason = "cannot process zero length message"
)
break

let data = rpc.getData()

trace "RPC result has been sent", address = $request.uri
await ws.send(data)

except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg

except CancelledError as exc:
raise exc

except CatchableError as exc:
error "Something error", msg=exc.msg

proc newWsServer(address: TransportAddress, getData: proc(): string {.gcsafe, raises: [].}): TestWsServer =

let flags = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr}
var server = new(TestWsServer)
proc processCallback(request: websock.HttpRequest): Future[void] =
handleRequest(server, request)

privateAccess(RpcWebSocketServer)

server.getData = getData
server.wsserver = WSServer.new(rng = HmacDrbgContext.new())
server.server = websock.HttpServer.create(
address,
processCallback,
flags
)

server

suite "test ws http client":
let serverAddress = initTAddress("127.0.0.1:0")
let server = newWsServer(serverAddress, proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)

var client = newRpcWebSocketClient()
server.start()
waitFor client.connect("ws://" & $server.localAddress())

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

server.stop()
waitFor server.closeWait()

0 comments on commit 514049a

Please sign in to comment.