Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ws and socket client error handling and add test to #212 #213

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ method call*(client: RpcHttpClient, name: string,
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
error "Failed to process POST Response for JSON-RPC", msg = msgRes.error
let exc = newException(JsonRpcError, msgRes.error)
newFut.fail(exc)
client.awaiting.del(id)
Expand Down
46 changes: 37 additions & 9 deletions json_rpc/clients/socketclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import

export client

logScope:
topics = "JSONRPC-SOCKET-CLIENT"

type
RpcSocketClient* = ref object of RpcClient
transport*: StreamTransport
Expand Down Expand Up @@ -74,22 +77,47 @@ method callBatch*(client: RpcSocketClient,

return await client.batchFut

proc processData(client: RpcSocketClient) {.async.} =
proc processData(client: RpcSocketClient) {.async: (raises: []).} =
while true:
var localException: ref JsonRpcError
while true:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
try:
var value = await client.transport.readLine(defaultMaxRequestLength)
if value == "":
# transmission ends
await client.transport.closeWait()
break

let res = client.processMessage(value)
if res.isErr:
error "Error when processing RPC message", msg=res.error
localException = newException(JsonRpcError, res.error)
break
except TransportError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break
except CancelledError as exc:
localException = newException(JsonRpcError, exc.msg)
await client.transport.closeWait()
break

let res = client.processMessage(value)
if res.isErr:
error "error when processing message", msg=res.error
raise newException(JsonRpcError, res.error)
if localException.isNil.not:
for _,fut in client.awaiting:
fut.fail(localException)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(localException)

# async loop reconnection and waiting
client.transport = await connect(client.address)
try:
info "Reconnect to server", address=client.address
client.transport = await connect(client.address)
except TransportError as exc:
error "Error when reconnecting to server", msg=exc.msg
break
except CancelledError as exc:
error "Error when reconnecting to server", msg=exc.msg
break

proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
let addresses = resolveTAddress(address, port)
Expand Down
15 changes: 11 additions & 4 deletions json_rpc/clients/websocketclientimpl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ method callBatch*(client: RpcWebSocketClient,
proc processData(client: RpcWebSocketClient) {.async.} =
var error: ref CatchableError

template processError() =
for k, v in client.awaiting:
v.fail(error)
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(error)
client.awaiting.clear()

let ws = client.transport
try:
while ws.readyState != ReadyState.Closed:
Expand All @@ -85,7 +92,9 @@ proc processData(client: RpcWebSocketClient) {.async.} =

let res = client.processMessage(string.fromBytes(value))
if res.isErr:
raise newException(JsonRpcError, res.error)
error "Error when processing RPC message", msg=res.error
error = newException(JsonRpcError, res.error)
processError()

except CatchableError as e:
error = e
Expand All @@ -97,9 +106,7 @@ proc processData(client: RpcWebSocketClient) {.async.} =
if client.awaiting.len != 0:
if error.isNil:
error = newException(IOError, "Transport was closed while waiting for response")
for k, v in client.awaiting:
v.fail(error)
client.awaiting.clear()
processError()
if not client.onDisconnect.isNil:
client.onDisconnect()

Expand Down
228 changes: 227 additions & 1 deletion tests/test_client_hook.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
# those terms.

import
std/importutils,
unittest2,
chronicles,
websock/websock,
../json_rpc/rpcclient,
../json_rpc/rpcserver

Expand Down Expand Up @@ -48,7 +51,7 @@ proc setupClientHook(client: RpcClient): Shadow =
return err(exc.msg)
shadow

suite "test callsigs":
suite "test client features":
var server = newRpcHttpServer(["127.0.0.1:0"])
server.installHandlers()
var client = newRpcHttpClient()
Expand All @@ -74,3 +77,226 @@ suite "test callsigs":

waitFor server.stop()
waitFor server.closeWait()


type
TestSocketServer = ref object of RpcSocketServer
getData: proc(): string {.gcsafe, raises: [].}

proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
## Process transport data to the RPC server
try:
var rpc = getUserData[TestSocketServer](server)
while true:
var
value = await transport.readLine(router.defaultMaxRequestLength)
if value == "":
await transport.closeWait()
break

let res = rpc.getData()
discard await transport.write(res & "\r\n")
except TransportError as ex:
error "Transport closed during processing client", msg=ex.msg
except CatchableError as ex:
error "Error occured during processing client", msg=ex.msg

proc addStreamServer(server: TestSocketServer, address: TransportAddress) =
privateAccess(RpcSocketServer)
try:
info "Starting JSON-RPC socket server", address = $address
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
server.servers.add(transportServer)
except CatchableError as exc:
error "Failed to create server", address = $address, message = exc.msg

if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")

proc new(T: type TestSocketServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
getData: getData,
)


suite "test rpc socket client":
let server = TestSocketServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addStreamServer(serverAddress)

var client = newRpcSocketClient()
server.start()
waitFor client.connect(server.localAddress()[0])

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

server.stop()
waitFor server.closeWait()


type
TestHttpServer = ref object of RpcHttpServer
getData: proc(): string {.gcsafe, raises: [].}

proc processClientRpc(rpcServer: TestHttpServer): HttpProcessCallback2 =
return proc (req: RequestFence): Future[HttpResponseRef]
{.async: (raises: [CancelledError]).} =
if not req.isOk():
return defaultResponse()

let
request = req.get()
headers = HttpTable.init([("Content-Type",
"application/json; charset=utf-8")])
try:
let data = rpcServer.getData()
let res = await request.respond(Http200, data, headers)
trace "JSON-RPC result has been sent"
return res
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Internal error while processing JSON-RPC call"
return defaultResponse(exc)

proc addHttpServer(
rpcServer: TestHttpServer,
address: TransportAddress,
socketFlags: set[ServerFlags] = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr},
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
backlogSize: int = 100,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576) =
let server = HttpServerRef.new(
address,
processClientRpc(rpcServer),
{},
socketFlags,
serverUri, "nim-json-rpc", maxConnections, backlogSize,
bufferSize, httpHeadersTimeout, maxHeadersSize, maxRequestBodySize
).valueOr:
error "Failed to create server", address = $address,
message = error
raise newException(RpcBindError, "Unable to create server: " & $error)
info "Starting JSON-RPC HTTP server", url = "http://" & $address

privateAccess(RpcHttpServer)
rpcServer.httpServers.add server

proc new(T: type TestHttpServer, getData: proc(): string {.gcsafe, raises: [].}): T =
T(
router: RpcRouter.init(),
maxChunkSize: 8192,
getData: getData,
)

suite "test rpc http client":
let server = TestHttpServer.new(proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)
let serverAddress = initTAddress("127.0.0.1:0")
server.addHttpServer(serverAddress)

var client = newRpcHttpClient()
server.start()
waitFor client.connect("http://" & $server.localAddress()[0])

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

waitFor server.stop()
waitFor server.closeWait()


type
TestWsServer = ref object of RpcWebSocketServer
getData: proc(): string {.gcsafe, raises: [].}

proc handleRequest(rpc: TestWsServer, request: websock.HttpRequest)
{.async: (raises: [CancelledError]).} =
try:
let server = rpc.wsserver
let ws = await server.handleRequest(request)
if ws.readyState != ReadyState.Open:
error "Failed to open websocket connection"
return

trace "Websocket handshake completed"
while ws.readyState != ReadyState.Closed:
let recvData = await ws.recvMsg()
trace "Client message: ", size = recvData.len, binary = ws.binary

if ws.readyState == ReadyState.Closed:
# if session already terminated by peer,
# no need to send response
break

if recvData.len == 0:
await ws.close(
reason = "cannot process zero length message"
)
break

let data = rpc.getData()

trace "RPC result has been sent", address = $request.uri
await ws.send(data)

except WebSocketError as exc:
error "WebSocket error:", exception = exc.msg

except CancelledError as exc:
raise exc

except CatchableError as exc:
error "Something error", msg=exc.msg

proc newWsServer(address: TransportAddress, getData: proc(): string {.gcsafe, raises: [].}): TestWsServer =

let flags = {ServerFlags.TcpNoDelay,ServerFlags.ReuseAddr}
var server = new(TestWsServer)
proc processCallback(request: websock.HttpRequest): Future[void] =
handleRequest(server, request)

privateAccess(RpcWebSocketServer)

server.getData = getData
server.wsserver = WSServer.new(rng = HmacDrbgContext.new())
server.server = websock.HttpServer.create(
address,
processCallback,
flags
)

server

suite "test ws http client":
let serverAddress = initTAddress("127.0.0.1:0")
let server = newWsServer(serverAddress, proc(): string {.gcsafe, raises: [].} =
return """{"jsonrpc":"2.0","result":10}"""
)

var client = newRpcWebSocketClient()
server.start()
waitFor client.connect("ws://" & $server.localAddress())

test "missing id in server response":
expect JsonRpcError:
let res = waitFor client.get_Banana(11)
discard res

server.stop()
waitFor server.closeWait()