Skip to content

Commit

Permalink
Fixed #533 - Handle spurious address-watch events in the test adaptor.
Browse files Browse the repository at this point in the history
  • Loading branch information
ted-ross committed Jun 1, 2022
1 parent b47bccd commit 1e5ddda
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
4 changes: 4 additions & 0 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ qdr_watch_handle_t qdr_core_watch_address(qdr_core_t *core,
* qdr_core_unwatch_address
*
* Cancel an address watch subscription. It is safe to invoke this function from an IO thread.
*
* Note that it is possible for the watch update handler to be invoked after the unwatch call is made.
* This is because a watch event may already be in flight during the call to unwatch_address. The caller
* must handle this case.
*
* @param core Pointer to the core module
* @param handle Watch handle returned by qdr_core_watch_address
Expand Down
72 changes: 49 additions & 23 deletions src/adaptors/test_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static qdr_core_t *core_ptr = 0;
static qd_log_source_t *log_source = 0;
static qdr_subscription_t *subscription = 0;
static dynamic_watch_list_t dynamic_watches = DEQ_EMPTY;
static sys_mutex_t *watch_lock;

static void on_watch(void *context,
uint32_t local_consumers,
Expand All @@ -62,39 +63,56 @@ static void on_dynamic_watch(void *context,
uint32_t local_producers)
{
dynamic_watch_t *dw = (dynamic_watch_t*) context;
bool valid_context = false;

sys_mutex_lock(watch_lock);
dynamic_watch_t *w = DEQ_HEAD(dynamic_watches);
while (!!w) {
if (w == dw) {
valid_context = true;
break;
}
w = DEQ_NEXT(w);
}

qd_log(log_source, QD_LOG_INFO, "On Dynamic Watch: %s", dw->address);
if (valid_context) {
qd_log(log_source, QD_LOG_INFO, "On Dynamic Watch: %s", dw->address);

qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(field);
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(field);

qd_compose_insert_symbol(field, "address");
qd_compose_insert_string(field, dw->address);
qd_compose_insert_symbol(field, "address");
qd_compose_insert_string(field, dw->address);

qd_compose_insert_symbol(field, "local_consumers");
qd_compose_insert_uint(field, local_consumers);
sys_mutex_unlock(watch_lock); // We don't need to dereference dw any more

qd_compose_insert_symbol(field, "in_proc_consumers");
qd_compose_insert_uint(field, in_proc_consumers);
qd_compose_insert_symbol(field, "local_consumers");
qd_compose_insert_uint(field, local_consumers);

qd_compose_insert_symbol(field, "remote_consumers");
qd_compose_insert_uint(field, remote_consumers);
qd_compose_insert_symbol(field, "in_proc_consumers");
qd_compose_insert_uint(field, in_proc_consumers);

qd_compose_insert_symbol(field, "local_producers");
qd_compose_insert_uint(field, local_producers);
qd_compose_insert_symbol(field, "remote_consumers");
qd_compose_insert_uint(field, remote_consumers);

qd_compose_end_map(field);
qd_compose_insert_symbol(field, "local_producers");
qd_compose_insert_uint(field, local_producers);

qd_message_t *msg = qd_message();
qd_message_compose_2(msg, field, true);
qd_compose_free(field);
qdr_send_to2(core_ptr, msg, "_local/_testhook/watch_event", true, false);
qd_compose_end_map(field);

qd_message_free(msg);
qd_message_t *msg = qd_message();
qd_message_compose_2(msg, field, true);
qd_compose_free(field);
qdr_send_to2(core_ptr, msg, "_local/_testhook/watch_event", true, false);

qd_message_free(msg);
} else {
sys_mutex_unlock(watch_lock);
}
}


static void remove_dynamic_watch(dynamic_watch_t *dw)
static void remove_dynamic_watch_LH(dynamic_watch_t *dw)
{
qdr_core_unwatch_address(core_ptr, dw->watch_handle);
free(dw->address);
Expand All @@ -111,22 +129,26 @@ static void start_watch(const char *address)
DEQ_ITEM_INIT(dw);
dw->address = strdup(address);
dw->watch_handle = qdr_core_watch_address(core_ptr, address, QD_ITER_HASH_PREFIX_MOBILE, QD_TREATMENT_ANYCAST_BALANCED, on_dynamic_watch, dw);
sys_mutex_lock(watch_lock);
DEQ_INSERT_TAIL(dynamic_watches, dw);
sys_mutex_unlock(watch_lock);
}


static void stop_watch(const char *address)
{
qd_log(log_source, QD_LOG_INFO, "Stop Watch: %s", address);

sys_mutex_lock(watch_lock);
dynamic_watch_t *dw = DEQ_HEAD(dynamic_watches);
while (!!dw) {
if (strcmp(address, dw->address) == 0) {
remove_dynamic_watch(dw);
return;
remove_dynamic_watch_LH(dw);
break;
}
dw = DEQ_NEXT(dw);
}
sys_mutex_unlock(watch_lock);
}


Expand Down Expand Up @@ -165,6 +187,7 @@ static void qdr_test_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
core_ptr = core;
if (qdr_core_test_hooks_enabled(core)) {
watch_lock = sys_mutex();
log_source = qd_log_source("ADDRESS_WATCH");
char address[100];
for (long index = 0; index < ADDRESS_COUNT; index++) {
Expand All @@ -186,11 +209,14 @@ static void qdr_test_adaptor_final(void *adaptor_context)
}

qdr_core_unsubscribe(subscription);
sys_mutex_lock(watch_lock);
dynamic_watch_t *dw = DEQ_HEAD(dynamic_watches);
while (!!dw) {
remove_dynamic_watch(dw);
remove_dynamic_watch_LH(dw);
dw = DEQ_HEAD(dynamic_watches);
}
sys_mutex_unlock(watch_lock);
sys_mutex_free(watch_lock);
}
}

Expand Down

0 comments on commit 1e5ddda

Please sign in to comment.