From 618e4b319fc7308e42a2c5ec15ff68f052c4ee86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Thu, 26 May 2022 10:45:17 +0200 Subject: [PATCH] Fixes #412, DISPATCH-1962 - Python shutdown leaks (#413) * This is a revival of https://github.com/skupperproject/skupper-router/pull/259 * It attempts to fix the issue described in https://github.com/skupperproject/skupper-router/issues/412 * and by the way it seems to also fix https://github.com/skupperproject/skupper-router/issues/253 ## About capsules and men The Capsule is a container for a pointer, opaque from the Python side, and translucent from the C side. The C code can put a pointer in and Python can treat it as py_object and pass it to some other C code, but cannot unwrap the pointer (unless it really tries, see example code on Stack Overflow for how to break the encapsulation). The important thing for our purposes is that Capsules can own the pointer and can have a destructor associated. More at https://docs.python.org/3/c-api/capsule.html The idea of the #253 fix is to use a Capsule to associate a destructor with the pointer, and then rely on Python to run GC eventually and free the memory. Thereby the leak is prevented. ## Getting the GC to actually run The Capsule is owned by `AppStats`, which is owned by `PolicyLocal`, which is contained in `PolicyManager`, which is contained in `Agent`, which is very bad, because `Agent` cannot be GC'd because there is a refcounting cycle with `IoAdapter`, which does not participate in the cyclic GC. The Dispatch PR which tried to fix the GC for IoAdapter ran into weird shutdown crashes because when Python was actually running the GC fully (which it was not before) it hit... issues. ## In conclusion I considered writing a TODO list here, but I guess it would be best to just discuss this first... Honestly, I am not sure why I am not seeing the horrible problems that plagued the Dispatch PR. And I still feel uneasy about it. 
One thing I had to do in Dispatch that does not seem necessary here is this (in python_embedded.c::qd_io_rx_handler) ```diff diff --git a/src/python_embedded.c b/src/python_embedded.c --- a/src/python_embedded.c (revision 922ca0ad7d43f2d838c4bbb282d69e14b8b457c3) +++ b/src/python_embedded.c (revision 714e506d601054eb839e5968caf01c81afa1c3f0) @@ -639,6 +639,11 @@ IoAdapter *self = (IoAdapter*) context; *error = 0; + if (self->handler == NULL) { + *error = qdr_error(QD_AMQP_COND_INTERNAL_ERROR, "Router is shutting down"); + return PN_REJECTED; + } + ``` --- python/skupper_router_internal/dispatch.py | 5 ++--- .../policy/policy_local.py | 6 +++--- src/dispatch.c | 17 +++++++++++------ src/entity.c | 12 ++++++++++++ src/entity.h | 3 +++ src/policy.c | 15 ++++++--------- src/policy.h | 6 +++--- tests/lsan.supp | 3 --- 8 files changed, 40 insertions(+), 27 deletions(-) diff --git a/python/skupper_router_internal/dispatch.py b/python/skupper_router_internal/dispatch.py index 1d71e72b9..0f2f205ac 100644 --- a/python/skupper_router_internal/dispatch.py +++ b/python/skupper_router_internal/dispatch.py @@ -126,9 +126,8 @@ def __init__(self) -> None: self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_register_policy_manager, None, [self.qd_dispatch_p, py_object]) - self._prototype(self.qd_dispatch_policy_c_counts_alloc, c_long, [], check=False) - self._prototype(self.qd_dispatch_policy_c_counts_free, None, [c_long], check=False) - self._prototype(self.qd_dispatch_policy_c_counts_refresh, None, [c_long, py_object]) + self._prototype(self.qd_dispatch_policy_c_counts_alloc, py_object, [], check=False) + self._prototype(self.qd_dispatch_policy_c_counts_refresh, None, [py_object, py_object]) self._prototype(self.qd_dispatch_policy_host_pattern_add, c_bool, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_policy_host_pattern_remove, None, [self.qd_dispatch_p, py_object]) 
self._prototype(self.qd_dispatch_policy_host_pattern_lookup, c_char_p, [self.qd_dispatch_p, py_object]) diff --git a/python/skupper_router_internal/policy/policy_local.py b/python/skupper_router_internal/policy/policy_local.py index 99fde4e59..3e67280aa 100644 --- a/python/skupper_router_internal/policy/policy_local.py +++ b/python/skupper_router_internal/policy/policy_local.py @@ -18,7 +18,7 @@ # """Entity implementing the business logic of user connection/access policy.""" - +import ctypes import json from typing import Any, Dict, List, Union, TYPE_CHECKING @@ -590,7 +590,7 @@ def disconnect(self, conn_id, user, host): def count_other_denial(self) -> None: self.conn_mgr.count_other_denial() - def get_cstats(self) -> int: + def get_cstats(self) -> ctypes.py_object: return self._cstats # @@ -939,7 +939,7 @@ def lookup_settings( self, vhost_in: str, groupname: str, - upolicy: Dict[Any, Any] + upolicy: Dict[str, Any] ) -> bool: """ Given a settings name, return the aggregated policy blob. 
diff --git a/src/dispatch.c b/src/dispatch.c index 80eb3b4df..b20f511e5 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -58,6 +58,7 @@ void qd_router_free(qd_router_t *router); void qd_error_initialize(); static void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id); static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area); +static void qd_dispatch_policy_c_counts_free(PyObject *capsule); const char *CLOSEST_DISTRIBUTION = "closest"; const char *MULTICAST_DISTRIBUTION = "multicast"; @@ -267,20 +268,24 @@ QD_EXPORT qd_error_t qd_dispatch_register_display_name_service(qd_dispatch_t *qd return qd_register_display_name_service(qd, object); } - -QD_EXPORT long qd_dispatch_policy_c_counts_alloc() +QD_EXPORT PyObject* qd_dispatch_policy_c_counts_alloc() { - return qd_policy_c_counts_alloc(); + return PyCapsule_New(qd_policy_c_counts_alloc(), "qd_policy_c_counts", qd_dispatch_policy_c_counts_free); } - -QD_EXPORT void qd_dispatch_policy_c_counts_free(long ccounts) +static void qd_dispatch_policy_c_counts_free(PyObject *capsule) { + void *ccounts = PyCapsule_GetPointer(capsule, "qd_policy_c_counts"); qd_policy_c_counts_free(ccounts); } -QD_EXPORT void qd_dispatch_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) +QD_EXPORT void qd_dispatch_policy_c_counts_refresh(PyObject *ccounts_capsule, qd_entity_t *entity) { + assert(PyCapsule_CheckExact(ccounts_capsule)); + const char * name = PyCapsule_GetName(ccounts_capsule); + assert(PyCapsule_IsValid(ccounts_capsule, name)); + void* ccounts = PyCapsule_GetPointer(ccounts_capsule, name); + qd_error_py(); qd_policy_c_counts_refresh(ccounts, entity); } diff --git a/src/entity.c b/src/entity.c index aecaabeeb..c8047fc18 100644 --- a/src/entity.c +++ b/src/entity.c @@ -70,6 +70,18 @@ long qd_entity_get_long(qd_entity_t *entity, const char* attribute) { return result; } +void *qd_entity_get_pointer_from_capsule(qd_entity_t *entity, const char *attribute) { + qd_error_clear(); + PyObject *py_obj = 
qd_entity_get_py(entity, attribute); + assert(PyCapsule_CheckExact(py_obj)); + const char * name = PyCapsule_GetName(py_obj); + assert(PyCapsule_IsValid(py_obj, name)); + void* result = PyCapsule_GetPointer(py_obj, name); + Py_XDECREF(py_obj); + qd_error_py(); + return result; +} + bool qd_entity_get_bool(qd_entity_t *entity, const char* attribute) { qd_error_clear(); PyObject *py_obj = qd_entity_get_py(entity, attribute); diff --git a/src/entity.h b/src/entity.h index 75077b99d..05af627ef 100644 --- a/src/entity.h +++ b/src/entity.h @@ -44,6 +44,9 @@ char *qd_entity_get_string(qd_entity_t *entity, const char* attribute); /** Get an integer valued attribute. Return -1 and set qd_error if there is an error. */ long qd_entity_get_long(qd_entity_t *entity, const char* attribute); +/** Get a void* valued attribute stored in a PyCapsule. Return NULL and set qd_error if there is an error. */ +void *qd_entity_get_pointer_from_capsule(qd_entity_t *entity, const char *attribute); + /** Get a boolean valued attribute. Return false and set qd_error if there is an error. 
*/ bool qd_entity_get_bool(qd_entity_t *entity, const char *attribute); diff --git a/src/policy.c b/src/policy.c index 04a43f1f0..55531fecf 100644 --- a/src/policy.c +++ b/src/policy.c @@ -191,26 +191,23 @@ qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager) } -long qd_policy_c_counts_alloc() +void *qd_policy_c_counts_alloc() { - qd_policy_denial_counts_t * dc = NEW(qd_policy_denial_counts_t); + qd_policy_denial_counts_t *dc = NEW(qd_policy_denial_counts_t); assert(dc); ZERO(dc); - return (long)dc; + return dc; } - -void qd_policy_c_counts_free(long ccounts) +void qd_policy_c_counts_free(void *dc) { - void *dc = (void *)ccounts; assert(dc); free(dc); } -qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity) +qd_error_t qd_policy_c_counts_refresh(qd_policy_denial_counts_t* dc, qd_entity_t *entity) { - qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts; if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) && !qd_entity_set_long(entity, "senderDenied", dc->senderDenied) && !qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) && @@ -573,7 +570,7 @@ bool qd_policy_open_fetch_settings( settings->sourceParseTree = qd_policy_parse_tree(settings->sourcePattern); settings->targetParseTree = qd_policy_parse_tree(settings->targetPattern); settings->denialCounts = (qd_policy_denial_counts_t*) - qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts"); + qd_entity_get_pointer_from_capsule((qd_entity_t*)upolicy, "denialCounts"); res = true; // named settings content returned } else { // lookup failed: object did not exist in python database diff --git a/src/policy.h b/src/policy.h index a36166f32..d326c0c28 100644 --- a/src/policy.h +++ b/src/policy.h @@ -85,17 +85,17 @@ qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager) /** Allocate counts statistics block. 
* Called from Python */ -long qd_policy_c_counts_alloc(); +void* qd_policy_c_counts_alloc(); /** Free counts statistics block. * Called from Python */ -void qd_policy_c_counts_free(long ccounts); +void qd_policy_c_counts_free(void* dc); /** Refresh a counts statistics block * Called from Python */ -qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity); +qd_error_t qd_policy_c_counts_refresh(qd_policy_denial_counts_t* dc, qd_entity_t*entity); /** Allow or deny an incoming connection based on connection count(s). diff --git a/tests/lsan.supp b/tests/lsan.supp index d06f14d22..7d247e93d 100644 --- a/tests/lsan.supp +++ b/tests/lsan.supp @@ -11,9 +11,6 @@ leak:^qd_policy_open_fetch_settings$ # to be triaged; system_tests_handle_failover leak:^parse_failover_property_list$ -# to be triaged; system_tests_policy, system_tests_policy_oversize_basic -leak:^qd_policy_c_counts_alloc$ - # to be triaged; system_tests_http leak:^callback_healthz$ leak:^callback_metrics$