Skip to content

Commit

Permalink
Fixes #1622: Added an observer field to the tcpListener entity so obs…
Browse files Browse the repository at this point in the history
…ervers can be directly specified or auto detected
  • Loading branch information
ganeshmurthy committed Oct 9, 2024
1 parent 3cf90e6 commit 423a5b6
Show file tree
Hide file tree
Showing 15 changed files with 983 additions and 446 deletions.
13 changes: 11 additions & 2 deletions include/qpid/dispatch/protocol_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
#include <qpid/dispatch/protocols.h>
#include <qpid/dispatch/vanflow.h>

typedef enum {
OBSERVER_NONE, // We will not observe the traffic at all.
OBSERVER_AUTO, // the tcp adaptor will determine if the protocol is http1 or http2
OBSERVER_HTTP1, // observer is http1
OBSERVER_HTTP2 // observer is http2
} qd_observer_t;

/**
* Callback type to indicate VAN address for cross-VAN transport.
*
Expand All @@ -37,10 +44,10 @@ typedef struct qdpo_config_t qdpo_config_t;
* Create a new protocol observer context
*
* @param use_address Callback address for use-address indications.
* @param allow_all_protocols If true, allow all protocols and deny exceptions. If false, deny all and allow exceptions.
* @param observer - the kind of observer to be associated with the config.
* @return qdpo_config_t* Newly allocated config record.
*/
qdpo_config_t *qdpo_config(qdpo_use_address_t use_address, bool allow_all_protocols);
qdpo_config_t *qdpo_config(qdpo_use_address_t use_address, qd_observer_t observer);

/**
* Free an allocated protocol observer config.
Expand Down Expand Up @@ -117,4 +124,6 @@ void qdpo_data(qdpo_transport_handle_t *transport_handle, bool from_client, cons
*/
void qdpo_end(qdpo_transport_handle_t *transport_handle);

void qdpo_set_observer(qdpo_t *protocol_observer, qd_observer_t observer);

#endif
9 changes: 8 additions & 1 deletion python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@
"tcpListener": {
"description": "Ingress TCP bridge.",
"extends": "configurationEntity",
"operations": ["CREATE", "DELETE"],
"operations": ["CREATE", "UPDATE", "DELETE"],
"attributes": {
"address": {
"description":"Address of this bridge",
Expand Down Expand Up @@ -1137,6 +1137,13 @@
"type": ["up", "down"],
"description": "The operational status of TCP socket listener: up - the service is active and incoming connections are permitted; down - the service is not active and incoming connection attempts will be refused.",
"create": false
},
"observer": {
"type": ["none","auto","http1", "http2"],
"default": "auto",
"description": "Specifies the type of observer that has been enabled on the tcpListner. If set to 'auto', the http1 and http2 protocols are auto detected, if set to 'http1', the http1 observer is enabled, if set to 'http2', the http2 observer is enabled. If the specified protocol was not detected on the wire, the observer exits with a warning message.",
"create": true,
"update": true
}
}
},
Expand Down
21 changes: 13 additions & 8 deletions python/skupper_router_internal/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,31 @@ def __init__(self) -> None:
self._prototype(self.qd_dispatch_configure_router, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_site, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_prepare, None, [self.qd_dispatch_p])
self._prototype(self.qd_dispatch_configure_listener, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_connector, c_void_p, [self.qd_dispatch_p, py_object])

# tcp and amqp listeners
self._prototype(self.qd_dispatch_configure_tcp_listener, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_tcp_connector, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_delete_tcp_listener, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_dispatch_delete_tcp_connector, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_dispatch_update_tcp_listener, c_void_p, [self.qd_dispatch_p, py_object, c_void_p])
self._prototype(self.qd_dispatch_configure_listener, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, c_void_p])

# tcp and amqp connectors
self._prototype(self.qd_dispatch_configure_tcp_connector, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_delete_tcp_connector, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_dispatch_configure_connector, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_connection_manager_delete_connector, None, [self.qd_dispatch_p, c_void_p])

