Skip to content

Commit

Permalink
liburing
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Nov 14, 2023
1 parent e441bb2 commit 3858182
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 1 deletion.
75 changes: 75 additions & 0 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#ifndef MS_DEP_LIBURING_HPP
#define MS_DEP_LIBURING_HPP

#include "DepLibUV.hpp"
#include <functional>
#include <liburing.h>
#include <queue>

class DepLibUring
{
public:
using onSendCallback = const std::function<void(bool sent)>;

/* Struct for the user data field of SQE and CQE. */
struct UserData
{
uint8_t store[1500]{};
onSendCallback* cb{ nullptr };
size_t idx{ 0 };
};

/* Number of submission queue entries (SQE). */
static constexpr size_t QueueDepth{ 1024 * 4 };

static void ClassInit();
static void ClassDestroy();

thread_local static DepLibUring* liburing;

public:
DepLibUring();
~DepLibUring();
bool PrepareSend(int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
bool PrepareWrite(int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
void Submit();
void SetActive()
{
this->active = true;
}
bool IsActive() const
{
return this->active;
}
io_uring* GetRing()
{
return &this->ring;
}
int GetEventFd() const
{
return this->efd;
}
void ReleaseUserDataEntry(size_t idx)
{
this->availableUserDataEntries.push(idx);
}

private:
UserData* GetUserData();

private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
int efd;
// libuv handle used to poll uring completions.
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Pre-allocated UserData entries.
UserData userDataBuffer[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
};

#endif
1 change: 1 addition & 0 deletions worker/include/handles/TcpConnectionHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class TcpConnectionHandle
uv_tcp_t* uvHandle{ nullptr };
// Others.
struct sockaddr_storage* localAddr{ nullptr };
uv_os_fd_t fd{ 0u };
bool closed{ false };
size_t recvBytes{ 0u };
size_t sentBytes{ 0u };
Expand Down
1 change: 1 addition & 0 deletions worker/include/handles/UdpSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class UdpSocketHandle
// Allocated by this (may be passed by argument).
uv_udp_t* uvHandle{ nullptr };
// Others.
uv_os_fd_t fd{ 0u };
bool closed{ false };
size_t recvBytes{ 0u };
size_t sentBytes{ 0u };
Expand Down
20 changes: 20 additions & 0 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,26 @@ if host_machine.system() == 'windows'
]
endif

if host_machine.system() == 'linux'
kernel_version = run_command('uname', '-r').stdout().strip()

# Enable liburing for kernel versions greather than or equal to 6.
if kernel_version[0].to_int() >= 6
liburing_proj = subproject('liburing', default_options: ['default_library=static'], required: true)
liburing = liburing_proj.get_variable('uring')

dependencies += [liburing]

common_sources += [
'src/DepLibUring.cpp',
]

cpp_args += [
'-DMS_LIBURING_ENABLED',
]
endif
endif

libmediasoup_worker = library(
'libmediasoup-worker',
name_prefix: '',
Expand Down
264 changes: 264 additions & 0 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
#define MS_CLASS "DepLibUring"

// #define MS_LOG_DEV_LEVEL 3

#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include <sys/eventfd.h>

/* Static variables. */

/* liburing instance per thread. */
thread_local DepLibUring* DepLibUring::liburing{ nullptr };
/* Completion queue entry array used to retrieve processes tasks. */
thread_local struct io_uring_cqe* cqes[DepLibUring::QueueDepth];

/* Static method for UV callback. */

inline static void onFdEvent(uv_poll_t* handle, int status, int events)
{
auto* liburing = static_cast<DepLibUring*>(handle->data);

auto count = io_uring_peek_batch_cqe(liburing->GetRing(), cqes, DepLibUring::QueueDepth);

// libuv uses level triggering, so we need to read from the socket to reset
// the counter in order to avoid libuv calling this callback indefinitely.
eventfd_t v;
int error = eventfd_read(liburing->GetEventFd(), &v);
if (error < 0)
{
MS_ERROR("eventfd_read() failed: %s", std::strerror(-error));
};

for (unsigned int i = 0; i < count; ++i)
{
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0)
{
MS_ERROR("sending failed: %s", std::strerror(-cqe->res));

if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
}
}
else
{
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
}
}

io_uring_cqe_seen(liburing->GetRing(), cqe);
liburing->ReleaseUserDataEntry(userData->idx);
}
}

