Skip to content

Commit

Permalink
update to the latest oatpp API.
Browse files Browse the repository at this point in the history
  • Loading branch information
lganzzzo committed Nov 16, 2021
1 parent cec7bee commit 62cc5df
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 53 deletions.
23 changes: 11 additions & 12 deletions src/oatpp-websocket/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,15 @@ oatpp::async::CoroutineStarter AsyncWebSocket::writeFrameHeaderAsync(const std::
}

oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::shared_ptr<Frame::Header>& frameHeader,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& shortMessageStream)
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& shortMessageStream)
{

class ReadPayloadCoroutine : public oatpp::async::Coroutine<ReadPayloadCoroutine> {
private:
std::shared_ptr<AsyncWebSocket> m_socket;
provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<Frame::Header> m_frameHeader;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_shortMessageStream;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_shortMessageStream;
std::shared_ptr<Listener> m_listener;
private:
p_char8 m_buffer;
Expand All @@ -236,7 +236,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::readPayloadAsync(const std::share
ReadPayloadCoroutine(const std::shared_ptr<AsyncWebSocket>& socket,
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<Frame::Header>& frameHeader,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& shortMessageStream,
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& shortMessageStream,
const std::shared_ptr<Listener>& listener)
: m_socket(socket)
, m_connection(connection)
Expand Down Expand Up @@ -340,7 +340,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::handleFrameAsync(const std::share
std::shared_ptr<Frame::Header> m_frameHeader;
std::shared_ptr<Listener> m_listener;
private:
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_shortMessageStream;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_shortMessageStream;
public:
HandleFrameCoroutine(const std::shared_ptr<AsyncWebSocket>& socket,
const std::shared_ptr<Frame::Header>& frameHeader)
Expand Down Expand Up @@ -373,15 +373,15 @@ oatpp::async::CoroutineStarter AsyncWebSocket::handleFrameAsync(const std::share
}

case Frame::OPCODE_CLOSE:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
m_shortMessageStream = std::make_shared<data::stream::BufferOutputStream>();
return m_socket->readPayloadAsync(m_frameHeader, m_shortMessageStream).next(yieldTo(&HandleFrameCoroutine::onClose));

case Frame::OPCODE_PING:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
m_shortMessageStream = std::make_shared<data::stream::BufferOutputStream>();
return m_socket->readPayloadAsync(m_frameHeader, m_shortMessageStream).next(yieldTo(&HandleFrameCoroutine::onPing));

case Frame::OPCODE_PONG:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
m_shortMessageStream = std::make_shared<data::stream::BufferOutputStream>();
return m_socket->readPayloadAsync(m_frameHeader, m_shortMessageStream).next(yieldTo(&HandleFrameCoroutine::onPong));

default:
Expand All @@ -395,10 +395,9 @@ oatpp::async::CoroutineStarter AsyncWebSocket::handleFrameAsync(const std::share
if(m_listener) {
v_uint16 code = 0;
oatpp::String message;
if(m_shortMessageStream->getSize() >= 2) {
m_shortMessageStream->readSubstring(&code, 0, 2);
code = ntohs(code);
message = m_shortMessageStream->getSubstring(2, m_shortMessageStream->getSize() - 2);
if(m_shortMessageStream->getCurrentPosition() >= 2) {
code = ntohs(*((p_uint16) m_shortMessageStream->getData()));
message = m_shortMessageStream->getSubstring(2, m_shortMessageStream->getCurrentPosition() - 2);
}
if(!message) {
message = "";
Expand Down Expand Up @@ -543,7 +542,7 @@ oatpp::async::CoroutineStarter AsyncWebSocket::sendCloseAsync(v_uint16 code, con

code = htons(code);

oatpp::data::stream::ChunkedBuffer buffer;
oatpp::data::stream::BufferOutputStream buffer;
buffer.writeSimple(&code, 2);
if(message) {
buffer.writeSimple(message->data(), message->size());
Expand Down
4 changes: 2 additions & 2 deletions src/oatpp-websocket/AsyncWebSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
#include "./Frame.hpp"
#include "./Config.hpp"

#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/provider/Provider.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include "oatpp/core/async/Coroutine.hpp"

namespace oatpp { namespace websocket {
Expand Down Expand Up @@ -130,7 +130,7 @@ class AsyncWebSocket : public oatpp::base::Countable, public std::enable_shared_
* if(shortMessageStream) - read message to shortMessageStream. Don't call listener
*/
CoroutineStarter readPayloadAsync(const std::shared_ptr<Frame::Header>& frameHeader,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& shortMessageStream);
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& shortMessageStream);

CoroutineStarter handleFrameAsync(const std::shared_ptr<Frame::Header>& frameHeader);

Expand Down
33 changes: 19 additions & 14 deletions src/oatpp-websocket/SHA1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

namespace oatpp { namespace websocket {

void SHA1::reset(uint32_t digest[], ChunkedBuffer& buffer, uint64_t &transforms) {
void SHA1::reset(uint32_t digest[], BufferOutputStream& buffer, uint64_t &transforms) {

/* SHA1 initialization constants */
digest[0] = 0x67452301;
Expand All @@ -44,7 +44,7 @@ void SHA1::reset(uint32_t digest[], ChunkedBuffer& buffer, uint64_t &transforms)
digest[4] = 0xc3d2e1f0;

/* Reset counters */
buffer.clear();
buffer.setCurrentPosition(0);
transforms = 0;

}
Expand Down Expand Up @@ -204,7 +204,7 @@ void SHA1::transform(uint32_t digest[], uint32_t block[BLOCK_INTS], uint64_t &tr
}


void SHA1::buffer_to_block(ChunkedBuffer& buffer, uint32_t block[BLOCK_INTS]) {
void SHA1::buffer_to_block(BufferOutputStream& buffer, uint32_t block[BLOCK_INTS]) {
/* Convert the std::string (byte buffer) to a uint32_t array (MSB) */
/*
for (size_t i = 0; i < BLOCK_INTS; i++) {
Expand All @@ -214,7 +214,12 @@ void SHA1::buffer_to_block(ChunkedBuffer& buffer, uint32_t block[BLOCK_INTS]) {
| (buffer[4 * i + 0] & 0xff)<<24;
}
*/
buffer.readSubstring(block, 0, BLOCK_INTS * 4);

v_int64 readSize = BLOCK_INTS * 4;
if(readSize > buffer.getCurrentPosition()) {
readSize = buffer.getCurrentPosition();
}
memcpy(block, buffer.getData(), readSize);
for(v_int32 i = 0; i < BLOCK_INTS; i ++) {
block[i] = ntohl(block[i]);
}
Expand All @@ -230,22 +235,22 @@ void SHA1::update(const oatpp::String &s) {
oatpp::v_io_size progress = 0;
while (true) {

auto readSize = BLOCK_BYTES - buffer.getSize();
auto readSize = BLOCK_BYTES - buffer.getCurrentPosition();
if(readSize > s->size() - progress) {
readSize = s->size() - progress;
}

buffer.writeSimple(&s->data()[progress], readSize);
progress += readSize;

if (buffer.getSize() != BLOCK_BYTES) {
if (buffer.getCurrentPosition() != BLOCK_BYTES) {
return;
}

uint32_t block[BLOCK_INTS];
buffer_to_block(buffer, block);
transform(digest, block, transforms);
buffer.clear();
buffer.setCurrentPosition(0);

}
}
Expand All @@ -256,17 +261,17 @@ void SHA1::update(std::istream &is) {

char sbuf[BLOCK_BYTES];

is.read(sbuf, (int)(BLOCK_BYTES - buffer.getSize()));
is.read(sbuf, (int)(BLOCK_BYTES - buffer.getCurrentPosition()));
buffer.writeSimple(sbuf, (std::size_t)is.gcount());

if (buffer.getSize() != BLOCK_BYTES) {
if (buffer.getCurrentPosition() != BLOCK_BYTES) {
return;
}

uint32_t block[BLOCK_INTS];
buffer_to_block(buffer, block);
transform(digest, block, transforms);
buffer.clear();
buffer.setCurrentPosition(0);

}
}
Expand All @@ -278,12 +283,12 @@ void SHA1::update(std::istream &is) {

oatpp::String SHA1::finalBinary() {
/* Total number of hashed bits */
uint64_t total_bits = (transforms * BLOCK_BYTES + buffer.getSize()) * 8;
uint64_t total_bits = (transforms * BLOCK_BYTES + buffer.getCurrentPosition()) * 8;

/* Padding */
buffer.writeCharSimple(0x80);
size_t orig_size = (size_t)buffer.getSize();
while (buffer.getSize() < BLOCK_BYTES) {
size_t orig_size = (size_t)buffer.getCurrentPosition();
while (buffer.getCurrentPosition() < BLOCK_BYTES) {
buffer.writeCharSimple(0x00);
}

Expand All @@ -303,7 +308,7 @@ oatpp::String SHA1::finalBinary() {
block[BLOCK_INTS - 2] = (uint32_t)(total_bits >> 32);
transform(digest, block, transforms);

oatpp::data::stream::ChunkedBuffer resultStream;
oatpp::data::stream::BufferOutputStream resultStream;
for (size_t i = 0; i < sizeof(digest) / sizeof(digest[0]); i++) {
uint32_t b = htonl(digest[i]);
resultStream.writeSimple(&b, 4);
Expand Down
10 changes: 5 additions & 5 deletions src/oatpp-websocket/SHA1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#ifndef oatpp_websocket_SHA1_HPP
#define oatpp_websocket_SHA1_HPP

#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/Types.hpp"

#include <cstdint>
Expand All @@ -37,12 +37,12 @@ namespace oatpp { namespace websocket {
*/
class SHA1 {
private:
typedef oatpp::data::stream::ChunkedBuffer ChunkedBuffer;
typedef oatpp::data::stream::BufferOutputStream BufferOutputStream;
private:
static constexpr size_t BLOCK_INTS = 16; /* number of 32bit integers per SHA1 block */
static constexpr size_t BLOCK_BYTES = BLOCK_INTS * 4;
private:
static void reset(uint32_t digest[], ChunkedBuffer& buffer, uint64_t &transforms);
static void reset(uint32_t digest[], BufferOutputStream& buffer, uint64_t &transforms);
static uint32_t rol(const uint32_t value, const size_t bits);
static uint32_t blk(const uint32_t block[BLOCK_INTS], const size_t i);
static void R0(const uint32_t block[BLOCK_INTS], const uint32_t v, uint32_t &w, const uint32_t x, const uint32_t y, uint32_t &z, const size_t i);
Expand All @@ -51,10 +51,10 @@ class SHA1 {
static void R3(uint32_t block[BLOCK_INTS], const uint32_t v, uint32_t &w, const uint32_t x, const uint32_t y, uint32_t &z, const size_t i);
static void R4(uint32_t block[BLOCK_INTS], const uint32_t v, uint32_t &w, const uint32_t x, const uint32_t y, uint32_t &z, const size_t i);
static void transform(uint32_t digest[], uint32_t block[BLOCK_INTS], uint64_t &transforms);
static void buffer_to_block(ChunkedBuffer& buffer, uint32_t block[BLOCK_INTS]);
static void buffer_to_block(BufferOutputStream& buffer, uint32_t block[BLOCK_INTS]);
private:
uint32_t digest[5];
oatpp::data::stream::ChunkedBuffer buffer;
oatpp::data::stream::BufferOutputStream buffer;
uint64_t transforms;
public:

Expand Down
17 changes: 8 additions & 9 deletions src/oatpp-websocket/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void WebSocket::writeFrameHeader(const Frame::Header& frameHeader) const {

}

void WebSocket::readPayload(const Frame::Header& frameHeader, oatpp::data::stream::ChunkedBuffer* shortMessageStream) const {
void WebSocket::readPayload(const Frame::Header& frameHeader, data::stream::BufferOutputStream* shortMessageStream) const {

if(shortMessageStream && frameHeader.payloadLength > 125) {
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::readPayload()]: Invalid payloadLength. See RFC-6455, section-5.5.");
Expand Down Expand Up @@ -227,15 +227,14 @@ void WebSocket::handleFrame(const Frame::Header& frameHeader) {

case Frame::OPCODE_CLOSE:
{
oatpp::data::stream::ChunkedBuffer messageStream;
oatpp::data::stream::BufferOutputStream messageStream;
readPayload(frameHeader, &messageStream);
if(m_listener) {
v_uint16 code = 0;
oatpp::String message;
if(messageStream.getSize() >= 2) {
messageStream.readSubstring(&code, 0, 2);
code = ntohs(code);
message = messageStream.getSubstring(2, messageStream.getSize() - 2);
if(messageStream.getCurrentPosition() >= 2) {
code = ntohs(*((p_uint16) messageStream.getData()));
message = messageStream.getSubstring(2, messageStream.getCurrentPosition() - 2);
}
if(!message) {
message = "";
Expand All @@ -247,7 +246,7 @@ void WebSocket::handleFrame(const Frame::Header& frameHeader) {

case Frame::OPCODE_PING:
{
oatpp::data::stream::ChunkedBuffer messageStream;
oatpp::data::stream::BufferOutputStream messageStream;
readPayload(frameHeader, &messageStream);
if(m_listener) {
m_listener->onPing(*this, messageStream.toString());
Expand All @@ -257,7 +256,7 @@ void WebSocket::handleFrame(const Frame::Header& frameHeader) {

case Frame::OPCODE_PONG:
{
oatpp::data::stream::ChunkedBuffer messageStream;
oatpp::data::stream::BufferOutputStream messageStream;
readPayload(frameHeader, &messageStream);
if(m_listener) {
m_listener->onPong(*this, messageStream.toString());
Expand Down Expand Up @@ -342,7 +341,7 @@ void WebSocket::sendClose(v_uint16 code, const oatpp::String& message) const {

code = htons(code);

oatpp::data::stream::ChunkedBuffer buffer;
oatpp::data::stream::BufferOutputStream buffer;
buffer.writeSimple(&code, 2);
if(message) {
buffer.writeSimple(message->data(), message->size());
Expand Down
4 changes: 2 additions & 2 deletions src/oatpp-websocket/WebSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
#include "./Frame.hpp"
#include "./Config.hpp"

#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/provider/Provider.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"

namespace oatpp { namespace websocket {

Expand Down Expand Up @@ -99,7 +99,7 @@ class WebSocket : public oatpp::base::Countable {
* if(shortMessageStream == nullptr) - read call readMessage() method of listener
* if(shortMessageStream) - read message to shortMessageStream. Don't call listener
*/
void readPayload(const Frame::Header& frameHeader, oatpp::data::stream::ChunkedBuffer* shortMessageStream) const;
void readPayload(const Frame::Header& frameHeader, oatpp::data::stream::BufferOutputStream* shortMessageStream) const;

void handleFrame(const Frame::Header& frameHeader);

Expand Down
4 changes: 2 additions & 2 deletions test/oatpp-websocket/FullAsyncTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class ClientSocketListener : public oatpp::websocket::AsyncWebSocket::Listener{
v_int64 m_lastTick = 0;
v_int32 m_messageCounter = 0;
bool m_printLog;
oatpp::data::stream::ChunkedBuffer m_messageBuffer;
oatpp::data::stream::BufferOutputStream m_messageBuffer;
public:

ClientSocketListener(bool printLog)
Expand All @@ -138,7 +138,7 @@ class ClientSocketListener : public oatpp::websocket::AsyncWebSocket::Listener{
if(size == 0) {
m_messageCounter ++;
auto wholeMessage = m_messageBuffer.toString();
m_messageBuffer.clear();
m_messageBuffer.setCurrentPosition(0);
if(m_printLog) {
auto tick = oatpp::base::Environment::getMicroTickCount();
OATPP_LOGD("client", "sid=%d, received %s, latency=%d, messageCount=%d", socket.get(), wholeMessage->c_str(), tick - m_lastTick, m_messageCounter);
Expand Down
4 changes: 2 additions & 2 deletions test/oatpp-websocket/FullTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ClientWebSocketListener : public oatpp::websocket::WebSocket::Listener {
private:
static constexpr const char *const TAG = "ClientWebSocketListener";
private:
oatpp::data::stream::ChunkedBuffer m_messageBuffer;
oatpp::data::stream::BufferOutputStream m_messageBuffer;
public:

void onPing(const WebSocket &socket, const oatpp::String &message) override {
Expand All @@ -127,7 +127,7 @@ class ClientWebSocketListener : public oatpp::websocket::WebSocket::Listener {
auto wholeMessage = m_messageBuffer.toString();
OATPP_LOGD(TAG, "Message='%s'", wholeMessage->c_str());
socket.sendOneFrameText("Hello from oatpp! Your message was: " + wholeMessage);
m_messageBuffer.clear();
m_messageBuffer.setCurrentPosition(0);
} else if (size > 0) {
m_messageBuffer.writeSimple(data, size);
}
Expand Down
4 changes: 2 additions & 2 deletions test/oatpp-websocket/app/AsyncWebSocketListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace oatpp { namespace test { namespace websocket { namespace app {

class AsyncWebSocketListener : public oatpp::websocket::AsyncWebSocket::Listener {
private:
oatpp::data::stream::ChunkedBuffer m_messageBuffer;
oatpp::data::stream::BufferOutputStream m_messageBuffer;
public:

CoroutineStarter onPing(const std::shared_ptr<AsyncWebSocket>& socket, const oatpp::String& message) override {
Expand All @@ -30,7 +30,7 @@ class AsyncWebSocketListener : public oatpp::websocket::AsyncWebSocket::Listener
CoroutineStarter readMessage(const std::shared_ptr<AsyncWebSocket>& socket, v_uint8 opcode, p_char8 data, oatpp::v_io_size size) override {
if(size == 0) {
oatpp::String wholeMessage = m_messageBuffer.toString();
m_messageBuffer.clear();
m_messageBuffer.setCurrentPosition(0);
return socket->sendOneFrameTextAsync("Hello from oatpp!: " + wholeMessage);
} else if(size > 0) {
m_messageBuffer.writeSimple(data, size);
Expand Down
Loading

0 comments on commit 62cc5df

Please sign in to comment.