Skip to content

Commit

Permalink
Fixes #412, DISPATCH-1962 - Python shutdown leaks (#413)
Browse files Browse the repository at this point in the history
* This is revival of #259
* It attempts to fix issue described in #412
* and by the way it seems to also fix #253

## About capsules and men

The Capsule is a container for a pointer, opaque from the Python side, and translucent from the C side. The C code can put a pointer in and Python can treat it as py_object and pass it to some other C code, but cannot unwrap the pointer (unless it really tries, see example code on Stack Overflow for how to break the encapsulation). The important thing for our purposes is that Capsules can own the pointer and can have a destructor associated. More at https://docs.python.org/3/c-api/capsule.html

The idea of the #253 fix is to use Capsule to associate destructor to the pointer, and then rely on Python to run GC eventually and free the memory. Thereby leak is prevented.

## Getting the GC to actually run

Capsule is owned by `AppStats`, which is owned by `PolicyLocal`, which is contained in `PolicyManager` which is contained in `Agent`, which is very bad, because `Agent` cannot be GC'd because there is a refcounting cycle with `IoAdapter`, which does not participate in the cyclic GC. The Dispatch PR which tried to fix the GC for IoAdapter hit into weird shutdown crashes because when the Python was actually running GC fully (which it was not before) it hit... issues.

## In conclusion

I considered writing a TODO list here, but I guess it would be best to just discuss this first... Honestly, I am not sure why I am not seeing the horrible problems that plagued the Dispatch PR. And I still feel uneasy about it.

One thing I had to do in Dispatch that does not seem necessary here is this (in python_embedded.c::qd_io_rx_handler)

```diff
diff --git a/src/python_embedded.c b/src/python_embedded.c
--- a/src/python_embedded.c	(revision 922ca0ad7d43f2d838c4bbb282d69e14b8b457c3)
+++ b/src/python_embedded.c	(revision 714e506d601054eb839e5968caf01c81afa1c3f0)
@@ -639,6 +639,11 @@
     IoAdapter *self = (IoAdapter*) context;
     *error = 0;
 
+    if (self->handler == NULL) {
+        *error = qdr_error(QD_AMQP_COND_INTERNAL_ERROR, "Router is shutting down");
+        return PN_REJECTED;
+    }
+
```
  • Loading branch information
jiridanek authored May 26, 2022
1 parent a8ef42f commit 618e4b3
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 27 deletions.
5 changes: 2 additions & 3 deletions python/skupper_router_internal/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,8 @@ def __init__(self) -> None:

self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_register_policy_manager, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_policy_c_counts_alloc, c_long, [], check=False)
self._prototype(self.qd_dispatch_policy_c_counts_free, None, [c_long], check=False)
self._prototype(self.qd_dispatch_policy_c_counts_refresh, None, [c_long, py_object])
self._prototype(self.qd_dispatch_policy_c_counts_alloc, py_object, [], check=False)
self._prototype(self.qd_dispatch_policy_c_counts_refresh, None, [py_object, py_object])
self._prototype(self.qd_dispatch_policy_host_pattern_add, c_bool, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_policy_host_pattern_remove, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_policy_host_pattern_lookup, c_char_p, [self.qd_dispatch_p, py_object])
Expand Down
6 changes: 3 additions & 3 deletions python/skupper_router_internal/policy/policy_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

"""Entity implementing the business logic of user connection/access policy."""

import ctypes
import json
from typing import Any, Dict, List, Union, TYPE_CHECKING

Expand Down Expand Up @@ -590,7 +590,7 @@ def disconnect(self, conn_id, user, host):
def count_other_denial(self) -> None:
self.conn_mgr.count_other_denial()

def get_cstats(self) -> int:
def get_cstats(self) -> ctypes.py_object:
return self._cstats

