diff --git a/inc_internal/internal_model.h b/inc_internal/internal_model.h index 5ce62958..bc290b38 100644 --- a/inc_internal/internal_model.h +++ b/inc_internal/internal_model.h @@ -40,7 +40,8 @@ XX(protocols, ziti_er_protocols, none, supportedProtocols, __VA_ARGS__) XX(token, string, none, token, __VA_ARGS__)\ XX(id, string, none, id, __VA_ARGS__) \ XX(edge_routers, ziti_edge_router, list, edgeRouters, __VA_ARGS__) \ -XX(service_id, string, none, NULL, __VA_ARGS__) +XX(service_id, string, none, serviceId, __VA_ARGS__) \ +XX(refresh, bool, none, , __VA_ARGS__) #define ZITI_PROCESS_MODEL(XX, ...) \ XX(path, string, none, path, __VA_ARGS__) diff --git a/inc_internal/zt_internal.h b/inc_internal/zt_internal.h index 6a225f3d..9c18a7ab 100644 --- a/inc_internal/zt_internal.h +++ b/inc_internal/zt_internal.h @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -102,7 +102,6 @@ typedef struct ziti_channel { ch_state state; uint32_t reconnect_count; - LIST_HEAD(conn_reqs, ch_conn_req) conn_reqs; uint32_t msg_seq; buffer *incoming; @@ -298,6 +297,10 @@ struct ziti_ctx { // map model_map connections; + // map -- connections waiting for a suitable channel + // map to make removal easier + model_map waiting_connections; + uint32_t conn_seq; /* context wide metrics */ @@ -322,8 +325,7 @@ extern "C" { bool ziti_is_session_valid(ziti_context ztx, ziti_session *session, const char *service_id, ziti_session_type type); -void -ziti_invalidate_session(ziti_context ztx, ziti_session *session, const char *service_id, ziti_session_type type); +void ziti_invalidate_session(ziti_context ztx, const char *service_id, ziti_session_type type); void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx); @@ -335,7 +337,9 @@ bool ziti_channel_is_connected(ziti_channel_t *ch); uint64_t ziti_channel_latency(ziti_channel_t *ch); -int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx); +int ziti_channel_force_connect(ziti_channel_t *ch); + +int ziti_channel_connect(ziti_context ztx, const char *name, const char *url); int ziti_channel_prepare(ziti_channel_t *ch); @@ -401,6 +405,9 @@ extern uv_timer_t *new_ztx_timer(ziti_context ztx); int conn_bridge_info(ziti_connection conn, char *buf, size_t buflen); +void process_connect(struct ziti_conn *conn, ziti_session *session); + + #ifdef __cplusplus } #endif diff --git a/library/channel.c b/library/channel.c index 4df7fcd4..c2dd955a 100644 --- a/library/channel.c +++ b/library/channel.c @@ -1,9 +1,9 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. -// You may obtain a copy of the License at // +// You may obtain a copy of the License at // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software @@ -85,7 +85,7 @@ static void process_inbound(ziti_channel_t *ch); static void on_tls_close(uv_handle_t *s); static inline void close_connection(ziti_channel_t *ch) { - if (ch->connection) { + if (ch->connection && ch->connection->close_cb == NULL) { tlsuv_stream_t *conn = ch->connection; CH_LOG(DEBUG, "closing TLS[%p]", conn); tlsuv_stream_close(conn, on_tls_close); @@ -95,23 +95,12 @@ static inline void close_connection(ziti_channel_t *ch) { // global channel sequence static uint32_t channel_counter = 0; -struct ch_write_req { - uv_buf_t buf; - ziti_channel_t *ch; -}; - struct waiter_s { uint32_t seq; reply_cb cb; void *reply_ctx; }; -struct ch_conn_req { - ch_connect_cb cb; - void *ctx; - LIST_ENTRY(ch_conn_req) next; -}; - struct msg_receiver { int id; void *receiver; @@ -160,7 +149,6 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t snprintf(ch->token, sizeof(ch->token), "ziti-sdk-c[%d]@%*.*s", ch->id, (int) hostlen, (int) hostlen, hostname); ch->state = Initial; - LIST_INIT(&ch->conn_reqs); ch->name = NULL; ch->in_next = NULL; @@ -181,6 +169,10 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t } void ziti_channel_free(ziti_channel_t *ch) { + if (ch->connection) { + ch->connection->data = NULL; + ch->connection = NULL; + } free_buffer(ch->incoming); pool_destroy(ch->in_msg_pool); ch->in_msg_pool = NULL; @@ -212,14 +204,17 @@ int ziti_close_channels(struct ziti_ctx *ztx, int err) { static void on_tls_close(uv_handle_t *s) { tlsuv_stream_t *tls = (tlsuv_stream_t *) s; ziti_channel_t *ch = tls->data; - ch->connection = NULL; + + if (ch) { + ch->connection = NULL; + if (ch->reconnect) { + ch->reconnect = false; + reconnect_channel(ch, true); + } + } tlsuv_stream_free(tls); free(tls); - - if (ch->reconnect) { - reconnect_channel(ch, true); - } } int ziti_channel_close(ziti_channel_t *ch, int err) { @@ -309,7 +304,23 @@ static void check_connecting_state(ziti_channel_t *ch) { } } -int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch_connect_cb cb, void *cb_ctx) { +int ziti_channel_force_connect(ziti_channel_t *ch) { + if (ch == NULL) { + return ZITI_INVALID_STATE; + } + + if (ch->state == Closed) { + return ZITI_GATEWAY_UNAVAILABLE; + } + + if (ch->state == Disconnected) { + reconnect_channel(ch, true); + } + + return ZITI_OK; +} + +int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url) { ziti_channel_t *ch = model_map_get(&ztx->channels, url); if (ch != NULL) { @@ -324,29 +335,6 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, check_connecting_state(ch); } - switch (ch->state) { - case Connected: - if (cb) { - cb(ch, cb_ctx, ZITI_OK); - } - break; - - case Initial: - case Connecting: - case Disconnected: - if (cb != NULL) { - NEWP(r, struct ch_conn_req); - r->cb = cb; - r->ctx = cb_ctx; - LIST_INSERT_HEAD(&ch->conn_reqs, r, next); - } - - break; - default: - CH_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF)); - return ZITI_WTF; - } - if (ch->state == Initial || ch->state == Disconnected) { reconnect_channel(ch, true); } @@ -695,13 +683,6 @@ static void hello_reply_cb(void *ctx, message *msg, int err) { close_connection(ch); reconnect_channel(ch, false); } - - while (!LIST_EMPTY(&ch->conn_reqs)) { - struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs); - LIST_REMOVE(r, next); - r->cb(ch, r->ctx, cb_code); - free(r); - } } static void send_hello(ziti_channel_t *ch, ziti_api_session *session) { @@ -778,6 +759,12 @@ static void reconnect_channel(ziti_channel_t *ch, bool now) { uint64_t timeout = 0; if (!now) { + if (uv_is_active((const uv_handle_t *) ch->timer) && + ch->timer->timer_cb == reconnect_cb) { + // reconnect is already scheduled + return; + } + ch->reconnect_count++; int backoff = MIN(ch->reconnect_count, MAX_BACKOFF); @@ -786,8 +773,7 @@ static void reconnect_channel(ziti_channel_t *ch, bool now) { timeout = random % ((1U << backoff) * BACKOFF_TIME); CH_LOG(INFO, "reconnecting in %" PRIu64 "ms (attempt = %d)", timeout, ch->reconnect_count); - } - else { + } else { CH_LOG(INFO, "reconnecting NOW"); } uv_timer_start(ch->timer, reconnect_cb, timeout, 0); @@ -894,7 +880,8 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { } static void on_channel_connect_internal(uv_connect_t *req, int status) { - ziti_channel_t *ch = req->data; + tlsuv_stream_t *tls = (tlsuv_stream_t *)req->handle; + ziti_channel_t *ch = tls->data; if (status == 0) { if (ch->ctx->api_session != NULL && ch->ctx->api_session->token != NULL) { @@ -908,16 +895,9 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { close_connection(ch); reconnect_channel(ch, false); } - } else { + } else if (ch != NULL) { CH_LOG(ERROR, "failed to connect to ER[%s] [%d/%s]", ch->name, status, uv_strerror(status)); - while (!LIST_EMPTY(&ch->conn_reqs)) { - struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs); - LIST_REMOVE(r, next); - r->cb(ch, r->ctx, status); - free(r); - } - if (status != UV_ECANCELED) { close_connection(ch); } diff --git a/library/connect.c b/library/connect.c index 43d82cd5..89c09242 100644 --- a/library/connect.c +++ b/library/connect.c @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -65,7 +65,6 @@ struct local_hash { struct ziti_conn_req { ziti_session_type session_type; char *service_id; - ziti_session *session; ziti_conn_cb cb; ziti_dial_opts dial_opts; @@ -81,15 +80,14 @@ static bool flush_to_service(ziti_connection conn); static bool flush_to_client(ziti_connection conn); -static void process_connect(struct ziti_conn *conn); - static int send_fin_message(ziti_connection conn, struct ziti_write_req_s *wr); static void queue_edge_message(struct ziti_conn *conn, message *msg, int code); static void process_edge_message(struct ziti_conn *conn, message *msg); -static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t *ch); +static bool ziti_connect(struct ziti_ctx *ztx, ziti_session *session, struct ziti_conn *conn); +static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t *ch, ziti_session *session); static int ziti_disconnect(ziti_connection conn); @@ -147,11 +145,6 @@ static void free_conn_req(struct ziti_conn_req *r) { uv_close((uv_handle_t *) r->conn_timeout, free_handle); } - if (r->session_type == ziti_session_types.Bind && r->session) { - free_ziti_session(r->session); - FREE(r->session); - } - free_ziti_dial_opts(&r->dial_opts); FREE(r->service_id); free(r); @@ -302,35 +295,6 @@ static int send_message(struct ziti_conn *conn, message *m, struct ziti_write_re return ziti_channel_send_message(ch, m, wr); } -static void on_channel_connected(ziti_channel_t *ch, void *ctx, int status) { - uintptr_t cid = (uintptr_t) ctx; - uint32_t conn_id = (uint32_t) cid; - ziti_context ztx = ch->ctx; - - // check if it is still a valid connection; - // connection may be completed and gone by the time this channel gets connected - struct ziti_conn *conn = model_map_getl(&ztx->connections, (long) conn_id); - if (conn == NULL) { - ZTX_LOG(VERBOSE, "ch[%d] connection(id = %d) is gone", ch->id, conn_id); - return; - } - - // if channel was already selected - if (conn->channel != NULL) { - CONN_LOG(TRACE, "is already using another channel"); - } else { - if (status < 0) { - ZTX_LOG(ERROR, "ch[%d] failed to connect [%d/%s]", ch->id, status, uv_strerror(status)); - } else if (conn->conn_req && conn->conn_req->failed) { - CONN_LOG(DEBUG, "request already timed out or closed"); - } else { // first channel to connect - CONN_LOG(DEBUG, "selected ch[%s] status[%d]", ch->name, status); - - ziti_channel_start_connection(conn, ch); - } - } -} - static void complete_conn_req(struct ziti_conn *conn, int code) { if (conn->conn_req && conn->conn_req->cb) { if (code != ZITI_OK) { @@ -357,6 +321,7 @@ static void complete_conn_req(struct ziti_conn *conn, int code) { } flush_connection(conn); + model_map_removel(&conn->ziti_ctx->waiting_connections, (long)conn->conn_id); } else { CONN_LOG(WARN, "connection attempt was already completed"); } @@ -384,41 +349,39 @@ static void connect_timeout(uv_timer_t *timer) { } } -static int ziti_connect(struct ziti_ctx *ztx, ziti_session *session, struct ziti_conn *conn) { - // verify ziti context is still authorized - if (ztx->api_session == NULL) { - CONN_LOG(ERROR, "ziti context is not authenticated, cannot connect to service[%s]", conn->service); - complete_conn_req(conn, ZITI_INVALID_STATE); - return ZITI_INVALID_STATE; - } +static void conn_wait_for_er() { - if (model_list_size(&session->edge_routers) == 0) { - CONN_LOG(ERROR, "no edge routers available for service[%s] session[%s]", conn->service, session->id); - complete_conn_req(conn, ZITI_GATEWAY_UNAVAILABLE); - return ZITI_GATEWAY_UNAVAILABLE; - } +} - conn->channel = NULL; +static bool ziti_connect(struct ziti_ctx *ztx, ziti_session *session, struct ziti_conn *conn) { + bool result = false; ziti_edge_router *er; + ziti_channel_t *ch; ziti_channel_t *best_ch = NULL; uint64_t best_latency = UINT64_MAX; - uintptr_t conn_id = conn->conn_id; + + model_list disconnected = {0}; + + conn->channel = NULL; + MODEL_LIST_FOREACH(er, session->edge_routers) { const char *tls = er->protocols.tls; if (tls) { - ziti_channel_t *ch = model_map_get(&ztx->channels, tls); + ch = model_map_get(&ztx->channels, tls); + if (ch == NULL) continue; - if (ch != NULL && ch->state == Connected) { + if (ch->state == Connected) { if (ch->latency < best_latency) { best_ch = ch; best_latency = ch->latency; } - } else { - CONN_LOG(TRACE, "connecting to %s(%s) for session[%s]", er->name, tls, session->id); - ziti_channel_connect(ztx, er->name, tls, on_channel_connected, (void *) conn_id); + } + + if (ch->state == Disconnected) { + model_list_append(&disconnected, ch); } } } @@ -426,10 +389,20 @@ static int ziti_connect(struct ziti_ctx *ztx, ziti_session *session, struct ziti if (best_ch) { CONN_LOG(DEBUG, "selected ch[%s@%s] for best latency(%llu ms)", best_ch->name, best_ch->url, (unsigned long long) best_ch->latency); - on_channel_connected(best_ch, (void *) conn_id, ZITI_OK); - } + ziti_channel_start_connection(conn, best_ch, session); + result = true; + } else { + // if no channels are currently connected + // force them to connect + MODEL_LIST_FOREACH(ch, disconnected) { + ziti_channel_force_connect(ch); + } - return 0; + CONN_LOG(DEBUG, "waiting for suitable channel"); + model_map_setl(&ztx->waiting_connections, (long)conn->conn_id, (void*)(uintptr_t)conn->conn_id); + } + model_list_clear(&disconnected, NULL); + return result; } static void connect_get_service_cb(ziti_context ztx, ziti_service *s, int status, void *ctx) { @@ -447,7 +420,7 @@ static void connect_get_service_cb(ziti_context ztx, ziti_service *s, int status req->service_id = strdup(s->id); conn->encrypted = s->encryption; - process_connect(conn); + process_connect(conn, NULL); } else if (status == ZITI_SERVICE_UNAVAILABLE) { CONN_LOG(ERROR, "service[%s] is not available for ztx[%s]", conn->service, ztx->api_session->identity->name); complete_conn_req(conn, ZITI_SERVICE_UNAVAILABLE); @@ -457,6 +430,17 @@ static void connect_get_service_cb(ziti_context ztx, ziti_service *s, int status } } +static void refresh_session_cb(ziti_session *s, const ziti_error *err, void *ctx) { + struct ziti_ctx *ztx = ctx; + if (err) { + ZITI_LOG(WARN, "failed to refresh session"); + } else if (s != NULL) { + ZITI_LOG(DEBUG, "ztx[%d] refreshed session[%s]", ztx->id, s->id); + ziti_session *existing = model_map_set(&ztx->sessions, s->service_id, s); + free_ziti_session_ptr(existing); + } +} + static void connect_get_net_session_cb(ziti_session *s, const ziti_error *err, void *ctx) { struct ziti_conn *conn = ctx; struct ziti_conn_req *req = conn->conn_req; @@ -474,28 +458,36 @@ static void connect_get_net_session_cb(ziti_session *s, const ziti_error *err, v complete_conn_req(conn, e); } } else { - req->session = s; - s->service_id = strdup(req->service_id); - if (req->session_type == ziti_session_types.Dial) { - ziti_session *existing = model_map_get(&ztx->sessions, req->service_id); - // this happens with concurrent connection requests for the same service (common with browsers) - if (existing) { - CONN_LOG(DEBUG, "found session[%s] for service[%s]", existing->id, conn->service); - free_ziti_session(s); - free(s); - req->session = existing; - } else { - CONN_LOG(DEBUG, "got session[%s] for service[%s]", s->id, conn->service); - model_map_set(&ztx->sessions, s->service_id, s); - } + ziti_session *existing = model_map_set(&ztx->sessions, req->service_id, s); + // this happens with concurrent connection requests for the same service (common with browsers) + if (existing) { + CONN_LOG(DEBUG, "discarding existing session[%s] for service[%s]", existing->id, conn->service); + free_ziti_session(existing); + free(existing); + } else { + CONN_LOG(DEBUG, "got session[%s] for service[%s]", s->id, conn->service); + model_map_set(&ztx->sessions, s->service_id, s); } - process_connect(conn); + process_connect(conn, s); } } -static void process_connect(struct ziti_conn *conn) { +void process_connect(struct ziti_conn *conn, ziti_session *session) { + assert(conn->conn_req); + assert(conn->ziti_ctx); + struct ziti_conn_req *req = conn->conn_req; struct ziti_ctx *ztx = conn->ziti_ctx; + + assert(req->session_type == ziti_session_types.Dial); + + // verify ziti context is still authorized + if (ztx->api_session == NULL) { + CONN_LOG(ERROR, "ziti context is not authenticated, cannot connect to service[%s]", conn->service); + complete_conn_req(conn, ZITI_INVALID_STATE); + return; + } + uv_loop_t *loop = ztx->loop; // find service @@ -509,26 +501,46 @@ static void process_connect(struct ziti_conn *conn) { } ziti_send_posture_data(ztx); - if (req->session == NULL && req->session_type == ziti_session_types.Dial) { - req->session = model_map_get(&ztx->sessions, req->service_id); + if (session == NULL) { + session = model_map_get(&ztx->sessions, req->service_id); } - if (req->session == NULL) { - CONN_LOG(DEBUG, "requesting '%s' session for service[%s]", ziti_session_types.name(req->session_type), - conn->service); - ziti_ctrl_create_session(&ztx->controller, req->service_id, req->session_type, connect_get_net_session_cb, - conn); + if (session == NULL) { + CONN_LOG(DEBUG, "requesting 'Dial' session for service[%s]", conn->service); + // this will re-enter with session if create succeeds + ziti_ctrl_create_session(&ztx->controller, req->service_id, ziti_session_types.Dial, + connect_get_net_session_cb, conn); return; - } else { - if (req->dial_opts.connect_timeout_seconds > 0) { - req->conn_timeout = calloc(1, sizeof(uv_timer_t)); - uv_timer_init(loop, req->conn_timeout); - req->conn_timeout->data = conn; - uv_timer_start(req->conn_timeout, connect_timeout, req->dial_opts.connect_timeout_seconds * 1000, 0); + } + + if (model_list_size(&session->edge_routers) == 0) { + if (session->refresh) { + ziti_ctrl_get_session(&ztx->controller, session->id, connect_get_net_session_cb, conn); + return; + } else { + CONN_LOG(ERROR, "no edge routers available for service[%s] session[%s]", conn->service, session->id); + complete_conn_req(conn, ZITI_GATEWAY_UNAVAILABLE); + return; } - CONN_LOG(DEBUG, "starting %s connection for service[%s] with session[%s]", - ziti_session_types.name(req->session_type), conn->service, req->session->id); - ziti_connect(ztx, req->session, conn); + } + + if (req->dial_opts.connect_timeout_seconds > 0) { + req->conn_timeout = calloc(1, sizeof(uv_timer_t)); + uv_timer_init(loop, req->conn_timeout); + req->conn_timeout->data = conn; + uv_timer_start(req->conn_timeout, connect_timeout, req->dial_opts.connect_timeout_seconds * 1000, 0); + } + + CONN_LOG(DEBUG, "starting Dial connection for service[%s] with session[%s]", conn->service, session->id); + if (!ziti_connect(ztx, session, conn)) { + CONN_LOG(DEBUG, "no active edge routers, pending ER connection"); + // TODO deal with pending connect + } + + if (session->refresh) { + CONN_LOG(DEBUG, "refreshing session[%s]", session->id); + ziti_ctrl_get_session(&ztx->controller, session->id, refresh_session_cb, ztx); + session->refresh = false; } } @@ -577,7 +589,7 @@ static int do_ziti_dial(ziti_connection conn, const char *service, ziti_dial_opt conn->start = uv_now(conn->ziti_ctx->loop); - process_connect(conn); + process_connect(conn, NULL); return ZITI_OK; } @@ -920,7 +932,7 @@ static void restart_connect(struct ziti_conn *conn) { CONN_LOG(DEBUG, "restarting connect sequence"); conn->channel = NULL; - process_connect(conn); + process_connect(conn, NULL); } void connect_reply_cb(void *ctx, message *msg, int err) { @@ -943,19 +955,7 @@ void connect_reply_cb(void *ctx, message *msg, int err) { case ContentTypeStateClosed: if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) { CONN_LOG(WARN, "session for service[%s] became invalid", conn->service); - if (conn->conn_req->session_type == ziti_session_types.Dial) { - ziti_session *s = model_map_get(&conn->ziti_ctx->sessions, req->service_id); - if (s != req->session) { - // already removed or different one - // req reference is no longer valid - req->session = NULL; - } else if (s == req->session) { - model_map_remove(&conn->ziti_ctx->sessions, req->service_id); - } - } - free_ziti_session(req->session); - FREE(req->session); - + ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->service_id, ziti_session_types.Dial); ziti_channel_rem_receiver(conn->channel, conn->conn_id); conn->channel = NULL; restart_connect(conn); @@ -999,9 +999,8 @@ void connect_reply_cb(void *ctx, message *msg, int err) { } } -static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t *ch) { +static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t *ch, ziti_session *session) { struct ziti_conn_req *req = conn->conn_req; - ziti_session *s = req->session; uint32_t content_type; switch (conn->state) { @@ -1016,18 +1015,7 @@ static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t return ZITI_WTF; } - if (!ziti_is_session_valid(conn->ziti_ctx, s, req->service_id, req->session_type)) { - CONN_LOG(DEBUG, "session is no longer valid"); - if (req->session_type == ziti_session_types.Bind) { - free_ziti_session(req->session); - FREE(req->session); - } - req->session = NULL; - restart_connect(conn); - return ZITI_OK; - } - - CONN_LOG(TRACE, "ch[%d] => Edge Connect request token[%s]", ch->id, s->token); + CONN_LOG(TRACE, "ch[%d] => Edge Connect request token[%s]", ch->id, session->token); conn->channel = ch; ziti_channel_add_receiver(ch, conn->conn_id, conn, (void (*)(void *, message *, int)) queue_edge_message); @@ -1099,7 +1087,8 @@ static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t } req->waiter = ziti_channel_send_for_reply(ch, content_type, headers, nheaders, - s->token, strlen(s->token), connect_reply_cb, conn); + session->token, strlen(session->token), + connect_reply_cb, conn); return ZITI_OK; } @@ -1352,9 +1341,7 @@ static void process_edge_message(struct ziti_conn *conn, message *msg) { case Accepting: { if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) { CONN_LOG(WARN, "session for service[%s] became invalid", conn->service); - ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->session, - conn->conn_req->service_id, conn->conn_req->session_type); - conn->conn_req->session = NULL; + ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->service_id, conn->conn_req->session_type); retry_connect = true; } if (retry_connect) { diff --git a/library/model_support.c b/library/model_support.c index d3f0bdf1..c5131f50 100644 --- a/library/model_support.c +++ b/library/model_support.c @@ -683,8 +683,10 @@ static int parse_obj(void *obj, const char *json, jsmntok_t *tok, type_meta *met return -1; } field_meta *fm = NULL; + size_t token_len = tok->end - tok->start; for (int i = 0; i < meta->field_count; i++) { - if (strncmp(meta->fields[i].path, json + tok->start, tok->end - tok->start) == 0) { + if (strncmp(meta->fields[i].path, json + tok->start, token_len) == 0 && + meta->fields[i].path[token_len] == '\0') { fm = &meta->fields[i]; break; } diff --git a/library/ziti.c b/library/ziti.c index 29c7e630..f2042042 100644 --- a/library/ziti.c +++ b/library/ziti.c @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -1306,6 +1306,7 @@ void ziti_services_refresh(ziti_context ztx, bool now) { static void edge_routers_cb(ziti_edge_router_array ers, const ziti_error *err, void *ctx) { ziti_context ztx = ctx; + bool ers_changed = false; if (err) { ZTX_LOG(ERROR, "failed to get current edge routers: code[%d] %s/%s", @@ -1339,7 +1340,8 @@ static void edge_routers_cb(ziti_edge_router_array ers, const ziti_error *err, v // check if it is already in the list if (model_map_remove(&curr_routers, tls) == NULL) { ZTX_LOG(TRACE, "connecting to %s(%s)", er->name, tls); - ziti_channel_connect(ztx, er->name, tls, NULL, NULL); + ziti_channel_connect(ztx, er->name, tls); + ers_changed = true; } } else { ZTX_LOG(DEBUG, "edge router %s does not have TLS edge listener", er->name); @@ -1358,6 +1360,22 @@ static void edge_routers_cb(ziti_edge_router_array ers, const ziti_error *err, v ZTX_LOG(INFO, "removing channel[%s@%s]: no longer available", ch->name, er_url); ziti_channel_close(ch, ZITI_GATEWAY_UNAVAILABLE); it = model_map_it_remove(it); + ers_changed = true; + } + + // if the list of ERs changed, we want to opportunistically + // refresh sessions to clear out references to old ERs, + // and pull new ERs (which could be better for dialing) + + // we don't want to evict/refresh session right away + // because it may have a serviceable ER + // just refresh it on demand (next dial) + if (ers_changed) { + const char *serv; + ziti_session *session; + MODEL_MAP_FOREACH(serv, session, &ztx->sessions) { + session->refresh = true; + } } } @@ -1686,24 +1704,11 @@ bool ziti_is_session_valid(ziti_context ztx, ziti_session *session, const char * return s == session; } -void ziti_invalidate_session(ziti_context ztx, ziti_session *session, const char *service_id, ziti_session_type type) { - if (session == NULL) { - return; - } - +void ziti_invalidate_session(ziti_context ztx, const char *service_id, ziti_session_type type) { if (type == ziti_session_types.Dial) { - ziti_session *s = model_map_get(&ztx->sessions, service_id); - if (s != session) { - // already removed or different one - // passed reference is no longer valid - session = NULL; - } else if (s == session) { - model_map_remove(&ztx->sessions, session->service_id); - } + ziti_session *s = model_map_remove(&ztx->sessions, service_id); + free_ziti_session_ptr(s); } - - free_ziti_session(session); - FREE(session); } static const ziti_version sdk_version = { @@ -1778,6 +1783,26 @@ void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_c shutdown_and_free(ztx); } } + + if (status == EdgeRouterConnected) { + // move all ids to a list + model_list ids = {0}; + MODEL_MAP_FOR(it, ztx->waiting_connections) { + model_list_append(&ids, model_map_it_value(it)); + } + + model_map_clear(&ztx->waiting_connections, NULL); + + model_list_iter id_it = model_list_iterator(&ids); + while(id_it != NULL) { + uint32_t conn_id = (uint32_t)(uintptr_t)model_list_it_element(id_it); + ziti_connection conn = model_map_getl(&ztx->connections, (long)conn_id); + if (conn != NULL) { + process_connect(conn, NULL); + } + id_it = model_list_it_remove(id_it); + } + } } static void ztx_work_async(uv_async_t *ar) {