Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement #214 JUICE_CONCURRENCY_MODE_USER #226

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(LIBJUICE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_poll.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_thread.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_mux.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_user.c
${CMAKE_CURRENT_SOURCE_DIR}/src/base64.c
${CMAKE_CURRENT_SOURCE_DIR}/src/hash.c
${CMAKE_CURRENT_SOURCE_DIR}/src/hmac.c
Expand Down Expand Up @@ -74,6 +75,7 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/turn.c
${CMAKE_CURRENT_SOURCE_DIR}/test/thread.c
${CMAKE_CURRENT_SOURCE_DIR}/test/mux.c
${CMAKE_CURRENT_SOURCE_DIR}/test/user.c
${CMAKE_CURRENT_SOURCE_DIR}/test/notrickle.c
${CMAKE_CURRENT_SOURCE_DIR}/test/server.c
${CMAKE_CURRENT_SOURCE_DIR}/test/conflict.c
Expand Down
23 changes: 23 additions & 0 deletions include/juice/juice.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ typedef enum juice_concurrency_mode {
JUICE_CONCURRENCY_MODE_POLL = 0, // Connections share a single thread
JUICE_CONCURRENCY_MODE_MUX, // Connections are multiplexed on a single UDP socket
JUICE_CONCURRENCY_MODE_THREAD, // Each connection runs in its own thread

JUICE_CONCURRENCY_MODE_USER, // Agents must be updated via frequent calls to juice_user_poll...
// Note:
// - ICE keepalive requires regular polling RFC 8445 11.
// - The OS's UDP packet buffering capacity is limited you need to make
// sure the inflow of packets will not cause a bottleneck/packet loss
// - Also DNS resolution blocks in this mode
} juice_concurrency_mode_t;

typedef struct juice_config {
Expand Down Expand Up @@ -116,6 +123,22 @@ JUICE_EXPORT int juice_get_selected_addresses(juice_agent_t *agent, char *local,
char *remote, size_t remote_size);
JUICE_EXPORT const char *juice_state_to_string(juice_state_t state);

// Valid for JUICE_CONCURRENCY_MODE_USER only
//
// Non-blocking tries to read a datagram from `agent`'s socket. Forwards Non-ICE packets to you via `agent`'s `on_recv` callback.
// You shouldn't use data in `buffer` directly since it might be a STUN packet, contain TURN headers, etc;
// The intended use of passing `buffer` is to give finer control to the user e.g. zero-copy
//
// Parameters:
// - `agent`: A pointer to an `juice_agent_t` structure initialized with `juice_create_agent()`.
// - `buffer`: A pointer to the buffer where the received data will be stored It should accommodate
// the largest datagram you expect to receive (INCLUDING TURN HEADERS) otherwise, data will be truncated
//
// Returns:
// - Positive number: Indicates the number of bytes received (0 means no more datagrams)
// - Negative number: Indicates an error occurred
JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size);

// ICE server

typedef struct juice_server juice_server_t;
Expand Down
3 changes: 2 additions & 1 deletion src/agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ int agent_gather_candidates(juice_agent_t *agent) {
conn_unlock(agent);
conn_interrupt(agent);

if (has_nonnumeric_server_hostnames(&agent->config)) {
if (has_nonnumeric_server_hostnames(&agent->config) &&
CONN_MODE_IS_CONCURRENT(agent->config.concurrency_mode)) {
// Resolve server hostnames in a separate thread as it may block
JLOG_DEBUG("Starting resolver thread for servers");
int ret = thread_init(&agent->resolver_thread, resolver_thread_entry, agent);
Expand Down
10 changes: 7 additions & 3 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "conn_mux.h"
#include "conn_poll.h"
#include "conn_thread.h"
#include "conn_user.h"
#include "log.h"

#include <assert.h>
Expand All @@ -33,10 +34,10 @@ typedef struct conn_mode_entry {
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);

mutex_t mutex;
conn_registry_t *registry;
conn_registry_t *registry; // left NULL for concurrency modes that don't use a global registry
} conn_mode_entry_t;

#define MODE_ENTRIES_SIZE 3
#define MODE_ENTRIES_SIZE 4

static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
Expand All @@ -46,7 +47,10 @@ static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_user_init, conn_user_cleanup, conn_user_lock, conn_user_unlock,
conn_user_interrupt, conn_user_send, conn_user_get_addrs, MUTEX_INITIALIZER, NULL},
};

static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
Expand Down
2 changes: 2 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data,
int ds);
int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size);

#define CONN_MODE_IS_CONCURRENT(mode) mode != JUICE_CONCURRENCY_MODE_USER
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#define CONN_MODE_IS_CONCURRENT(mode) mode != JUICE_CONCURRENCY_MODE_USER
#define CONN_MODE_IS_CONCURRENT(mode) (mode != JUICE_CONCURRENCY_MODE_USER)


#endif
189 changes: 189 additions & 0 deletions src/conn_user.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
* Copyright (c) 2023 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#include "conn_user.h"
#include "agent.h"
#include "log.h"
#include "socket.h"
#include "udp.h"

#include <stdint.h>

typedef enum conn_state { CONN_STATE_NEW = 0, CONN_STATE_READY, CONN_STATE_FINISHED } conn_state_t;

typedef struct conn_impl {
conn_state_t state;
socket_t sock;
mutex_t mutex;
mutex_t send_mutex;
int send_ds;
timestamp_t next_timestamp;
} conn_impl_t;

static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src);

JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size) {
if (!agent || !buffer)
return JUICE_ERR_INVALID;

conn_impl_t *conn_impl = agent->conn_impl;

if (!conn_impl)
return JUICE_ERR_INVALID;

mutex_lock(&conn_impl->mutex);

if (conn_impl->state == CONN_STATE_FINISHED) {
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}

if (agent->config.concurrency_mode != JUICE_CONCURRENCY_MODE_USER) {
JLOG_ERROR("agent->config.concurrency_mode=%d Only JUICE_CONCURRENCY_MODE_USER (%d) is supported",
agent->config.concurrency_mode, JUICE_CONCURRENCY_MODE_USER);
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_INVALID;
}

addr_record_t src;
int ret = conn_user_recv(conn_impl->sock, buffer, size, &src);

if (ret < 0) {
agent_conn_fail(agent);
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
} else if (ret > 0) {
if (agent_conn_recv(agent, buffer, (size_t)ret, &src) != 0) {
JLOG_WARN("Agent receive failed");
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}
}

if ( ret > 0 // We just received a datagram
|| conn_impl->next_timestamp <= current_timestamp()
|| agent->state != JUICE_STATE_COMPLETED) {
paullouisageneau marked this conversation as resolved.
Show resolved Hide resolved
if (agent_conn_update(agent, &conn_impl->next_timestamp) != 0) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the loop out of juice_user_poll() is actually problematic performance-wise: you have to call the agent_conn_update() process after each received packet, which comes at a performance cost when receiving a lot of application traffic. You should instead receive all available datagrams, then update the agent once.

JLOG_WARN("Agent update failed");
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}
}

mutex_unlock(&conn_impl->mutex);
return ret;
}

static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src) {
JLOG_VERBOSE("Receiving datagram");
int len;
while ((len = udp_recvfrom(sock, buffer, size, src)) == 0) {
// Empty datagram, ignore
}

if (len < 0) {
if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
JLOG_VERBOSE("No more datagrams to receive");
return 0;
}
JLOG_ERROR("recvfrom failed, errno=%d", sockerrno);
return -1;
}

addr_unmap_inet6_v4mapped((struct sockaddr *)&src->addr, &src->len);
return len; // len > 0
}

int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config) {
(void)registry;

conn_impl_t *conn_impl = calloc(1, sizeof(conn_impl_t));
if (!conn_impl) {
JLOG_FATAL("Memory allocation failed for connection impl");
return -1;
}

conn_impl->sock = udp_create_socket(config);
if (conn_impl->sock == INVALID_SOCKET) {
JLOG_ERROR("UDP socket creation failed");
free(conn_impl);
return -1;
}

mutex_init(&conn_impl->mutex, 0);
mutex_init(&conn_impl->send_mutex, 0);

agent->conn_impl = conn_impl;

return JUICE_ERR_SUCCESS;
}

void conn_user_cleanup(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;

closesocket(conn_impl->sock);
mutex_destroy(&conn_impl->mutex);
mutex_destroy(&conn_impl->send_mutex);
free(agent->conn_impl);
agent->conn_impl = NULL;
}

void conn_user_lock(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;
mutex_lock(&conn_impl->mutex);
}

void conn_user_unlock(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;
mutex_unlock(&conn_impl->mutex);
}

int conn_user_interrupt(juice_agent_t *agent) {
// juice_user_poll does not block when polling, so there's nothing to interrupt
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still need to schedule a call to agent_conn_update() on the next poll, it could be achieved by setting resetting next_timestamp to 0. This is why you currently need to add the state != JUICE_STATE_COMPLETED in juice_user_poll().

(void) agent; // Explicitly discard unused parameter
return JUICE_ERR_SUCCESS;
}

int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds) {
conn_impl_t *conn_impl = agent->conn_impl;

mutex_lock(&conn_impl->send_mutex);

if (conn_impl->send_ds >= 0 && conn_impl->send_ds != ds) {
JLOG_VERBOSE("Setting Differentiated Services field to 0x%X", ds);
if (udp_set_diffserv(conn_impl->sock, ds) == 0)
conn_impl->send_ds = ds;
else
conn_impl->send_ds = -1; // disable for next time
}

JLOG_VERBOSE("Sending datagram, size=%d", size);

int ret = udp_sendto(conn_impl->sock, data, size, dst);
if (ret < 0) {
if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK)
JLOG_INFO("Send failed, buffer is full");
else if (sockerrno == SEMSGSIZE)
JLOG_WARN("Send failed, datagram is too large");
else
JLOG_WARN("Send failed, errno=%d", sockerrno);
}

mutex_unlock(&conn_impl->send_mutex);
return ret;
}

int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) {
conn_impl_t *conn_impl = agent->conn_impl;

return udp_get_addrs(conn_impl->sock, records, size);
}
28 changes: 28 additions & 0 deletions src/conn_user.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (c) 2023 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef JUICE_CONN_USER_H
#define JUICE_CONN_USER_H

#include "addr.h"
#include "conn.h"
#include "timestamp.h"

#include <stdbool.h>
#include <stdint.h>

int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config);
void conn_user_cleanup(juice_agent_t *agent);
void conn_user_lock(juice_agent_t *agent);
void conn_user_unlock(juice_agent_t *agent);
int conn_user_interrupt(juice_agent_t *agent);
int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds);
int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size);

#endif
7 changes: 7 additions & 0 deletions test/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ int test_stun(void);
int test_connectivity(void);
int test_thread(void);
int test_mux(void);
int test_user(void);
int test_notrickle(void);
int test_gathering(void);
int test_turn(void);
Expand Down Expand Up @@ -79,6 +80,12 @@ int main(int argc, char **argv) {
return -1;
}

printf("\nRunning user-mode connectivity test...\n");
if (test_user()) {
fprintf(stderr, "User-mode connectivity test failed\n");
return -1;
}

printf("\nRunning non-trickled connectivity test...\n");
if (test_notrickle()) {
fprintf(stderr, "Non-trickled connectivity test failed\n");
Expand Down
Loading
Loading