From a71fa8dbe836a31a5727ebbe331cbdb275e94716 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sat, 11 Jan 2020 05:21:20 +0200 Subject: [PATCH] Update to the oatpp 0.19.12 API. --- CMakeLists.txt | 2 +- .../AsyncConnectionHandler.cpp | 5 ++- src/oatpp-websocket/AsyncWebSocket.cpp | 44 ++++++++++--------- src/oatpp-websocket/AsyncWebSocket.hpp | 4 +- src/oatpp-websocket/ConnectionHandler.cpp | 3 ++ src/oatpp-websocket/SHA1.cpp | 14 +++--- src/oatpp-websocket/WebSocket.cpp | 41 +++++++++-------- src/oatpp-websocket/WebSocket.hpp | 2 +- test/oatpp-websocket/FullAsyncTest.cpp | 37 ++++++++++------ test/oatpp-websocket/FullTest.cpp | 4 +- .../app/AsyncWebSocketListener.hpp | 4 +- .../oatpp-websocket/app/WebSocketListener.hpp | 4 +- 12 files changed, 92 insertions(+), 72 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 172b6c5..7044b0b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.1 FATAL_ERROR) ## use these variables to configure module installation set(OATPP_THIS_MODULE_NAME oatpp-websocket) ## name of the module (also name of folders in installation dirs) -set(OATPP_THIS_MODULE_VERSION "0.19.11") ## version of the module (also sufix of folders in installation dirs) +set(OATPP_THIS_MODULE_VERSION "0.19.12") ## version of the module (also sufix of folders in installation dirs) set(OATPP_THIS_MODULE_LIBRARIES oatpp-websocket) ## list of libraries to find when find_package is called set(OATPP_THIS_MODULE_TARGETS oatpp-websocket) ## list of targets to install set(OATPP_THIS_MODULE_DIRECTORIES oatpp-websocket) ## list of directories to install diff --git a/src/oatpp-websocket/AsyncConnectionHandler.cpp b/src/oatpp-websocket/AsyncConnectionHandler.cpp index 027447b..795af99 100644 --- a/src/oatpp-websocket/AsyncConnectionHandler.cpp +++ b/src/oatpp-websocket/AsyncConnectionHandler.cpp @@ -86,7 +86,10 @@ void AsyncConnectionHandler::handleConnection(const std::shared_ptr& c } }; - + + connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); + connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); + m_executor->execute(connection, params, m_listener); } diff --git a/src/oatpp-websocket/AsyncWebSocket.cpp b/src/oatpp-websocket/AsyncWebSocket.cpp index 38fb6fa..f9b6801 100644 --- a/src/oatpp-websocket/AsyncWebSocket.cpp +++ b/src/oatpp-websocket/AsyncWebSocket.cpp @@ -79,7 +79,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readFrameHeaderAsync(const std::s v_word16 m_messageLen2; v_word32 m_messageLen3 [2]; private: - oatpp::data::stream::AsyncInlineReadData m_inlineData; + oatpp::data::buffer::InlineReadData m_inlineData; public: ReadFrameCoroutine(const std::shared_ptr connection, @@ -91,7 +91,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readFrameHeaderAsync(const std::s } Action act() override { - return oatpp::data::stream::readExactSizeDataAsyncInline(m_connection.get(), m_inlineData, yieldTo(&ReadFrameCoroutine::onBbRead)); + return m_connection->readExactSizeDataAsyncInline(m_inlineData, yieldTo(&ReadFrameCoroutine::onBbRead)); } Action onBbRead() { @@ -118,7 +118,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readFrameHeaderAsync(const std::s } Action readLen() { - return oatpp::data::stream::readExactSizeDataAsyncInline(m_connection.get(), m_inlineData, yieldTo(&ReadFrameCoroutine::onLenRead)); + return m_connection->readExactSizeDataAsyncInline(m_inlineData, yieldTo(&ReadFrameCoroutine::onLenRead)); } Action onLenRead() { @@ -139,7 +139,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readFrameHeaderAsync(const std::s } Action readMask() { - return oatpp::data::stream::readExactSizeDataAsyncInline(m_connection.get(), m_inlineData, finish()); + return m_connection->readExactSizeDataAsyncInline(m_inlineData, finish()); } }; @@ -162,7 +162,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::writeFrameHeaderAsync(const std:: v_word32 m_messageLen3 [2]; v_word8 m_messageLengthScenario; private: - oatpp::data::stream::AsyncInlineWriteData m_inlineData; + oatpp::data::buffer::InlineWriteData m_inlineData; public: WriteFrameCoroutine(const std::shared_ptr connection, @@ -177,7 +177,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::writeFrameHeaderAsync(const std:: } Action act() override { - return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_inlineData, yieldTo(&WriteFrameCoroutine::onBbWritten)); + return m_connection->writeExactSizeDataAsyncInline(m_inlineData, yieldTo(&WriteFrameCoroutine::onBbWritten)); } Action onBbWritten() { @@ -195,7 +195,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::writeFrameHeaderAsync(const std:: } Action writeMessageLen() { - return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_inlineData, yieldTo(&WriteFrameCoroutine::onLenWritten)); + return m_connection->writeExactSizeDataAsyncInline(m_inlineData, yieldTo(&WriteFrameCoroutine::onLenWritten)); } Action onLenWritten() { @@ -207,7 +207,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::writeFrameHeaderAsync(const std:: } Action writeMask() { - return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_inlineData, finish()); + return m_connection->writeExactSizeDataAsyncInline(m_inlineData, finish()); } }; @@ -229,9 +229,9 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share std::shared_ptr m_listener; private: p_char8 m_buffer; - oatpp::data::v_io_size m_progress; + oatpp::v_io_size m_progress; private: - oatpp::data::stream::AsyncInlineReadData m_inlineData; + oatpp::data::buffer::InlineReadData m_inlineData; public: ReadPayloadCoroutine(const std::shared_ptr& socket, const std::shared_ptr& connection, @@ -245,7 +245,11 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share , m_listener(listener) , m_buffer(new v_char8[m_socket->m_config.readBufferSize]) , m_progress(0) - {} + { + if(m_shortMessageStream && m_frameHeader->payloadLength > 125) { + throw std::runtime_error("[oatpp::web::protocol::websocket::AsyncWebSocket::readPayloadAsync()]: Invalid payloadLength. See RFC-6455, section-5.5."); + } + } ~ReadPayloadCoroutine() { delete [] m_buffer; @@ -255,7 +259,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share if(m_progress < m_frameHeader->payloadLength) { - oatpp::data::v_io_size desiredSize = m_socket->m_config.readBufferSize; + oatpp::v_io_size desiredSize = m_socket->m_config.readBufferSize; if(desiredSize > m_frameHeader->payloadLength - m_progress) { desiredSize = m_frameHeader->payloadLength - m_progress; } @@ -273,7 +277,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share } Action readData() { - return oatpp::data::stream::readSomeDataAsyncInline(m_connection.get(), m_inlineData, yieldTo(&ReadPayloadCoroutine::onDataRead)); + return m_connection->readSomeDataAsyncInline(m_inlineData, yieldTo(&ReadPayloadCoroutine::onDataRead)); } Action onDataRead() { @@ -283,7 +287,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share if(readResult > 0) { if(m_frameHeader->hasMask) { - std::unique_ptr decoded(new v_char8[readResult]); + std::unique_ptr decoded(new v_char8[readResult]); for(v_int32 i = 0; i < readResult; i ++) { decoded.get()[i] = m_buffer[i] ^ m_frameHeader->mask[(i + m_progress) % 4]; } @@ -292,7 +296,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share if(m_shortMessageStream) { /* this is RAM stream. Non-blocking call */ - m_shortMessageStream->write(decoded.get(), readResult); + m_shortMessageStream->writeSimple(decoded.get(), readResult); } else if(m_listener) { return m_listener->readMessage(m_socket, m_frameHeader->opcode, decoded.get(), readResult).next(yieldTo(&ReadPayloadCoroutine::act)); } @@ -303,7 +307,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share if(m_shortMessageStream) { /* this is RAM stream. Non-blocking call */ - m_shortMessageStream->write(m_buffer, readResult); + m_shortMessageStream->writeSimple(m_buffer, readResult); } else if(m_listener) { return m_listener->readMessage(m_socket, m_frameHeader->opcode, m_buffer, readResult).next(yieldTo(&ReadPayloadCoroutine::act)); } @@ -487,7 +491,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::sendOneFrameAsync(bool fin, v_wor private: p_char8 m_encoded = nullptr; private: - oatpp::data::stream::AsyncInlineWriteData m_inlineData; + oatpp::data::buffer::InlineWriteData m_inlineData; public: SendFrameCoroutine(const std::shared_ptr& socket, bool fin, v_word8 opcode, const oatpp::String& message) @@ -526,7 +530,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::sendOneFrameAsync(bool fin, v_wor } Action writeMessage() { - return oatpp::data::stream::writeExactSizeDataAsyncInline(m_socket->m_connection.get(), m_inlineData, finish()); + return m_socket->m_connection->writeExactSizeDataAsyncInline(m_inlineData, finish()); } }; @@ -540,9 +544,9 @@ oatpp::async::CoroutineStarter AsyncWebSocket::sendCloseAsync(v_word16 code, con code = htons(code); oatpp::data::stream::ChunkedBuffer buffer; - buffer.write(&code, 2); + buffer.writeSimple(&code, 2); if(message) { - buffer.write(message->getData(), message->getSize()); + buffer.writeSimple(message->getData(), message->getSize()); } return sendOneFrameAsync(true, Frame::OPCODE_CLOSE, buffer.toString()); diff --git a/src/oatpp-websocket/AsyncWebSocket.hpp b/src/oatpp-websocket/AsyncWebSocket.hpp index 9e7023c..b197b34 100644 --- a/src/oatpp-websocket/AsyncWebSocket.hpp +++ b/src/oatpp-websocket/AsyncWebSocket.hpp @@ -110,11 +110,11 @@ class AsyncWebSocket : public oatpp::base::Countable, public std::enable_shared_ * @param socket - &id:oatpp::websocket::AsyncWebSocket;. * @param opcode - &id:oatpp::websocket::Frame::OPCODE_TEXT; or &id:oatpp::websocket::Frame::OPCODE_BINARY;. * @param data - pointer to received data. - * @param size - data size. &id:oatpp::data::v_io_size;. + * @param size - data size. &id:oatpp::v_io_size;. * @return - &id:oatpp::async::CoroutineStarter;.
* *To ignore this event return nullptr.* */ - virtual CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, data::v_io_size size) = 0; + virtual CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) = 0; }; diff --git a/src/oatpp-websocket/ConnectionHandler.cpp b/src/oatpp-websocket/ConnectionHandler.cpp index b712a86..14ebc25 100644 --- a/src/oatpp-websocket/ConnectionHandler.cpp +++ b/src/oatpp-websocket/ConnectionHandler.cpp @@ -79,6 +79,9 @@ void ConnectionHandler::handleConnection(const std::shared_ptr& connec } }; + + connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); + connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); /* Create working thread */ std::thread thread(&Task::run, Task(connection, params, m_listener)); diff --git a/src/oatpp-websocket/SHA1.cpp b/src/oatpp-websocket/SHA1.cpp index b10f8d1..40f0a16 100644 --- a/src/oatpp-websocket/SHA1.cpp +++ b/src/oatpp-websocket/SHA1.cpp @@ -227,7 +227,7 @@ SHA1::SHA1() { void SHA1::update(const oatpp::String &s) { - data::v_io_size progress = 0; + oatpp::v_io_size progress = 0; while (true) { auto readSize = BLOCK_BYTES - buffer.getSize(); @@ -235,7 +235,7 @@ void SHA1::update(const oatpp::String &s) { readSize = s->getSize() - progress; } - buffer.write(&s->getData()[progress], readSize); + buffer.writeSimple(&s->getData()[progress], readSize); progress += readSize; if (buffer.getSize() != BLOCK_BYTES) { @@ -256,8 +256,8 @@ void SHA1::update(std::istream &is) { char sbuf[BLOCK_BYTES]; - is.read(sbuf, (int)(BLOCK_BYTES - buffer.getSize())); - buffer.write(sbuf, (std::size_t)is.gcount()); + is.read(sbuf, (int)(BLOCK_BYTES - buffer.getSize())); + buffer.writeSimple(sbuf, (std::size_t)is.gcount()); if (buffer.getSize() != BLOCK_BYTES) { return; @@ -281,10 +281,10 @@ oatpp::String SHA1::finalBinary() { uint64_t total_bits = (transforms * BLOCK_BYTES + buffer.getSize()) * 8; /* Padding */ - buffer.writeChar(0x80); + buffer.writeCharSimple(0x80); size_t orig_size = (size_t)buffer.getSize(); while (buffer.getSize() < BLOCK_BYTES) { - buffer.writeChar(0x00); + buffer.writeCharSimple(0x00); } uint32_t block[BLOCK_INTS]; @@ -306,7 +306,7 @@ oatpp::String SHA1::finalBinary() { oatpp::data::stream::ChunkedBuffer resultStream; for (size_t i = 0; i < sizeof(digest) / sizeof(digest[0]); i++) { uint32_t b = htonl(digest[i]); - resultStream.write(&b, 4); + resultStream.writeSimple(&b, 4); } /* Reset for next run */ diff --git a/src/oatpp-websocket/WebSocket.cpp b/src/oatpp-websocket/WebSocket.cpp index dadd006..f959689 100644 --- a/src/oatpp-websocket/WebSocket.cpp +++ b/src/oatpp-websocket/WebSocket.cpp @@ -70,7 +70,7 @@ bool WebSocket::checkForContinuation(const Frame::Header& frameHeader) { void WebSocket::readFrameHeader(Frame::Header& frameHeader) const { v_word16 bb; - auto res = oatpp::data::stream::readExactSizeData(m_connection.get(), &bb, 2); + auto res = m_connection->readExactSizeDataSimple(&bb, 2); if(res != 2) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readFrameHeader()]: Error reading frame header"); } @@ -82,14 +82,14 @@ void WebSocket::readFrameHeader(Frame::Header& frameHeader) const { frameHeader.payloadLength = messageLen1; } else if(messageLen1 == 126) { v_word16 messageLen2; - res = oatpp::data::stream::readExactSizeData(m_connection.get(), &messageLen2, 2); + res = m_connection->readExactSizeDataSimple(&messageLen2, 2); if(res != 2) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readFrameHeader()]: Error reading frame header. Reading payload length scenario 2."); } frameHeader.payloadLength = ntohs(messageLen2); } else if(messageLen1 == 127) { v_word32 messageLen3[2]; - res = oatpp::data::stream::readExactSizeData(m_connection.get(), &messageLen3, 8); + res = m_connection->readExactSizeDataSimple(&messageLen3, 8); if(res != 8) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readFrameHeader()]: Error reading frame header. Reading payload length scenario 3."); } @@ -97,7 +97,7 @@ void WebSocket::readFrameHeader(Frame::Header& frameHeader) const { } if(frameHeader.hasMask) { - res = oatpp::data::stream::readExactSizeData(m_connection.get(), frameHeader.mask, 4); + res = m_connection->readExactSizeDataSimple(frameHeader.mask, 4); if(res != 4) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readFrameHeader()]: Error reading frame header. Reading mask."); } @@ -113,14 +113,14 @@ void WebSocket::writeFrameHeader(const Frame::Header& frameHeader) const { bb = htons(bb); - auto res = oatpp::data::stream::writeExactSizeData(m_connection.get(), &bb, 2); + auto res = m_connection->writeExactSizeDataSimple(&bb, 2); if(res != 2) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::writeFrameHeader()]: Error writing frame header"); } if(messageLengthScenario == 2) { v_word16 messageLen2 = htons(frameHeader.payloadLength); - res = oatpp::data::stream::writeExactSizeData(m_connection.get(), &messageLen2, 2); + res = m_connection->writeExactSizeDataSimple(&messageLen2, 2); if(res != 2) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::writeFrameHeader()]: Error writing frame header. Writing payload length scenario 2."); } @@ -128,14 +128,14 @@ void WebSocket::writeFrameHeader(const Frame::Header& frameHeader) const { v_word32 messageLen3[2]; messageLen3[0] = htonl(frameHeader.payloadLength >> 32); messageLen3[1] = htonl(frameHeader.payloadLength & 0xFFFFFFFF); - res = oatpp::data::stream::writeExactSizeData(m_connection.get(), &messageLen3, 8); + res = m_connection->writeExactSizeDataSimple(&messageLen3, 8); if(res != 8) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::writeFrameHeader()]: Error writing frame header. Writing payload length scenario 3."); } } if(frameHeader.hasMask) { - res = oatpp::data::stream::writeExactSizeData(m_connection.get(), frameHeader.mask, 4); + res = m_connection->writeExactSizeDataSimple(frameHeader.mask, 4); if(res != 4) { throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::writeFrameHeader()]: Error writing frame header. Writing mask."); } @@ -150,16 +150,16 @@ void WebSocket::readPayload(const Frame::Header& frameHeader, oatpp::data::strea } std::unique_ptr buffer(new v_char8[m_config.readBufferSize]); - oatpp::data::v_io_size progress = 0; + oatpp::v_io_size progress = 0; while (progress < frameHeader.payloadLength) { - oatpp::data::v_io_size desiredSize = m_config.readBufferSize; + oatpp::v_io_size desiredSize = m_config.readBufferSize; if(desiredSize > frameHeader.payloadLength - progress) { desiredSize = frameHeader.payloadLength - progress; } - auto res = m_connection->read(buffer.get(), desiredSize); + auto res = m_connection->readSimple(buffer.get(), desiredSize); if(res > 0) { @@ -169,23 +169,22 @@ void WebSocket::readPayload(const Frame::Header& frameHeader, oatpp::data::strea decoded.get()[i] = buffer.get()[i] ^ frameHeader.mask[(i + progress) % 4]; } if(shortMessageStream) { - shortMessageStream->write(decoded.get(), res); + shortMessageStream->writeSimple(decoded.get(), res); } else if(m_listener) { m_listener->readMessage(*this, frameHeader.opcode, decoded.get(), res); } } else { if(shortMessageStream) { - shortMessageStream->write(buffer.get(), res); + shortMessageStream->writeSimple(buffer.get(), res); } else if(m_listener) { m_listener->readMessage(*this, frameHeader.opcode, buffer.get(), res); } } progress += res; - }else { // if res == 0 then probably stream handles read() error incorrectly. trow. + } else { // if res == 0 then probably stream handles read() error incorrectly. trow. - if(res == oatpp::data::IOError::RETRY_READ || res == oatpp::data::IOError::WAIT_RETRY_READ || - res == oatpp::data::IOError::RETRY_WRITE || res == oatpp::data::IOError::WAIT_RETRY_WRITE) { + if(res == oatpp::IOError::RETRY_READ || res == oatpp::IOError::RETRY_WRITE) { continue; } throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readPayload()]: Invalid connection state."); @@ -320,15 +319,15 @@ bool WebSocket::sendOneFrame(bool fin, v_word8 opcode, const oatpp::String& mess Frame::Header frameHeader; if(message && message->getSize() > 0) { sendFrameHeader(frameHeader, fin, opcode, message->getSize()); - oatpp::data::v_io_size res; + oatpp::v_io_size res; if(frameHeader.hasMask) { std::unique_ptr encoded(new v_char8[message->getSize()]); for(v_int32 i = 0; i < message->getSize(); i ++) { encoded.get()[i] = message->getData()[i] ^ frameHeader.mask[i % 4]; } - res = oatpp::data::stream::writeExactSizeData(m_connection.get(), encoded.get(), message->getSize()); + res = m_connection->writeExactSizeDataSimple(encoded.get(), message->getSize()); } else { - res = oatpp::data::stream::writeExactSizeData(m_connection.get(), message->getData(), message->getSize()); + res = m_connection->writeExactSizeDataSimple(message->getData(), message->getSize()); } if(res != message->getSize()) { return false; @@ -344,9 +343,9 @@ void WebSocket::sendClose(v_word16 code, const oatpp::String& message) const { code = htons(code); oatpp::data::stream::ChunkedBuffer buffer; - buffer.write(&code, 2); + buffer.writeSimple(&code, 2); if(message) { - buffer.write(message->getData(), message->getSize()); + buffer.writeSimple(message->getData(), message->getSize()); } if(!sendOneFrame(true, Frame::OPCODE_CLOSE, buffer.toString())) { diff --git a/src/oatpp-websocket/WebSocket.hpp b/src/oatpp-websocket/WebSocket.hpp index f8211ec..f6cb695 100644 --- a/src/oatpp-websocket/WebSocket.hpp +++ b/src/oatpp-websocket/WebSocket.hpp @@ -85,7 +85,7 @@ class WebSocket : public oatpp::base::Countable { * @param data - pointer to message data. * @param size - data size. */ - virtual void readMessage(const WebSocket& socket, v_word8 opcode, p_char8 data, data::v_io_size size) = 0; + virtual void readMessage(const WebSocket& socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) = 0; }; diff --git a/test/oatpp-websocket/FullAsyncTest.cpp b/test/oatpp-websocket/FullAsyncTest.cpp index 8b4bc9c..966f290 100644 --- a/test/oatpp-websocket/FullAsyncTest.cpp +++ b/test/oatpp-websocket/FullAsyncTest.cpp @@ -32,11 +32,15 @@ namespace { class TestComponent { public: - OATPP_CREATE_COMPONENT(std::shared_ptr, serverExecutor)("ws-server-exec", [] { + OATPP_CREATE_COMPONENT(std::shared_ptr, httpServerExecutor)("http-server-exec", [] { return std::make_shared(4, 2); }()); - OATPP_CREATE_COMPONENT(std::shared_ptr, clientExecutor)("ws-client-exec", [] { + OATPP_CREATE_COMPONENT(std::shared_ptr, wsServerExecutor)("ws-server-exec", [] { + return std::make_shared(4, 2); + }()); + + OATPP_CREATE_COMPONENT(std::shared_ptr, wsClientExecutor)("ws-client-exec", [] { return std::make_shared(4, 2); }()); @@ -65,8 +69,8 @@ class TestComponent { */ OATPP_CREATE_COMPONENT(std::shared_ptr, serverConnectionHandler)([] { OATPP_COMPONENT(std::shared_ptr, router); // get Router component - //OATPP_COMPONENT(std::shared_ptr, executor); - return oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, 2); + OATPP_COMPONENT(std::shared_ptr, executor, "http-server-exec"); + return oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor); }()); /** @@ -130,7 +134,7 @@ class ClientSocketListener : public oatpp::websocket::AsyncWebSocket::Listener{ return nullptr; } - CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, oatpp::data::v_io_size size) override { + CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) override { if(size == 0) { m_messageCounter ++; auto wholeMessage = m_messageBuffer.toString(); @@ -141,7 +145,7 @@ class ClientSocketListener : public oatpp::websocket::AsyncWebSocket::Listener{ m_lastTick = tick; } } else if(size > 0) { - m_messageBuffer.write(data, size); + m_messageBuffer.writeSimple(data, size); } return nullptr; } @@ -225,8 +229,9 @@ void FullAsyncTest::onRun() { TestComponent component; - OATPP_COMPONENT(std::shared_ptr, serverExecutor, "ws-server-exec"); - OATPP_COMPONENT(std::shared_ptr, clientExecutor, "ws-client-exec"); + OATPP_COMPONENT(std::shared_ptr, httpServerExecutor, "http-server-exec"); + OATPP_COMPONENT(std::shared_ptr, wsServerExecutor, "ws-server-exec"); + OATPP_COMPONENT(std::shared_ptr, wsClientExecutor, "ws-client-exec"); oatpp::test::web::ClientServerTestRunner runner; @@ -239,7 +244,7 @@ void FullAsyncTest::onRun() { ///////////////////////////////////////////////////////////////////////////////////// // Create clients - v_int32 clients = 10000; + v_int32 clients = 1000; v_int32 messagesPerClient = 100; for(v_int32 i = 0; i < clients; i ++) { @@ -262,11 +267,17 @@ void FullAsyncTest::onRun() { }, std::chrono::minutes(10)); - serverExecutor->stop(); - clientExecutor->stop(); + httpServerExecutor->waitTasksFinished(); + wsServerExecutor->waitTasksFinished(); + wsClientExecutor->waitTasksFinished(); + + httpServerExecutor->stop(); + wsServerExecutor->stop(); + wsClientExecutor->stop(); - serverExecutor->join(); - clientExecutor->join(); + httpServerExecutor->join(); + wsServerExecutor->join(); + wsClientExecutor->join(); } diff --git a/test/oatpp-websocket/FullTest.cpp b/test/oatpp-websocket/FullTest.cpp index 7e41479..bf0e2bb 100644 --- a/test/oatpp-websocket/FullTest.cpp +++ b/test/oatpp-websocket/FullTest.cpp @@ -122,14 +122,14 @@ class ClientWebSocketListener : public oatpp::websocket::WebSocket::Listener { * When all data of message is read, readMessage is called again with size == 0 to * indicate end of the message */ - void readMessage(const WebSocket &socket, v_word8 opcode, p_char8 data, oatpp::data::v_io_size size) override { + void readMessage(const WebSocket &socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) override { if (size == 0) { auto wholeMessage = m_messageBuffer.toString(); OATPP_LOGD(TAG, "Message='%s'", wholeMessage->c_str()); socket.sendOneFrameText("Hello from oatpp! Your message was: " + wholeMessage); m_messageBuffer.clear(); } else if (size > 0) { - m_messageBuffer.write(data, size); + m_messageBuffer.writeSimple(data, size); } }; diff --git a/test/oatpp-websocket/app/AsyncWebSocketListener.hpp b/test/oatpp-websocket/app/AsyncWebSocketListener.hpp index 49adf81..a4ca222 100644 --- a/test/oatpp-websocket/app/AsyncWebSocketListener.hpp +++ b/test/oatpp-websocket/app/AsyncWebSocketListener.hpp @@ -27,13 +27,13 @@ class AsyncWebSocketListener : public oatpp::websocket::AsyncWebSocket::Listener return socket->sendCloseAsync(); } - CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, oatpp::data::v_io_size size) override { + CoroutineStarter readMessage(const std::shared_ptr& socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) override { if(size == 0) { oatpp::String wholeMessage = m_messageBuffer.toString(); m_messageBuffer.clear(); return socket->sendOneFrameTextAsync("Hello from oatpp!: " + wholeMessage); } else if(size > 0) { - m_messageBuffer.write(data, size); + m_messageBuffer.writeSimple(data, size); } return nullptr; } diff --git a/test/oatpp-websocket/app/WebSocketListener.hpp b/test/oatpp-websocket/app/WebSocketListener.hpp index 524a0cd..20169de 100644 --- a/test/oatpp-websocket/app/WebSocketListener.hpp +++ b/test/oatpp-websocket/app/WebSocketListener.hpp @@ -38,14 +38,14 @@ class WebSocketListener : public oatpp::websocket::WebSocket::Listener { * When all data of message is read, readMessage is called again with size == 0 to * indicate end of the message */ - void readMessage(const WebSocket &socket, v_word8 opcode, p_char8 data, oatpp::data::v_io_size size) override { + void readMessage(const WebSocket &socket, v_word8 opcode, p_char8 data, oatpp::v_io_size size) override { if (size == 0) { auto wholeMessage = m_messageBuffer.toString(); OATPP_LOGD(TAG, "Message='%s'", wholeMessage->c_str()); socket.sendOneFrameText("Hello from oatpp! Your message was: " + wholeMessage); m_messageBuffer.clear(); } else if (size > 0) { - m_messageBuffer.write(data, size); + m_messageBuffer.writeSimple(data, size); } };