#sslProfile and display name service
self._prototype(self.qd_tls_configure_ssl_profile, c_void_p, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_tls_update_ssl_profile, c_void_p, [self.qd_dispatch_p, py_object, c_void_p])
self._prototype(self.qd_tls_delete_ssl_profile, None, [self.qd_dispatch_p, c_void_p])
self._prototype(self.qd_tls_register_display_name_service, None, [py_object])

# address and autoLink
self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_auto_link, None, [self.qd_dispatch_p, py_object])

# policy
self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_register_policy_manager, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_policy_c_counts_alloc, py_object, [], check=False)
Expand All @@ -131,17 +139,14 @@ def __init__(self) -> None:
self._prototype(self.qd_dispatch_policy_host_pattern_remove, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_policy_host_pattern_lookup, c_char_p, [self.qd_dispatch_p, py_object])

# General router.
self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])

self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])

self._prototype(self.qd_dispatch_router_lock, None, [self.qd_dispatch_p])
self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])

self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
self._prototype(self.qd_entity_refresh_begin, c_long, [py_object])
self._prototype(self.qd_entity_refresh_end, None, [])

self._prototype(self.qd_log_recent_py, py_object, [c_long])

def _prototype(self, f, restype, argtypes, check=True):
Expand Down
11 changes: 8 additions & 3 deletions python/skupper_router_internal/management/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,15 +544,20 @@ def create(self):
raise ValidationError("Invalid tcpListener configuration: see logs for details.")
return config_listener

def _delete(self):
self._qd.qd_dispatch_delete_tcp_listener(self._dispatch, self._implementations[0].key)

def _update(self):
tmp = self._qd.qd_dispatch_update_tcp_listener(self._dispatch, self, self._implementations[0].key)
if tmp is None:
raise ValidationError("listener configuration update failed: see logs for details.")

def _identifier(self):
return _host_port_name_identifier(self)

def __str__(self):
return super(TcpListenerEntity, self).__str__().replace("Entity(", "TcpListenerEntity(")

def _delete(self):
self._qd.qd_dispatch_delete_tcp_listener(self._dispatch, self._implementations[0].key)


class TcpConnectorEntity(EntityAdapter):
def create(self):
Expand Down
39 changes: 31 additions & 8 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
#include <stdatomic.h>

ALLOC_DEFINE(qd_adaptor_config_t);
const char* LISTENER_OBSERVER_AUTO = "auto";
const char* LISTENER_OBSERVER_HTTP1 = "http1";
const char* LISTENER_OBSERVER_HTTP2 = "http2";
const char* LISTENER_OBSERVER_NONE = "none";

