Skip to content

Commit

Permalink
implement more robust ziti_channel re-connect (#736)
Browse files Browse the repository at this point in the history
* improve channel re-connect
* close `ch.connection` on failed writes
* consolidate disconnect logic
* update [email protected]
  • Loading branch information
ekoby authored Oct 1, 2024
1 parent 5c468d5 commit 7a3d145
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 66 deletions.
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if (NOT TARGET tlsuv)
else ()
FetchContent_Declare(tlsuv
GIT_REPOSITORY https://github.com/openziti/tlsuv.git
GIT_TAG v0.32.1
GIT_TAG v0.32.2
)
FetchContent_MakeAvailable(tlsuv)
endif (tlsuv_DIR)
Expand Down
125 changes: 60 additions & 65 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#include "zt_internal.h"
#include "utils.h"
#include "endian_internal.h"

#if _WIN32
#include "win32_compat.h"
#endif

#ifndef MAXHOSTNAMELEN
#define MAXHOSTNAMELEN 255
Expand Down Expand Up @@ -67,7 +70,7 @@ static void reconnect_channel(ziti_channel_t *ch, bool now);

static void reconnect_cb(uv_timer_t *t);

static void on_channel_connect_internal(uv_connect_t *req, int status);
static void on_tls_connect(uv_connect_t *req, int status);

static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id);

Expand All @@ -85,10 +88,12 @@ static void process_inbound(ziti_channel_t *ch);
static void on_tls_close(uv_handle_t *s);

static inline void close_connection(ziti_channel_t *ch) {
if (ch->connection && ch->connection->close_cb == NULL) {
tlsuv_stream_t *conn = ch->connection;
CH_LOG(DEBUG, "closing TLS[%p]", conn);
tlsuv_stream_close(conn, on_tls_close);
tlsuv_stream_t *tls = ch->connection;
ch->connection = NULL;

if (tls) {
CH_LOG(DEBUG, "closing TLS[%p]", tls);
tlsuv_stream_close(tls, on_tls_close);
}
}

Expand All @@ -102,7 +107,7 @@ struct waiter_s {
};

struct msg_receiver {
int id;
uint32_t id;
void *receiver;

void (*receive)(void *receiver, message *m, int code);
Expand Down Expand Up @@ -136,7 +141,7 @@ int ziti_channel_prepare(ziti_channel_t *ch) {
return 0;
}

static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id, tls_context *tls) {
static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id) {
ch->ztx = ctx;
ch->loop = ctx->loop;
ch->id = id;
Expand Down Expand Up @@ -203,16 +208,6 @@ int ziti_close_channels(struct ziti_ctx *ztx, int err) {

static void on_tls_close(uv_handle_t *s) {
tlsuv_stream_t *tls = (tlsuv_stream_t *) s;
ziti_channel_t *ch = tls->data;

if (ch) {
ch->connection = NULL;
if (ch->reconnect) {
ch->reconnect = false;
reconnect_channel(ch, true);
}
}

tlsuv_stream_free(tls);
free(tls);
}
Expand All @@ -228,8 +223,6 @@ int ziti_channel_close(ziti_channel_t *ch, int err) {
uv_close((uv_handle_t *) ch->timer, (uv_close_cb) free);
ch->timer = NULL;

close_connection(ch);

ziti_channel_free(ch);
free(ch);
}
Expand Down Expand Up @@ -265,11 +258,11 @@ uint64_t ziti_channel_latency(ziti_channel_t *ch) {

static ziti_channel_t *new_ziti_channel(ziti_context ztx, const char *ch_name, const char *url) {
ziti_channel_t *ch = calloc(1, sizeof(ziti_channel_t));
ziti_channel_init(ztx, ch, channel_counter++, ztx->tlsCtx);
ziti_channel_init(ztx, ch, channel_counter++);
const ziti_identity *identity = ziti_get_identity(ztx);
ch->name = strdup(ch_name);
ch->url = strdup(url);
CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, ziti_get_identity(ztx)->name);
CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, identity->name);

struct tlsuv_url_s ingress;
tlsuv_parse_url(&ingress, url);
Expand Down Expand Up @@ -401,7 +394,9 @@ void on_channel_send(uv_write_t *w, int status) {

if (status < 0) {
CH_LOG(ERROR, "write failed [%d/%s]", status, uv_strerror(status));
on_channel_close(ch, ZITI_CONNABORT, status);
if (ch->out_q == 0) {
on_channel_close(ch, ZITI_CONNABORT, status);
}
}

free(w);
Expand Down Expand Up @@ -652,7 +647,6 @@ static void latency_timeout(uv_timer_t *t) {
ch->latency_waiter = NULL;
ch->latency = UINT64_MAX;

close_connection(ch);
on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT);
}
}
Expand Down Expand Up @@ -703,15 +697,12 @@ static void hello_reply_cb(void *ctx, message *msg, int err) {
ch->notify_cb(ch, EdgeRouterConnected, ch->notify_ctx);
ch->latency = uv_now(ch->loop) - ch->latency;
uv_timer_start(ch->timer, send_latency_probe, LATENCY_INTERVAL, 0);
}
else {
if (msg)
} else {
if (msg) {
CH_LOG(ERROR, "connect rejected: %d %*s", success, msg->header.body_len, msg->body);
}

ch->state = Disconnected;
ch->notify_cb(ch, EdgeRouterUnavailable, ch->notify_ctx);
close_connection(ch);
reconnect_channel(ch, false);
on_channel_close(ch, ZITI_CONNABORT, 0);
}
}

Expand All @@ -738,17 +729,12 @@ static void ch_connect_timeout(uv_timer_t *t) {
ziti_channel_t *ch = t->data;
CH_LOG(ERROR, "connect timeout");

if (ch->state == Closed) {
return;
}

ch->state = Disconnected;
if (ch->connection->conn_req == NULL) {
if (ch->connection && ch->connection->conn_req == NULL) {
// diagnostics
CH_LOG(WARN, "diagnostics: no conn_req in connect timeout");
}
reconnect_channel(ch, false);
close_connection(ch);

on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT);
}

static void reconnect_cb(uv_timer_t *t) {
Expand All @@ -773,9 +759,9 @@ static void reconnect_cb(uv_timer_t *t) {

CH_LOG(DEBUG, "connecting to %s", ch->url);

int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_channel_connect_internal);
int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_tls_connect);
if (rc != 0) {
on_channel_connect_internal(req, rc);
on_tls_connect(req, rc);
} else {
uv_timer_start(ch->timer, ch_connect_timeout, CONNECT_TIMEOUT, 0);
}
Expand Down Expand Up @@ -852,22 +838,24 @@ static void on_channel_close(ziti_channel_t *ch, int ziti_err, ssize_t uv_err) {
ch->in_next = NULL;
}

close_connection(ch);

if (ziti_err == ZITI_DISABLED || ziti_err == ZITI_GATEWAY_UNAVAILABLE) {
return;
}

if (ch->state != Closed) {
if (uv_err == UV_EOF) {
ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session");
ziti_force_api_session_refresh(ch->ztx);
}
reconnect_channel(ch, false);
if (uv_err == UV_EOF) {
ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session");
ziti_force_api_session_refresh(ch->ztx);
}

reconnect_channel(ch, ch->reconnect);
ch->reconnect = false;
}

static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
tlsuv_stream_t *mbed = (tlsuv_stream_t *) handle;
ziti_channel_t *ch = mbed->data;
tlsuv_stream_t *tls = (tlsuv_stream_t *) handle;
ziti_channel_t *ch = tls->data;
if (ch->in_next || pool_has_available(ch->in_msg_pool)) {
buf->base = (char *) malloc(suggested_size);
if (buf->base == NULL) {
Expand All @@ -885,11 +873,11 @@ static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_
}

static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
tlsuv_stream_t *ssl = (tlsuv_stream_t *) s;
ziti_channel_t *ch = ssl->data;
tlsuv_stream_t *tls = (tlsuv_stream_t *) s;
ziti_channel_t *ch = tls->data;

if (len == UV_ENOBUFS) {
tlsuv_stream_read_stop(ssl);
tlsuv_stream_read_stop(tls);
CH_LOG(VERBOSE, "blocked until messages are processed");
return;
}
Expand All @@ -899,7 +887,6 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
CH_LOG(INFO, "channel disconnected [%zd/%s]", len, uv_strerror(len));
// propagate close
on_channel_close(ch, ZITI_CONNABORT, len);
close_connection(ch);
return;
}

Expand All @@ -916,9 +903,25 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
process_inbound(ch);
}