#
Expand Down Expand Up @@ -939,7 +939,7 @@ def lookup_settings(
self,
vhost_in: str,
groupname: str,
upolicy: Dict[Any, Any]
upolicy: Dict[str, Any]
) -> bool:
"""
Given a settings name, return the aggregated policy blob.
Expand Down
17 changes: 11 additions & 6 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void qd_router_free(qd_router_t *router);
void qd_error_initialize();
static void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id);
static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area);
static void qd_dispatch_policy_c_counts_free(PyObject *capsule);

const char *CLOSEST_DISTRIBUTION = "closest";
const char *MULTICAST_DISTRIBUTION = "multicast";
Expand Down Expand Up @@ -267,20 +268,24 @@ QD_EXPORT qd_error_t qd_dispatch_register_display_name_service(qd_dispatch_t *qd
return qd_register_display_name_service(qd, object);
}


QD_EXPORT long qd_dispatch_policy_c_counts_alloc()
QD_EXPORT PyObject* qd_dispatch_policy_c_counts_alloc()
{
return qd_policy_c_counts_alloc();
return PyCapsule_New(qd_policy_c_counts_alloc(), "qd_policy_c_counts", qd_dispatch_policy_c_counts_free);
}


QD_EXPORT void qd_dispatch_policy_c_counts_free(long ccounts)
static void qd_dispatch_policy_c_counts_free(PyObject *capsule)
{
void *ccounts = PyCapsule_GetPointer(capsule, "qd_policy_c_counts");
qd_policy_c_counts_free(ccounts);
}

QD_EXPORT void qd_dispatch_policy_c_counts_refresh(long ccounts, qd_entity_t *entity)
QD_EXPORT void qd_dispatch_policy_c_counts_refresh(PyObject *ccounts_capsule, qd_entity_t *entity)
{
assert(PyCapsule_CheckExact(ccounts_capsule));
const char * name = PyCapsule_GetName(ccounts_capsule);
assert(PyCapsule_IsValid(ccounts_capsule, name));
void* ccounts = PyCapsule_GetPointer(ccounts_capsule, name);
qd_error_py();
qd_policy_c_counts_refresh(ccounts, entity);
}

Expand Down
12 changes: 12 additions & 0 deletions src/entity.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ long qd_entity_get_long(qd_entity_t *entity, const char* attribute) {
return result;
}

void *qd_entity_get_pointer_from_capsule(qd_entity_t *entity, const char *attribute) {
qd_error_clear();
PyObject *py_obj = qd_entity_get_py(entity, attribute);
assert(PyCapsule_CheckExact(py_obj));
const char * name = PyCapsule_GetName(py_obj);
assert(PyCapsule_IsValid(py_obj, name));
void* result = PyCapsule_GetPointer(py_obj, name);
Py_XDECREF(py_obj);
qd_error_py();
return result;
}

bool qd_entity_get_bool(qd_entity_t *entity, const char* attribute) {
qd_error_clear();
PyObject *py_obj = qd_entity_get_py(entity, attribute);
Expand Down
3 changes: 3 additions & 0 deletions src/entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ char *qd_entity_get_string(qd_entity_t *entity, const char* attribute);
/** Get an integer valued attribute. Return -1 and set qd_error if there is an error. */
long qd_entity_get_long(qd_entity_t *entity, const char* attribute);

/** Get a void* valued attribute stored in a PyCapsule. Return NULL and set qd_error if there is an error. */
void *qd_entity_get_pointer_from_capsule(qd_entity_t *entity, const char *attribute);

/** Get a boolean valued attribute. Return false and set qd_error if there is an error. */
bool qd_entity_get_bool(qd_entity_t *entity, const char *attribute);

Expand Down
15 changes: 6 additions & 9 deletions src/policy.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,26 +191,23 @@ qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager)
}


long qd_policy_c_counts_alloc()
void *qd_policy_c_counts_alloc()
{
qd_policy_denial_counts_t * dc = NEW(qd_policy_denial_counts_t);
qd_policy_denial_counts_t *dc = NEW(qd_policy_denial_counts_t);
assert(dc);
ZERO(dc);
return (long)dc;
return dc;
}


void qd_policy_c_counts_free(long ccounts)
void qd_policy_c_counts_free(void *dc)
{
void *dc = (void *)ccounts;
assert(dc);
free(dc);
}


qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t *entity)
qd_error_t qd_policy_c_counts_refresh(qd_policy_denial_counts_t* dc, qd_entity_t *entity)
{
qd_policy_denial_counts_t *dc = (qd_policy_denial_counts_t*)ccounts;
if (!qd_entity_set_long(entity, "sessionDenied", dc->sessionDenied) &&
!qd_entity_set_long(entity, "senderDenied", dc->senderDenied) &&
!qd_entity_set_long(entity, "receiverDenied", dc->receiverDenied) &&
Expand Down Expand Up @@ -573,7 +570,7 @@ bool qd_policy_open_fetch_settings(
settings->sourceParseTree = qd_policy_parse_tree(settings->sourcePattern);
settings->targetParseTree = qd_policy_parse_tree(settings->targetPattern);
settings->denialCounts = (qd_policy_denial_counts_t*)
qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
qd_entity_get_pointer_from_capsule((qd_entity_t*)upolicy, "denialCounts");
res = true; // named settings content returned
} else {
// lookup failed: object did not exist in python database
Expand Down
6 changes: 3 additions & 3 deletions src/policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@ qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager)
/** Allocate counts statistics block.
* Called from Python
*/
long qd_policy_c_counts_alloc();
void* qd_policy_c_counts_alloc();

/** Free counts statistics block.
* Called from Python
*/
void qd_policy_c_counts_free(long ccounts);
void qd_policy_c_counts_free(void* dc);

/** Refresh a counts statistics block
* Called from Python
*/
qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity);
qd_error_t qd_policy_c_counts_refresh(qd_policy_denial_counts_t* dc, qd_entity_t*entity);


/** Allow or deny an incoming connection based on connection count(s).
Expand Down
3 changes: 0 additions & 3 deletions tests/lsan.supp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ leak:^qd_policy_open_fetch_settings$
# to be triaged; system_tests_handle_failover
leak:^parse_failover_property_list$

# to be triaged; system_tests_policy, system_tests_policy_oversize_basic
leak:^qd_policy_c_counts_alloc$

# to be triaged; system_tests_http
leak:^callback_healthz$
leak:^callback_metrics$
Expand Down

0 comments on commit 618e4b3

Please sign in to comment.