void qd_free_adaptor_config(qd_adaptor_config_t *config)
{
Expand All @@ -49,20 +53,39 @@ void qd_free_adaptor_config(qd_adaptor_config_t *config)

#define CHECK() if (qd_error_code()) goto error

qd_observer_t get_listener_observer(const char *observer)
{
if (strcmp(observer, LISTENER_OBSERVER_NONE) == 0) {
return OBSERVER_NONE;
}

if (strcmp(observer, LISTENER_OBSERVER_HTTP1) == 0) {
return OBSERVER_HTTP1;
}

if (strcmp(observer, LISTENER_OBSERVER_HTTP2) == 0) {
return OBSERVER_HTTP2;
}

return OBSERVER_AUTO;
}

qd_error_t qd_load_adaptor_config(qdr_core_t *core, qd_adaptor_config_t *config, qd_entity_t *entity)
{
char *config_address;
qd_error_clear();
config->name = qd_entity_opt_string(entity, "name", 0); CHECK();
config->host = qd_entity_get_string(entity, "host"); CHECK();
config->port = qd_entity_get_string(entity, "port"); CHECK();
config_address = qd_entity_get_string(entity, "address"); CHECK();
config->site_id = qd_entity_opt_string(entity, "siteId", 0); CHECK();
config->ssl_profile_name = qd_entity_opt_string(entity, "sslProfile", 0); CHECK();
config->name = qd_entity_opt_string(entity, "name", 0); CHECK();
config->host = qd_entity_get_string(entity, "host"); CHECK();
config->port = qd_entity_get_string(entity, "port"); CHECK();
config_address = qd_entity_get_string(entity, "address"); CHECK();
config->site_id = qd_entity_opt_string(entity, "siteId", 0); CHECK();
config->ssl_profile_name = qd_entity_opt_string(entity, "sslProfile", 0); CHECK();
config->authenticate_peer = qd_entity_opt_bool(entity, "authenticatePeer", false); CHECK();
config->verify_host_name = qd_entity_opt_bool(entity, "verifyHostname", false); CHECK();

config->backlog = qd_entity_opt_long(entity, "backlog", 0);
char *observer = qd_entity_opt_string(entity, "observer", "auto"); CHECK();
config->observer = get_listener_observer(observer);
free(observer);
config->backlog = qd_entity_opt_long(entity, "backlog", 0);
CHECK();
if (config->backlog <= 0 || config->backlog > SOMAXCONN)
config->backlog = SOMAXCONN;
Expand Down
25 changes: 14 additions & 11 deletions src/adaptors/adaptor_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "qpid/dispatch/threading.h"
#include "qpid/dispatch/vanflow.h"
#include "qpid/dispatch/router_core.h"
#include <qpid/dispatch/protocol_observer.h>

#include <proton/raw_connection.h>

Expand All @@ -44,18 +45,18 @@ typedef struct qd_adaptor_config_t qd_adaptor_config_t;

struct qd_adaptor_config_t
{
char *name;
char *host;
char *port;
char *address;
char *site_id;
char *host_port;
int backlog;

char *name;
char *host;
char *port;
char *address;
char *site_id;
char *host_port;
int backlog;
qd_observer_t observer;
//TLS related info
char *ssl_profile_name;
bool authenticate_peer;
bool verify_host_name;
char *ssl_profile_name;
bool authenticate_peer;
bool verify_host_name;
};


Expand Down Expand Up @@ -92,4 +93,6 @@ void qd_set_vflow_netaddr_string(vflow_record_t *vflow, pn_raw_connection_t *pn_
*/
void qd_set_condition_on_vflow(pn_raw_connection_t *raw_conn, vflow_record_t *vflow);

qd_observer_t get_listener_observer(const char *observer);

#endif // __adaptor_common_h__
72 changes: 60 additions & 12 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <qpid/dispatch/connection_counters.h>
#include <qpid/dispatch/vanflow.h>
#include <qpid/dispatch/tls_raw.h>
#include <qpid/dispatch/threading.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include <proton/listener.h>
Expand Down Expand Up @@ -187,6 +188,22 @@ static void qd_tcp_connector_incref(qd_tcp_connector_t *connector)
sys_atomic_inc(&connector->ref_count);
}

/*
* Create or free listener->protocol_observer.
* listener->protocol_observer is setup when going from a none observer to a non-none observer
* listener->protocol_observer is freed when going from a non-one observer to a none observer
* This function does not do anything when you are going from a non-none observer to another non-none observer.
*/
static void _setup_protocol_observer_LH(qd_tcp_listener_t *listener)
{
if (listener->adaptor_config->observer == OBSERVER_NONE && listener->protocol_observer != 0) {
qdpo_free(listener->protocol_observer);
listener->protocol_observer = 0;
} else if (!listener->protocol_observer) {
listener->protocol_observer = protocol_observer(QD_PROTOCOL_TCP, qdpo_config(0, listener->adaptor_config->observer));
}
}

/**
* NOTE: Do not call this function directly. This function should only be called directly from ADAPTOR_final().
* To free the listener, call qd_tcp_listener_decref which will check the listener's ref_count before freeing it.
Expand All @@ -207,9 +224,8 @@ static void qd_tcp_listener_free(qd_tcp_listener_t *listener)
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"Deleted TcpListener for %s, %s:%s",
listener->adaptor_config->address, listener->adaptor_config->host, listener->adaptor_config->port);

qdpo_free(listener->protocol_observer);
qdpo_config_free(listener->protocol_observer_config);
if (listener->protocol_observer)
qdpo_free(listener->protocol_observer);

qd_tls_config_decref(listener->tls_config);
qd_free_adaptor_config(listener->adaptor_config);
Expand Down Expand Up @@ -364,10 +380,10 @@ static void TL_setup_listener(qd_tcp_listener_t *li)
//
// Set up the protocol observer
//
// TODO - add configuration to the listener to influence whether and how the observer is set up.
//
li->protocol_observer_config = qdpo_config(0, true);
li->protocol_observer = protocol_observer(QD_PROTOCOL_TCP, li->protocol_observer_config);
sys_mutex_lock(&li->lock);
_setup_protocol_observer_LH(li);
sys_mutex_unlock(&li->lock);

//
// Create an adaptor listener. This listener will automatically create a listening socket when there is at least one
Expand Down Expand Up @@ -2081,17 +2097,23 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
bool has_protocol_observer = false;

sys_mutex_lock(&listener->lock);
_setup_protocol_observer_LH(listener);
if (listener->protocol_observer) {
has_protocol_observer = true;
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

sys_mutex_lock(&listener->lock);
DEQ_INSERT_TAIL(listener->connections, conn);
listener->connections_opened++;
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);
sys_mutex_unlock(&listener->lock);

if (!has_protocol_observer) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] on_accept, no protocol observer setup for this connection", conn->conn_id);
}

// Note: this will trigger the connection's event handler on another thread:
pn_listener_raw_accept(pn_listener, conn->raw_conn);
}
Expand Down Expand Up @@ -2331,10 +2353,6 @@ static void CORE_connection_close(void *context, qdr_connection_t *conn, qdr_err
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, "qdr_tcp_conn_trace: no connection context");
assert(false);
}




}


Expand Down Expand Up @@ -2397,6 +2415,36 @@ QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_
return listener;
}

/**
* Handles tcpListener record update request from management agent.
*/
QD_EXPORT void *qd_dispatch_update_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity, void *impl)
{
SET_THREAD_UNKNOWN;
qd_error_clear();
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) impl;
assert(listener);
if (listener) {
//
// The only field that can be updated on the tcpListener is the 'observer' field.
//
char *observer_string = qd_entity_opt_string(entity, "observer", 0);
qd_observer_t observer = get_listener_observer(observer_string);
listener->adaptor_config->observer = observer;
sys_mutex_lock(&listener->lock);
// Set up or free the listener->protocol_observer pointer.
_setup_protocol_observer_LH(listener);

// If we are moving to a none observer, listener->protocol_observer will be 0.
if (listener->protocol_observer) {
qdpo_set_observer(listener->protocol_observer, observer);
}
sys_mutex_unlock(&listener->lock);
free(observer_string);
}
return listener;
}


QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
{
Expand Down
1 change: 0 additions & 1 deletion src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ struct qd_tcp_listener_t {
qd_tls_config_t *tls_config;
qd_adaptor_listener_t *adaptor_listener;
qd_tcp_connection_list_t connections;
qdpo_config_t *protocol_observer_config;
qdpo_t *protocol_observer;
uint64_t connections_opened;
uint64_t connections_closed;
Expand Down
5 changes: 4 additions & 1 deletion src/observers/http2/http2_observer.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ void qdpo_http2_final(qdpo_transport_handle_t *transport_handle)
transport_handle->observe = 0;
qd_http2_decoder_connection_free(transport_handle->http2.conn_state);
}
qd_hash_free(transport_handle->http2.stream_id_hash);
if (transport_handle->http2.stream_id_hash) {
qd_hash_free(transport_handle->http2.stream_id_hash);
transport_handle->http2.stream_id_hash = 0;
}
qd_http2_stream_info_t *stream_info = DEQ_HEAD(transport_handle->http2.streams);
while (stream_info) {
DEQ_REMOVE_HEAD(transport_handle->http2.streams);
Expand Down
Loading

0 comments on commit 423a5b6

Please sign in to comment.