Post k8s-events properly from multiple threads (thread-safe) #148

Merged: 6 commits, Jul 16, 2019
49 changes: 8 additions & 41 deletions kopf/engines/logging.py
@@ -31,10 +31,6 @@ class K8sPoster(logging.Handler):
A handler to post all log messages as K8s events.
"""

def __init__(self, level=logging.NOTSET, queue=None):
super().__init__(level=level)
self.queue = queue

def createLock(self):
# Save some time on unneeded locks. Events are posted in the background.
# We only put events to the queue, which is already lock-protected.
@@ -59,11 +55,11 @@ def emit(self, record):
logging.getLevelName(record.levelno).capitalize())
reason = 'Logging'
message = self.format(record)
self.queue.put_nowait(posting.K8sEvent(
posting.enqueue(
ref=record.k8s_ref,
type=type,
reason=reason,
message=message))
message=message)
except Exception:
self.handleError(record)

@@ -84,8 +80,8 @@ class ObjectLogger(logging.LoggerAdapter):
(e.g. in case of background posting via the queue; see `K8sPoster`).
"""

def __init__(self, *, body, event_level=logging.INFO, event_queue=None):
super().__init__(self._make_logger(event_level, event_queue), dict(
def __init__(self, *, body):
super().__init__(logger, dict(
k8s_skip=False,
k8s_ref=dict(
apiVersion=body.get('apiVersion'),
@@ -95,39 +91,6 @@ def __init__(self, *, body, event_level=logging.INFO, event_queue=None):
namespace=body.get('metadata', {}).get('namespace'),
),
))
self.queue = event_queue # for kopf.event()&co explicit posting

def __del__(self):
# When an object logger is garbage-collected, purge its handlers & posting queues.
# Note: Depending on the garbage collection setup, this can never happen or be delayed.
# In this case, the object logger stays ready to log, i.e. keeps its handler+queue.
# TODO: also remove the dynamic logger itself, to avoid memory leaks.
for handler in list(self.logger.handlers):
if isinstance(handler, K8sPoster):
self.logger.removeHandler(handler)
handler.close()

def _make_logger(self, event_level, event_queue):
"""
Get-or-create a logger for this event queue, and setup the k8s poster.

If only one global queue is used, it will be one logger.

If multiple queues are used, each queue uses its own logger
with its own handler. This is currently needed for tests
(every test provides and later asserts its own k8s-event queue).

In the future, or now via user tweaks, the framework can create
separate k8s-event-queues per resource kind, per individual objects,
or on another grouping basis. They should not duplicate each other
by posting the same log-message to k8s more than once.
For this purpose, separate `logging.Logger` instances are used:
strictly one per an `ObjectLogger` instance, dynamically created.
"""
logger = logging.getLogger(f'kopf.objects.{id(event_queue)}')
if not logger.handlers:
logger.addHandler(K8sPoster(level=event_level, queue=event_queue))
return logger

def process(self, msg, kwargs):
# Native logging overwrites the message's extra with the adapter's extra.
@@ -139,3 +102,7 @@ def log(self, level, msg, *args, local=False, **kwargs):
if local:
kwargs['extra'] = dict(kwargs.pop('extra', {}), k8s_skip=True)
super().log(level, msg, *args, **kwargs)


logger = logging.getLogger('kopf.objects')
logger.addHandler(K8sPoster(level=logging.INFO))
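
In short, the dynamically created per-queue loggers (`kopf.objects.<queue-id>`) are gone: there is now a single module-level `kopf.objects` logger with one `K8sPoster` handler, and the per-object identifiers travel on each log record via the adapter's `extra` (`k8s_ref`, `k8s_skip`) instead of via per-object handlers. A minimal sketch of that pattern, using illustrative names (`RefAwareHandler`, `ObjectAdapter`) rather than kopf's actual classes:

```python
import logging

class RefAwareHandler(logging.Handler):
    """Toy stand-in for K8sPoster: reads per-object data off the log record."""
    def emit(self, record):
        if getattr(record, 'k8s_skip', False):
            return  # local-only messages are never turned into K8s events
        ref = getattr(record, 'k8s_ref', None)
        print(f"would post a K8s event for {ref}: {self.format(record)}")

class ObjectAdapter(logging.LoggerAdapter):
    """Merges the per-object extra into every record, like ObjectLogger does."""
    def process(self, msg, kwargs):
        kwargs['extra'] = dict(self.extra, **kwargs.get('extra', {}))
        return msg, kwargs

logger = logging.getLogger('demo.objects')
logger.setLevel(logging.DEBUG)
logger.addHandler(RefAwareHandler(level=logging.INFO))

adapter = ObjectAdapter(logger, dict(k8s_skip=False, k8s_ref={'name': 'obj1'}))
adapter.info("hello %s", "world")                     # posted: record.k8s_ref is attached
adapter.debug("too low-level")                        # below the handler's level: not posted
adapter.info("local only", extra={'k8s_skip': True})  # explicitly skipped by the handler
```

Because the per-object state rides on the record rather than on a handler, adapters can come and go without registering or closing handlers, which is why the `__del__` self-destruction machinery (and its tests, deleted below) is no longer needed.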
33 changes: 30 additions & 3 deletions kopf/engines/posting.py
@@ -24,6 +24,10 @@
from kopf.structs import dicts
from kopf.structs import hierarchies

# Logging and event-posting can happen cross-thread: e.g. in sync-executors.
# We have to remember our main event-loop with the queue consumer, to make
# thread-safe coro calls both from inside that event-loop and from outside.
event_queue_loop_var: ContextVar[asyncio.AbstractEventLoop] = ContextVar('event_queue_loop_var')
event_queue_var: ContextVar[asyncio.Queue] = ContextVar('event_queue_var')


@@ -38,12 +42,35 @@ class K8sEvent(NamedTuple):
message: Text


def event(objs, *, type, reason, message=''):
def enqueue(ref, type, reason, message):
loop = event_queue_loop_var.get()
queue = event_queue_var.get()
event = K8sEvent(ref=ref, type=type, reason=reason, message=message)

# Events can be posted from another thread than the event-loop's thread
# (e.g. from sync-handlers, or from explicitly started per-object threads),
# or from the same thread (async-handlers and the framework itself).
try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
running_loop = None

if running_loop is loop:
# Posting from the same event-loop as the poster task and queue are in.
# Therefore, it is the same thread, and all calls here are thread-safe.
# Special thread-safe cross-event-loop methods make no effect here.
queue.put_nowait(event)
else:
# No event-loop or another event-loop - assume another thread.
# Use the cross-thread thread-safe methods. Block until enqueued there.
future = asyncio.run_coroutine_threadsafe(queue.put(event), loop=loop)
future.result() # block, wait, re-raise.


def event(objs, *, type, reason, message=''):
for obj in dicts.walk(objs):
ref = hierarchies.build_object_reference(obj)
event = K8sEvent(ref=ref, type=type, reason=reason, message=message)
queue.put_nowait(event)
enqueue(ref=ref, type=type, reason=reason, message=message)


def info(obj, *, reason, message=''):
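The core of the fix is in `enqueue()`: when the caller is already inside the event-loop that owns the queue, a plain `put_nowait()` suffices; from any other thread (e.g. a sync handler in an executor), the item is handed over with `asyncio.run_coroutine_threadsafe()`, the thread-safe way to touch an `asyncio.Queue` bound to another loop. A self-contained sketch of the same dispatch logic (illustrative names, not kopf's API):

```python
import asyncio

def enqueue_threadsafe(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, item) -> None:
    """Enqueue `item` whether we are inside `loop` or in a foreign thread."""
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # no event-loop in this thread at all

    if running is loop:
        queue.put_nowait(item)  # same loop, same thread: a plain call is safe
    else:
        # Another thread (or another loop): hop over to the owning loop and wait.
        future = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
        future.result()  # block until enqueued; re-raise any error from the loop

async def main():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    enqueue_threadsafe(queue, loop, "from the loop's own thread")
    await loop.run_in_executor(None, enqueue_threadsafe, queue, loop, "from a worker thread")
    print(await queue.get())
    print(await queue.get())

asyncio.run(main())
```

The same-loop branch is not just an optimisation: calling `future.result()` from inside the owning loop's own thread would deadlock, since the loop could never get around to running the scheduled `queue.put()`.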
3 changes: 2 additions & 1 deletion kopf/reactor/handling.py
@@ -89,7 +89,8 @@ async def custom_object_handler(
patch = {}

# Each object has its own prefixed logger, to distinguish parallel handling.
logger = logging_engine.ObjectLogger(body=body, event_queue=event_queue)
logger = logging_engine.ObjectLogger(body=body)
posting.event_queue_loop_var.set(asyncio.get_running_loop())
posting.event_queue_var.set(event_queue) # till the end of this object's task.

# If the global freeze is set for the processing (i.e. other operator overrides), do nothing.
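On the handler side, each object's task now stores both the running loop and the queue in `ContextVar`s so that `enqueue()` can find them later — including from sync handlers running in worker threads, assuming the framework invokes those handlers inside a copy of the task's context (`contextvars.copy_context()`); treat that propagation detail as an assumption about kopf's invocation machinery. A small sketch of the idea, with hypothetical names:

```python
import asyncio
import contextvars

loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop] = contextvars.ContextVar('loop_var')

def sync_handler() -> str:
    # Runs in a worker thread, yet still sees the value set in the async task,
    # because the call is wrapped in a snapshot of that task's context.
    return f"sync handler sees loop: {loop_var.get()!r}"

async def handle_object():
    loop_var.set(asyncio.get_running_loop())   # till the end of this object's task
    ctx = contextvars.copy_context()           # snapshot the current context-vars
    result = await asyncio.get_running_loop().run_in_executor(None, lambda: ctx.run(sync_handler))
    print(result)

asyncio.run(handle_object())
```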
24 changes: 24 additions & 0 deletions tests/posting/conftest.py
@@ -0,0 +1,24 @@
import asyncio

import pytest

from kopf.engines.posting import event_queue_var, event_queue_loop_var


@pytest.fixture()
def event_queue_loop(event_loop):
token = event_queue_loop_var.set(event_loop)
try:
yield event_loop
finally:
event_queue_loop_var.reset(token)


@pytest.fixture()
def event_queue(event_loop):
queue = asyncio.Queue(loop=event_loop)
token = event_queue_var.set(queue)
try:
yield queue
finally:
event_queue_var.reset(token)
40 changes: 20 additions & 20 deletions tests/posting/test_log2k8s.py
@@ -1,5 +1,3 @@
import asyncio

import pytest

from kopf.engines.logging import ObjectLogger
@@ -16,14 +14,14 @@
['error', "Error"],
['critical', "Fatal"],
])
def test_posting_normal_levels(caplog, logstream, logfn, event_type):
queue = asyncio.Queue()
logger = ObjectLogger(body=OBJ1, event_queue=queue)
async def test_posting_normal_levels(caplog, logstream, logfn, event_type, event_queue, event_queue_loop):
logger = ObjectLogger(body=OBJ1)
logger_fn = getattr(logger, logfn)

getattr(logger, logfn)("hello %s", "world")
logger_fn("hello %s", "world")

assert queue.qsize() == 1
event1 = queue.get_nowait()
assert event_queue.qsize() == 1
event1 = event_queue.get_nowait()
assert event1.ref == REF1
assert event1.type == event_type
assert event1.reason == "Logging"
@@ -34,14 +32,15 @@ def test_posting_normal_levels(caplog, logstream, logfn, event_type):
@pytest.mark.parametrize('logfn', [
'debug',
])
def test_skipping_debug_level(caplog, logstream, logfn):
queue = asyncio.Queue()
logger = ObjectLogger(body=OBJ1, event_queue=queue)
async def test_skipping_hidden_levels(caplog, logstream, logfn, event_queue, event_queue_loop):
logger = ObjectLogger(body=OBJ1)
logger_fn = getattr(logger, logfn)

getattr(logger, logfn)("hello %s", "world")
logger_fn("hello %s", "world")
logger.info("must be here")

assert queue.empty()
assert caplog.messages == ["hello world"]
assert event_queue.qsize() == 1 # not 2!
assert caplog.messages == ["hello world", "must be here"]


@pytest.mark.parametrize('logfn', [
@@ -51,11 +50,12 @@ def test_skipping_debug_level(caplog, logstream, logfn):
'error',
'critical',
])
def test_skipping_when_local_with_all_level(caplog, logstream, logfn):
queue = asyncio.Queue()
logger = ObjectLogger(body=OBJ1, event_queue=queue)
async def test_skipping_when_local_with_all_levels(caplog, logstream, logfn, event_queue, event_queue_loop):
logger = ObjectLogger(body=OBJ1)
logger_fn = getattr(logger, logfn)

getattr(logger, logfn)("hello %s", "world", local=True)
logger_fn("hello %s", "world", local=True)
logger.info("must be here")

assert queue.empty()
assert caplog.messages == ["hello world"]
assert event_queue.qsize() == 1 # not 2!
assert caplog.messages == ["hello world", "must be here"]
21 changes: 13 additions & 8 deletions tests/posting/test_poster.py
@@ -4,7 +4,7 @@
from asynctest import call

from kopf import event, info, warn, exception
from kopf.engines.posting import poster, K8sEvent, event_queue_var
from kopf.engines.posting import poster, K8sEvent, event_queue_var, event_queue_loop_var

OBJ1 = {'apiVersion': 'group1/version1', 'kind': 'Kind1',
'metadata': {'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}}
@@ -42,7 +42,7 @@ def _cancel(*args, **kwargs):
)


def test_queueing_fails_with_no_queue(mocker):
def test_queueing_fails_with_no_queue(event_queue_loop):
# Prerequisite: the context-var should not be set by anything in advance.
sentinel = object()
assert event_queue_var.get(sentinel) is sentinel
@@ -51,11 +51,18 @@ def test_via_event_function(mocker):
event(OBJ1, type='type1', reason='reason1', message='message1')


def test_via_event_function(mocker):
def test_queueing_fails_with_no_loop(event_queue):
# Prerequisite: the context-var should not be set by anything in advance.
sentinel = object()
assert event_queue_loop_var.get(sentinel) is sentinel

with pytest.raises(LookupError):
event(OBJ1, type='type1', reason='reason1', message='message1')


async def test_via_event_function(mocker, event_queue, event_queue_loop):
post_event = mocker.patch('kopf.clients.events.post_event')

event_queue = asyncio.Queue()
event_queue_var.set(event_queue)
event(OBJ1, type='type1', reason='reason1', message='message1')

assert not post_event.called
@@ -74,11 +81,9 @@
pytest.param(warn, "Warning", id='warn'),
pytest.param(exception, "Error", id='exception'),
])
def test_via_shortcut(mocker, event_fn, event_type):
async def test_via_shortcut(mocker, event_fn, event_type, event_queue, event_queue_loop):
post_event = mocker.patch('kopf.clients.events.post_event')

event_queue = asyncio.Queue()
event_queue_var.set(event_queue)
event_fn(OBJ1, reason='reason1', message='message1')

assert not post_event.called
27 changes: 0 additions & 27 deletions tests/posting/test_selfdestruction.py

This file was deleted.
