From 7a3d14572cb7f87d4773c9be4c546960cc107fcf Mon Sep 17 00:00:00 2001 From: ekoby <7406535+ekoby@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:31:04 -0400 Subject: [PATCH] implement more robust ziti_channel re-connect (#736) * improve channel re-connect * close `ch.connection` on failed writes * consolidate disconnect logic * update tlsuv@v0.32.2 --- deps/CMakeLists.txt | 2 +- library/channel.c | 125 +++++++++++++++++++++----------------------- 2 files changed, 61 insertions(+), 66 deletions(-) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 2a403f22..6621f302 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -9,7 +9,7 @@ if (NOT TARGET tlsuv) else () FetchContent_Declare(tlsuv GIT_REPOSITORY https://github.com/openziti/tlsuv.git - GIT_TAG v0.32.1 + GIT_TAG v0.32.2 ) FetchContent_MakeAvailable(tlsuv) endif (tlsuv_DIR) diff --git a/library/channel.c b/library/channel.c index 6d38430a..b2a47778 100644 --- a/library/channel.c +++ b/library/channel.c @@ -19,7 +19,10 @@ #include "zt_internal.h" #include "utils.h" #include "endian_internal.h" + +#if _WIN32 #include "win32_compat.h" +#endif #ifndef MAXHOSTNAMELEN #define MAXHOSTNAMELEN 255 @@ -67,7 +70,7 @@ static void reconnect_channel(ziti_channel_t *ch, bool now); static void reconnect_cb(uv_timer_t *t); -static void on_channel_connect_internal(uv_connect_t *req, int status); +static void on_tls_connect(uv_connect_t *req, int status); static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id); @@ -85,10 +88,12 @@ static void process_inbound(ziti_channel_t *ch); static void on_tls_close(uv_handle_t *s); static inline void close_connection(ziti_channel_t *ch) { - if (ch->connection && ch->connection->close_cb == NULL) { - tlsuv_stream_t *conn = ch->connection; - CH_LOG(DEBUG, "closing TLS[%p]", conn); - tlsuv_stream_close(conn, on_tls_close); + tlsuv_stream_t *tls = ch->connection; + ch->connection = NULL; + + if (tls) { + CH_LOG(DEBUG, "closing TLS[%p]", tls); + tlsuv_stream_close(tls, on_tls_close); } } @@ -102,7 +107,7 @@ struct waiter_s { }; struct msg_receiver { - int id; + uint32_t id; void *receiver; void (*receive)(void *receiver, message *m, int code); @@ -136,7 +141,7 @@ int ziti_channel_prepare(ziti_channel_t *ch) { return 0; } -static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id, tls_context *tls) { +static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id) { ch->ztx = ctx; ch->loop = ctx->loop; ch->id = id; @@ -203,16 +208,6 @@ int ziti_close_channels(struct ziti_ctx *ztx, int err) { static void on_tls_close(uv_handle_t *s) { tlsuv_stream_t *tls = (tlsuv_stream_t *) s; - ziti_channel_t *ch = tls->data; - - if (ch) { - ch->connection = NULL; - if (ch->reconnect) { - ch->reconnect = false; - reconnect_channel(ch, true); - } - } - tlsuv_stream_free(tls); free(tls); } @@ -228,8 +223,6 @@ int ziti_channel_close(ziti_channel_t *ch, int err) { uv_close((uv_handle_t *) ch->timer, (uv_close_cb) free); ch->timer = NULL; - close_connection(ch); - ziti_channel_free(ch); free(ch); } @@ -265,11 +258,11 @@ uint64_t ziti_channel_latency(ziti_channel_t *ch) { static ziti_channel_t *new_ziti_channel(ziti_context ztx, const char *ch_name, const char *url) { ziti_channel_t *ch = calloc(1, sizeof(ziti_channel_t)); - ziti_channel_init(ztx, ch, channel_counter++, ztx->tlsCtx); + ziti_channel_init(ztx, ch, channel_counter++); const ziti_identity *identity = ziti_get_identity(ztx); ch->name = strdup(ch_name); ch->url = strdup(url); - CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, ziti_get_identity(ztx)->name); + CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, identity->name); struct tlsuv_url_s ingress; tlsuv_parse_url(&ingress, url); @@ -401,7 +394,9 @@ void on_channel_send(uv_write_t *w, int status) { if (status < 0) { CH_LOG(ERROR, "write failed [%d/%s]", status, uv_strerror(status)); - on_channel_close(ch, ZITI_CONNABORT, status); + if (ch->out_q == 0) { + on_channel_close(ch, ZITI_CONNABORT, status); + } } free(w); @@ -652,7 +647,6 @@ static void latency_timeout(uv_timer_t *t) { ch->latency_waiter = NULL; ch->latency = UINT64_MAX; - close_connection(ch); on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT); } } @@ -703,15 +697,12 @@ static void hello_reply_cb(void *ctx, message *msg, int err) { ch->notify_cb(ch, EdgeRouterConnected, ch->notify_ctx); ch->latency = uv_now(ch->loop) - ch->latency; uv_timer_start(ch->timer, send_latency_probe, LATENCY_INTERVAL, 0); - } - else { - if (msg) + } else { + if (msg) { CH_LOG(ERROR, "connect rejected: %d %*s", success, msg->header.body_len, msg->body); + } - ch->state = Disconnected; - ch->notify_cb(ch, EdgeRouterUnavailable, ch->notify_ctx); - close_connection(ch); - reconnect_channel(ch, false); + on_channel_close(ch, ZITI_CONNABORT, 0); } } @@ -738,17 +729,12 @@ static void ch_connect_timeout(uv_timer_t *t) { ziti_channel_t *ch = t->data; CH_LOG(ERROR, "connect timeout"); - if (ch->state == Closed) { - return; - } - - ch->state = Disconnected; - if (ch->connection->conn_req == NULL) { + if (ch->connection && ch->connection->conn_req == NULL) { // diagnostics CH_LOG(WARN, "diagnostics: no conn_req in connect timeout"); } - reconnect_channel(ch, false); - close_connection(ch); + + on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT); } static void reconnect_cb(uv_timer_t *t) { @@ -773,9 +759,9 @@ static void reconnect_cb(uv_timer_t *t) { CH_LOG(DEBUG, "connecting to %s", ch->url); - int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_channel_connect_internal); + int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_tls_connect); if (rc != 0) { - on_channel_connect_internal(req, rc); + on_tls_connect(req, rc); } else { uv_timer_start(ch->timer, ch_connect_timeout, CONNECT_TIMEOUT, 0); } @@ -852,22 +838,24 @@ static void on_channel_close(ziti_channel_t *ch, int ziti_err, ssize_t uv_err) { ch->in_next = NULL; } + close_connection(ch); + if (ziti_err == ZITI_DISABLED || ziti_err == ZITI_GATEWAY_UNAVAILABLE) { return; } - if (ch->state != Closed) { - if (uv_err == UV_EOF) { - ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session"); - ziti_force_api_session_refresh(ch->ztx); - } - reconnect_channel(ch, false); + if (uv_err == UV_EOF) { + ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session"); + ziti_force_api_session_refresh(ch->ztx); } + + reconnect_channel(ch, ch->reconnect); + ch->reconnect = false; } static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { - tlsuv_stream_t *mbed = (tlsuv_stream_t *) handle; - ziti_channel_t *ch = mbed->data; + tlsuv_stream_t *tls = (tlsuv_stream_t *) handle; + ziti_channel_t *ch = tls->data; if (ch->in_next || pool_has_available(ch->in_msg_pool)) { buf->base = (char *) malloc(suggested_size); if (buf->base == NULL) { @@ -885,11 +873,11 @@ static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_ } static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { - tlsuv_stream_t *ssl = (tlsuv_stream_t *) s; - ziti_channel_t *ch = ssl->data; + tlsuv_stream_t *tls = (tlsuv_stream_t *) s; + ziti_channel_t *ch = tls->data; if (len == UV_ENOBUFS) { - tlsuv_stream_read_stop(ssl); + tlsuv_stream_read_stop(tls); CH_LOG(VERBOSE, "blocked until messages are processed"); return; } @@ -899,7 +887,6 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { CH_LOG(INFO, "channel disconnected [%zd/%s]", len, uv_strerror(len)); // propagate close on_channel_close(ch, ZITI_CONNABORT, len); - close_connection(ch); return; } @@ -916,9 +903,25 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { process_inbound(ch); } -static void on_channel_connect_internal(uv_connect_t *req, int status) { +static void on_tls_connect(uv_connect_t *req, int status) { tlsuv_stream_t *tls = (tlsuv_stream_t *)req->handle; + + // connect request was cancelled via tlsuv_stream_close + // cleanup in close callback + if (status == UV_ECANCELED || tls->data == NULL) { + goto done; + } + ziti_channel_t *ch = tls->data; + assert(ch); + + if (tls != ch->connection) { + // this should never happen but handle it anyway -- close connected tls stream + CH_LOG(ERROR, "invalid state, mismatch req->conn[%p] != ch->conn[%p]", tls, ch->connection); + tls->data = NULL; + tlsuv_stream_close(tls, on_tls_close); + goto done; + } if (status == 0) { const char *token = ziti_get_api_session_token(ch->ztx); @@ -930,21 +933,13 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { send_hello(ch, token); } else { CH_LOG(WARN, "api session invalidated, while connecting"); - close_connection(ch); - reconnect_channel(ch, false); + on_channel_close(ch, ZITI_CONNABORT, 0); } - } else if (ch != NULL) { + } else { CH_LOG(ERROR, "failed to connect to ER[%s] [%d/%s]", ch->name, status, uv_strerror(status)); - - if (status != UV_ECANCELED) { - close_connection(ch); - } - - if (ch->state != Closed) { - ch->state = Disconnected; - reconnect_channel(ch, false); - } + on_channel_close(ch, ZITI_CONNABORT, status); } + done: free(req); }