diff --git a/CMakeLists.txt b/CMakeLists.txt index 1125edbe..0f474a39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,7 @@ set(LIBJUICE_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/conn_poll.c ${CMAKE_CURRENT_SOURCE_DIR}/src/conn_thread.c ${CMAKE_CURRENT_SOURCE_DIR}/src/conn_mux.c + ${CMAKE_CURRENT_SOURCE_DIR}/src/conn_user.c ${CMAKE_CURRENT_SOURCE_DIR}/src/base64.c ${CMAKE_CURRENT_SOURCE_DIR}/src/hash.c ${CMAKE_CURRENT_SOURCE_DIR}/src/hmac.c @@ -74,6 +75,7 @@ set(TESTS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/test/turn.c ${CMAKE_CURRENT_SOURCE_DIR}/test/thread.c ${CMAKE_CURRENT_SOURCE_DIR}/test/mux.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/user.c ${CMAKE_CURRENT_SOURCE_DIR}/test/notrickle.c ${CMAKE_CURRENT_SOURCE_DIR}/test/server.c ${CMAKE_CURRENT_SOURCE_DIR}/test/conflict.c diff --git a/include/juice/juice.h b/include/juice/juice.h index 06bcc139..37357514 100644 --- a/include/juice/juice.h +++ b/include/juice/juice.h @@ -74,6 +74,13 @@ typedef enum juice_concurrency_mode { JUICE_CONCURRENCY_MODE_POLL = 0, // Connections share a single thread JUICE_CONCURRENCY_MODE_MUX, // Connections are multiplexed on a single UDP socket JUICE_CONCURRENCY_MODE_THREAD, // Each connection runs in its own thread + + JUICE_CONCURRENCY_MODE_USER, // Agents must be updated via frequent calls to juice_user_poll... + // Note: + // - ICE keepalive requires regular polling RFC 8445 11. + // - The OS's UDP packet buffering capacity is limited you need to make + // sure the inflow of packets will not cause a bottleneck/packet loss + // - Also DNS resolution blocks in this mode } juice_concurrency_mode_t; typedef struct juice_config { @@ -116,6 +123,22 @@ JUICE_EXPORT int juice_get_selected_addresses(juice_agent_t *agent, char *local, char *remote, size_t remote_size); JUICE_EXPORT const char *juice_state_to_string(juice_state_t state); +// Valid for JUICE_CONCURRENCY_MODE_USER only +// +// Non-blocking tries to read a datagram from `agent`'s socket. Forwards Non-ICE packets to you via `agent`'s `on_recv` callback. +// You shouldn't use data in `buffer` directly since it might be a STUN packet, contain TURN headers, etc; +// The intended use of passing `buffer` is to give finer control to the user e.g. zero-copy +// +// Parameters: +// - `agent`: A pointer to an `juice_agent_t` structure initialized with `juice_create_agent()`. +// - `buffer`: A pointer to the buffer where the received data will be stored It should accommodate +// the largest datagram you expect to receive (INCLUDING TURN HEADERS) otherwise, data will be truncated +// +// Returns: +// - Positive number: Indicates the number of bytes received (0 means no more datagrams) +// - Negative number: Indicates an error occurred +JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size); + // ICE server typedef struct juice_server juice_server_t; diff --git a/src/agent.c b/src/agent.c index 679393cc..27de4a7e 100644 --- a/src/agent.c +++ b/src/agent.c @@ -279,7 +279,8 @@ int agent_gather_candidates(juice_agent_t *agent) { conn_unlock(agent); conn_interrupt(agent); - if (has_nonnumeric_server_hostnames(&agent->config)) { + if (has_nonnumeric_server_hostnames(&agent->config) && + CONN_MODE_IS_CONCURRENT(agent->config.concurrency_mode)) { // Resolve server hostnames in a separate thread as it may block JLOG_DEBUG("Starting resolver thread for servers"); int ret = thread_init(&agent->resolver_thread, resolver_thread_entry, agent); diff --git a/src/conn.c b/src/conn.c index 4d7893a0..7cfe6c27 100644 --- a/src/conn.c +++ b/src/conn.c @@ -11,6 +11,7 @@ #include "conn_mux.h" #include "conn_poll.h" #include "conn_thread.h" +#include "conn_user.h" #include "log.h" #include @@ -33,10 +34,10 @@ typedef struct conn_mode_entry { int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size); mutex_t mutex; - conn_registry_t *registry; + conn_registry_t *registry; // left NULL for concurrency modes that don't use a global registry } conn_mode_entry_t; -#define MODE_ENTRIES_SIZE 3 +#define MODE_ENTRIES_SIZE 4 static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = { {conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup, @@ -46,7 +47,10 @@ static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = { conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs, MUTEX_INITIALIZER, NULL}, {NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock, - conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}}; + conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}, + {NULL, NULL, conn_user_init, conn_user_cleanup, conn_user_lock, conn_user_unlock, + conn_user_interrupt, conn_user_send, conn_user_get_addrs, MUTEX_INITIALIZER, NULL}, +}; static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) { juice_concurrency_mode_t mode = agent->config.concurrency_mode; diff --git a/src/conn.h b/src/conn.h index 4aec7d0c..4a915af5 100644 --- a/src/conn.h +++ b/src/conn.h @@ -41,4 +41,6 @@ int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, int ds); int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); +#define CONN_MODE_IS_CONCURRENT(mode) mode != JUICE_CONCURRENCY_MODE_USER + #endif diff --git a/src/conn_user.c b/src/conn_user.c new file mode 100644 index 00000000..ec4b418d --- /dev/null +++ b/src/conn_user.c @@ -0,0 +1,189 @@ +/** + * Copyright (c) 2023 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "conn_user.h" +#include "agent.h" +#include "log.h" +#include "socket.h" +#include "udp.h" + +#include + +typedef enum conn_state { CONN_STATE_NEW = 0, CONN_STATE_READY, CONN_STATE_FINISHED } conn_state_t; + +typedef struct conn_impl { + conn_state_t state; + socket_t sock; + mutex_t mutex; + mutex_t send_mutex; + int send_ds; + timestamp_t next_timestamp; +} conn_impl_t; + +static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src); + +JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size) { + if (!agent || !buffer) + return JUICE_ERR_INVALID; + + conn_impl_t *conn_impl = agent->conn_impl; + + if (!conn_impl) + return JUICE_ERR_INVALID; + + mutex_lock(&conn_impl->mutex); + + if (conn_impl->state == CONN_STATE_FINISHED) { + mutex_unlock(&conn_impl->mutex); + return JUICE_ERR_FAILED; + } + + if (agent->config.concurrency_mode != JUICE_CONCURRENCY_MODE_USER) { + JLOG_ERROR("agent->config.concurrency_mode=%d Only JUICE_CONCURRENCY_MODE_USER (%d) is supported", + agent->config.concurrency_mode, JUICE_CONCURRENCY_MODE_USER); + mutex_unlock(&conn_impl->mutex); + return JUICE_ERR_INVALID; + } + + addr_record_t src; + int ret = conn_user_recv(conn_impl->sock, buffer, size, &src); + + if (ret < 0) { + agent_conn_fail(agent); + conn_impl->state = CONN_STATE_FINISHED; + mutex_unlock(&conn_impl->mutex); + return JUICE_ERR_FAILED; + } else if (ret > 0) { + if (agent_conn_recv(agent, buffer, (size_t)ret, &src) != 0) { + JLOG_WARN("Agent receive failed"); + conn_impl->state = CONN_STATE_FINISHED; + mutex_unlock(&conn_impl->mutex); + return JUICE_ERR_FAILED; + } + } + + if ( ret > 0 // We just received a datagram + || conn_impl->next_timestamp <= current_timestamp() + || agent->state != JUICE_STATE_COMPLETED) { + if (agent_conn_update(agent, &conn_impl->next_timestamp) != 0) { + JLOG_WARN("Agent update failed"); + conn_impl->state = CONN_STATE_FINISHED; + mutex_unlock(&conn_impl->mutex); + return JUICE_ERR_FAILED; + } + } + + mutex_unlock(&conn_impl->mutex); + return ret; +} + +static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src) { + JLOG_VERBOSE("Receiving datagram"); + int len; + while ((len = udp_recvfrom(sock, buffer, size, src)) == 0) { + // Empty datagram, ignore + } + + if (len < 0) { + if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) { + JLOG_VERBOSE("No more datagrams to receive"); + return 0; + } + JLOG_ERROR("recvfrom failed, errno=%d", sockerrno); + return -1; + } + + addr_unmap_inet6_v4mapped((struct sockaddr *)&src->addr, &src->len); + return len; // len > 0 +} + +int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config) { + (void)registry; + + conn_impl_t *conn_impl = calloc(1, sizeof(conn_impl_t)); + if (!conn_impl) { + JLOG_FATAL("Memory allocation failed for connection impl"); + return -1; + } + + conn_impl->sock = udp_create_socket(config); + if (conn_impl->sock == INVALID_SOCKET) { + JLOG_ERROR("UDP socket creation failed"); + free(conn_impl); + return -1; + } + + mutex_init(&conn_impl->mutex, 0); + mutex_init(&conn_impl->send_mutex, 0); + + agent->conn_impl = conn_impl; + + return JUICE_ERR_SUCCESS; +} + +void conn_user_cleanup(juice_agent_t *agent) { + conn_impl_t *conn_impl = agent->conn_impl; + + closesocket(conn_impl->sock); + mutex_destroy(&conn_impl->mutex); + mutex_destroy(&conn_impl->send_mutex); + free(agent->conn_impl); + agent->conn_impl = NULL; +} + +void conn_user_lock(juice_agent_t *agent) { + conn_impl_t *conn_impl = agent->conn_impl; + mutex_lock(&conn_impl->mutex); +} + +void conn_user_unlock(juice_agent_t *agent) { + conn_impl_t *conn_impl = agent->conn_impl; + mutex_unlock(&conn_impl->mutex); +} + +int conn_user_interrupt(juice_agent_t *agent) { + // juice_user_poll does not block when polling, so there's nothing to interrupt + (void) agent; // Explicitly discard unused parameter + return JUICE_ERR_SUCCESS; +} + +int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, + int ds) { + conn_impl_t *conn_impl = agent->conn_impl; + + mutex_lock(&conn_impl->send_mutex); + + if (conn_impl->send_ds >= 0 && conn_impl->send_ds != ds) { + JLOG_VERBOSE("Setting Differentiated Services field to 0x%X", ds); + if (udp_set_diffserv(conn_impl->sock, ds) == 0) + conn_impl->send_ds = ds; + else + conn_impl->send_ds = -1; // disable for next time + } + + JLOG_VERBOSE("Sending datagram, size=%d", size); + + int ret = udp_sendto(conn_impl->sock, data, size, dst); + if (ret < 0) { + if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) + JLOG_INFO("Send failed, buffer is full"); + else if (sockerrno == SEMSGSIZE) + JLOG_WARN("Send failed, datagram is too large"); + else + JLOG_WARN("Send failed, errno=%d", sockerrno); + } + + mutex_unlock(&conn_impl->send_mutex); + return ret; +} + +int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) { + conn_impl_t *conn_impl = agent->conn_impl; + + return udp_get_addrs(conn_impl->sock, records, size); +} diff --git a/src/conn_user.h b/src/conn_user.h new file mode 100644 index 00000000..a14a3506 --- /dev/null +++ b/src/conn_user.h @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2023 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef JUICE_CONN_USER_H +#define JUICE_CONN_USER_H + +#include "addr.h" +#include "conn.h" +#include "timestamp.h" + +#include +#include + +int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config); +void conn_user_cleanup(juice_agent_t *agent); +void conn_user_lock(juice_agent_t *agent); +void conn_user_unlock(juice_agent_t *agent); +int conn_user_interrupt(juice_agent_t *agent); +int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, + int ds); +int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); + +#endif diff --git a/test/main.c b/test/main.c index 5caeba74..bbd3e9eb 100644 --- a/test/main.c +++ b/test/main.c @@ -16,6 +16,7 @@ int test_stun(void); int test_connectivity(void); int test_thread(void); int test_mux(void); +int test_user(void); int test_notrickle(void); int test_gathering(void); int test_turn(void); @@ -79,6 +80,12 @@ int main(int argc, char **argv) { return -1; } + printf("\nRunning user-mode connectivity test...\n"); + if (test_user()) { + fprintf(stderr, "User-mode connectivity test failed\n"); + return -1; + } + printf("\nRunning non-trickled connectivity test...\n"); if (test_notrickle()) { fprintf(stderr, "Non-trickled connectivity test failed\n"); diff --git a/test/user.c b/test/user.c new file mode 100644 index 00000000..4ac84f9c --- /dev/null +++ b/test/user.c @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2022 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "juice/juice.h" + +#include +#include +#include +#include + +#ifdef WIN32 +#include +#elif _POSIX_C_SOURCE >= 199309L +#include // for nanosleep +#else +#include // for usleep +#endif + +void sleep_ms(int milliseconds){ +#ifdef WIN32 + Sleep(milliseconds); +#elif _POSIX_C_SOURCE >= 199309L + struct timespec ts; + ts.tv_sec = milliseconds / 1000; + ts.tv_nsec = (milliseconds % 1000) * 1000000; + nanosleep(&ts, NULL); +#else + if (milliseconds >= 1000) + sleep(milliseconds / 1000); + usleep((milliseconds % 1000) * 1000); +#endif +} + +#define BUFFER_SIZE 1500 // Ethernet MTU +#define PACKET_HISTORY_SIZE 2 + +// The way this struct is accessed would be a data-race in any concurrency mode other than JUICE_CONCURRENCY_MODE_USER +// since on_recv would be called from a different thread than the one that calls juice_user_poll +typedef struct agent_data { + int id; + int received; + int sent; + char buffer[PACKET_HISTORY_SIZE][BUFFER_SIZE]; +} agent_data_t; + +static juice_agent_t *agents[2]; +static agent_data_t agent_data[2]; + + +static void on_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr); + +static void on_gathering_done(juice_agent_t *agent, void *user_ptr); + +static void on_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr); + +int test_user() { + juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); + + memset(&agent_data, 0, sizeof(agent_data)); + + // Create agents + juice_config_t config; + memset(&config, 0, sizeof(config)); + config.concurrency_mode = JUICE_CONCURRENCY_MODE_USER; + config.stun_server_host = "stun.l.google.com"; + config.stun_server_port = 19302; + config.cb_state_changed = NULL; + config.cb_candidate = on_candidate; + config.cb_gathering_done = on_gathering_done; + config.cb_recv = on_recv; + + for (int i = 0; i < sizeof(agents) / sizeof(agents[0]); i++) { + agent_data[i].id = i + 1; + config.user_ptr = &agent_data[i]; + agents[i] = juice_create(&config); + } + + // Agent 1: Generate local description + char sdp1[JUICE_MAX_SDP_STRING_LEN]; + juice_get_local_description(agents[0], sdp1, JUICE_MAX_SDP_STRING_LEN); + printf("Local description 1:\n%s\n", sdp1); + + // Agent 2: Receive description from agent 1 + juice_set_remote_description(agents[1], sdp1); + + // Agent 2: Generate local description + char sdp2[JUICE_MAX_SDP_STRING_LEN]; + juice_get_local_description(agents[1], sdp2, JUICE_MAX_SDP_STRING_LEN); + printf("Local description 2:\n%s\n", sdp2); + + // Agent 1: Receive description from agent 2 + juice_set_remote_description(agents[0], sdp2); + + // Agent 1: Gather candidates (and send them to agent 2) + juice_gather_candidates(agents[0]); + juice_gather_candidates(agents[1]); + + int polls = 100; + int timeout_milliseconds = 4000; + for (int i = 0; i < polls; i++) { + + bool test_done = true; + for (size_t i = 0; i < sizeof(agents) / sizeof(agents[0]); ++i) { + juice_agent_t *agent = agents[i]; + int packet_history_index = agent_data[i].received % 2; + char *buffer = agent_data[i].buffer[packet_history_index]; + + // This next call Dequeues datagrams from the OS and facilitates ICE e.g. candidate gathering, keep-alives, etc. + // After parsing a packet it will call the associated the callback. If it's a packet sent with juice_send we'll get in + // in the on_recv callback + juice_user_poll(agent, buffer, BUFFER_SIZE); + + juice_state_t state = juice_get_state(agent); + if (state == JUICE_STATE_CONNECTED) { + // Send three messages + while (agent_data[i].sent < 3) { + char message[50]; + snprintf(message, sizeof(message), "Message %d from Agent %d", agent_data[i].sent + 1, agent_data[i].id); + juice_send(agent, message, strlen(message)); + agent_data[i].sent++; + } + } + + // Most likely we'll get our 3 datagrams since it's a local connection, + // but in case we don't it's not a failure condition because of UDP reliability + test_done &= agent_data[i].received == 3; + test_done &= state == JUICE_STATE_COMPLETED; + } + + if (test_done) + break; + + sleep_ms(timeout_milliseconds / polls); + } + + // -- Connection should be finished -- + + // Check states + juice_state_t state1 = juice_get_state(agents[0]); + juice_state_t state2 = juice_get_state(agents[1]); + bool success = (state1 == JUICE_STATE_COMPLETED && state2 == JUICE_STATE_COMPLETED); + + // Agent 1: destroy + juice_destroy(agents[0]); + + // Agent 2: destroy + juice_destroy(agents[1]); + + if (success) { + printf("Success\n"); + return 0; + } else { + printf("Failure\n"); + return -1; + } +} + +// On local candidate gathered +static void on_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) { + agent_data_t *agent_data = user_ptr; + printf("Candidate %d: %s\n", agent_data->id, sdp); + + // Receive it from the other agent + if (agent_data->id == 1) + juice_add_remote_candidate(agents[1], sdp); + else + juice_add_remote_candidate(agents[0], sdp); +} + +// On local candidates gathering done +static void on_gathering_done(juice_agent_t *agent, void *user_ptr) { + agent_data_t *agent_data = user_ptr; + printf("Gathering done %d\n", agent_data->id); + + // optional + if (agent_data->id == 1) + juice_set_remote_gathering_done(agents[1]); + else + juice_set_remote_gathering_done(agents[0]); +} + +static void on_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) { + agent_data_t *agent_data = user_ptr; + agent_data->received++; + + char buffer[BUFFER_SIZE]; + if (size > BUFFER_SIZE - 1) + size = BUFFER_SIZE - 1; + memcpy(buffer, data, size); + buffer[size] = '\0'; + printf("Received %d: %s\n", agent_data->id, buffer); +}