void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

DepLibUring::liburing = new DepLibUring();
}

void DepLibUring::ClassDestroy()
{
MS_TRACE();

delete DepLibUring::liburing;
}

DepLibUring::DepLibUring()
{
MS_TRACE();

auto error = io_uring_queue_init(DepLibUring::QueueDepth, &this->ring, 0);

if (error < 0)
{
MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(-error));
}

/* Create an eventfd instance */
this->efd = eventfd(0, 0);

if (this->efd < 0)
{
MS_THROW_ERROR("eventfd() failed: %s", std::strerror(-efd));
}

error = io_uring_register_eventfd(&this->ring, this->efd);

if (error < 0)
{
MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(-error));
}

// Watch the event file descriptor.
this->uvHandle = new uv_poll_t;

error = uv_poll_init(DepLibUV::GetLoop(), uvHandle, efd);

if (error != 0)
{
delete uvHandle;

MS_THROW_ERROR("uv_poll_init() failed: %s", uv_strerror(error));
}

uvHandle->data = this;

error = uv_poll_start(uvHandle, UV_READABLE, static_cast<uv_poll_cb>(onFdEvent));

if (error != 0)
{
MS_THROW_ERROR("uv_poll_start() failed: %s", uv_strerror(error));
}

// Initialize available UserData entries.
for (size_t i = 0; i < DepLibUring::QueueDepth; ++i)
{
this->availableUserDataEntries.push(i);
}
}

DepLibUring::~DepLibUring()
{
MS_TRACE();

// Stop polling the event file descriptor.
uv_poll_stop(this->uvHandle);

delete this->uvHandle;

// Close the event file descriptor.
close(this->efd);

// Close the ring.
io_uring_queue_exit(&this->ring);
}

bool DepLibUring::PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
MS_TRACE();

auto* userData = this->GetUserData();

if (!userData)
{
MS_WARN_DEV("no user data entry available");

return false;
}

auto* sqe = io_uring_get_sqe(&this->ring);

if (!sqe)
{
MS_WARN_DEV("no sqe available");

return false;
}

std::memcpy(userData->store, data, len);
userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;

if (addr->sa_family == AF_INET)
{
addrlen = sizeof(struct sockaddr_in);
}
else if (addr->sa_family == AF_INET6)
{
addrlen = sizeof(struct sockaddr_in6);
}

io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);

return true;
}

bool DepLibUring::PrepareWrite(int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb)
{
MS_TRACE();

auto* userData = this->GetUserData();

if (!userData)
{
MS_WARN_DEV("no user data entry available");

return false;
}

auto* sqe = io_uring_get_sqe(&this->ring);

if (!sqe)
{
MS_WARN_DEV("no sqe available");

return false;
}

std::memcpy(userData->store, data1, len1);
std::memcpy(userData->store + len1, data2, len2);
userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);
io_uring_prep_write(sqe, sockfd, userData->store, len1+len2, 0);

return true;
}

void DepLibUring::Submit()
{
MS_TRACE();

// Unset active flag.
this->active = false;

auto error = io_uring_submit(&this->ring);

if (error >= 0)
{
MS_DEBUG_DEV("%i submission queue entries submitted", ret);
}
else
{
MS_ERROR("io_uring_submit() failed: %s", std::strerror(-error));
}
}

DepLibUring::UserData* DepLibUring::GetUserData()
{
MS_TRACE();

if (this->availableUserDataEntries.empty())
{
MS_WARN_DEV("no user data entry available");

return nullptr;
}

auto idx = this->availableUserDataEntries.front();

this->availableUserDataEntries.pop();

auto* userData = &this->userDataBuffer[idx];
userData->idx = idx;

return userData;
}
Loading

0 comments on commit 3858182

Please sign in to comment.