static void on_channel_connect_internal(uv_connect_t *req, int status) {
static void on_tls_connect(uv_connect_t *req, int status) {
tlsuv_stream_t *tls = (tlsuv_stream_t *)req->handle;

// connect request was cancelled via tlsuv_stream_close
// cleanup in close callback
if (status == UV_ECANCELED || tls->data == NULL) {
goto done;
}

ziti_channel_t *ch = tls->data;
assert(ch);

if (tls != ch->connection) {
// this should never happen but handle it anyway -- close connected tls stream
CH_LOG(ERROR, "invalid state, mismatch req->conn[%p] != ch->conn[%p]", tls, ch->connection);
tls->data = NULL;
tlsuv_stream_close(tls, on_tls_close);
goto done;
}

if (status == 0) {
const char *token = ziti_get_api_session_token(ch->ztx);
Expand All @@ -930,21 +933,13 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
send_hello(ch, token);
} else {
CH_LOG(WARN, "api session invalidated, while connecting");
close_connection(ch);
reconnect_channel(ch, false);
on_channel_close(ch, ZITI_CONNABORT, 0);
}
} else if (ch != NULL) {
} else {
CH_LOG(ERROR, "failed to connect to ER[%s] [%d/%s]", ch->name, status, uv_strerror(status));

if (status != UV_ECANCELED) {
close_connection(ch);
}

if (ch->state != Closed) {
ch->state = Disconnected;
reconnect_channel(ch, false);
}
on_channel_close(ch, ZITI_CONNABORT, status);
}
done:
free(req);
}

Expand Down

0 comments on commit 7a3d145

Please sign in to comment.