
Record more information into structured logs #9654

Merged
merged 13 commits on Apr 8, 2021
5 changes: 4 additions & 1 deletion synapse/http/site.py
@@ -132,7 +132,7 @@ def render(self, resrc):
 
         # create a LogContext for this request
         request_id = self.get_request_id()
-        self.logcontext = LoggingContext(request_id, request=request_id)
+        self.logcontext = LoggingContext(request_id, request=self)
 
         # override the Server header which is set by twisted
         self.setHeader("Server", self.site.server_version_string)
@@ -334,6 +334,9 @@ def _finished_processing(self):
         except Exception as e:
             logger.warning("Failed to stop metrics: %r", e)
 
+        # Break the cycle between SynapseRequest and LoggingContext.
+        self.logcontext = None
+
     def _should_log_request(self) -> bool:
         """Whether we should log at INFO that we processed the request."""
         if self.path == b"/health":
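Why the new `self.logcontext = None` matters: the request stores its `LoggingContext`, and (with the change above) the context now stores the request back, so the pair forms a reference cycle that plain reference counting cannot reclaim. A minimal sketch of the pattern with hypothetical stand-in classes (not Synapse code):

import gc


class Context:
    def __init__(self, request):
        self.request = request  # back-edge: context -> request


class Request:
    def __init__(self):
        self.logcontext = Context(self)  # forward edge: request -> context

    def finished_processing(self):
        # Dropping one edge turns the cycle into a chain, so both objects
        # can be freed promptly by reference counting rather than waiting
        # for a full garbage-collection pass.
        self.logcontext = None


req = Request()
req.finished_processing()
del req  # reclaimed immediately; with the cycle intact, a gc pass would be needed
gc.collect()  # a no-op here, shown for contrast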
35 changes: 23 additions & 12 deletions synapse/logging/context.py
@@ -22,7 +22,6 @@
 
 See doc/log_contexts.rst for details on how this works.
 """
-
 import inspect
 import logging
 import threading
@@ -35,6 +34,7 @@
 from twisted.internet import defer, threads
 
 if TYPE_CHECKING:
+    from synapse.http.site import SynapseRequest
     from synapse.logging.scopecontextmanager import _LogContextScope
 
 logger = logging.getLogger(__name__)
@@ -256,7 +256,7 @@ def __init__(
         self,
         name: Optional[str] = None,
         parent_context: "Optional[LoggingContext]" = None,
-        request: Optional[str] = None,
+        request: "Optional[SynapseRequest]" = None,
     ) -> None:
         self.previous_context = current_context()
         self.name = name
@@ -281,15 +281,19 @@ def __init__(
         self.parent_context = parent_context
 
         if self.parent_context is not None:
-            self.parent_context.copy_to(self)
+            # we track the current request_id
+            self.request = self.parent_context.request
+
+            # we also track the current scope:
+            self.scope = self.parent_context.scope
 
         if request is not None:
-            # the request param overrides the request from the parent context
+            # the request_id param overrides the request_id from the parent context
             self.request = request
 
     def __str__(self) -> str:
         if self.request:
-            return str(self.request)
+            return self.request.get_request_id()
         return "%s@%x" % (self.name, id(self))
 
     @classmethod
@@ -556,8 +560,9 @@ def filter(self, record: logging.LogRecord) -> Literal[True]:
         # we end up in a death spiral of infinite loops, so let's check, for
         # robustness' sake.
         if context is not None:
-            # Logging is interested in the request.
-            record.request = context.request  # type: ignore
+            # Logging is interested in the request ID. Note that for backwards
+            # compatibility this is stored as the "request" on the record.
+            record.request = str(context)  # type: ignore
 
         return True

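To illustrate the filter change: records are now stamped with `str(context)` rather than the raw `request` attribute, so formatters keep reading the familiar `%(request)s` field. A rough sketch, assuming the post-change behaviour (the formatter string and logger name are mine):

import logging
import sys

from synapse.logging.context import LoggingContext, LoggingContextFilter

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(request)s - %(message)s"))
handler.addFilter(LoggingContextFilter())  # stamps record.request = str(context)

logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

with LoggingContext("demo"):
    # With no request attached, __str__ falls back to "demo@<address>",
    # so this prints something like "demo@7f3a9c0d2e80 - hello".
    logger.info("hello")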
@@ -630,8 +635,8 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel
 def nested_logging_context(suffix: str) -> LoggingContext:
     """Creates a new logging context as a child of another.
 
-    The nested logging context will have a 'request' made up of the parent context's
-    request, plus the given suffix.
+    The nested logging context will have a 'request_id' made up of the parent context's
+    request_id, plus the given suffix.
 
     CPU/db usage stats will be added to the parent context's on exit.
@@ -641,7 +646,7 @@ def nested_logging_context(suffix: str) -> LoggingContext:
             # ... do stuff
 
     Args:
-        suffix: suffix to add to the parent context's 'request'.
+        suffix: suffix to add to the parent context's 'request_id'.
 
     Returns:
         LoggingContext: new logging context.
@@ -653,11 +658,17 @@ def nested_logging_context(suffix: str) -> LoggingContext:
         )
         parent_context = None
         prefix = ""
+        request = None
     else:
         assert isinstance(curr_context, LoggingContext)
         parent_context = curr_context
-        prefix = str(parent_context.request)
-    return LoggingContext(parent_context=parent_context, request=prefix + "-" + suffix)
+        prefix = str(parent_context.name)
+        request = parent_context.request
+    return LoggingContext(
+        prefix + "-" + suffix,
+        parent_context=parent_context,
+        request=request,
+    )
 
 
 def preserve_fn(f):
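A quick usage sketch of the reworked `nested_logging_context` (my example): the child's name is now built from the parent's name rather than its request, while the parent's request object is inherited separately.

from synapse.logging.context import LoggingContext, nested_logging_context

with LoggingContext("persist_events"):
    with nested_logging_context("shard1") as child:
        # child.name == "persist_events-shard1"; with no request attached,
        # str(child) falls back to "persist_events-shard1@<address>".
        print(child)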
18 changes: 12 additions & 6 deletions synapse/metrics/background_process_metrics.py
@@ -16,7 +16,7 @@
 import logging
 import threading
 from functools import wraps
-from typing import TYPE_CHECKING, Dict, Optional, Set
+from typing import TYPE_CHECKING, Dict, Optional, Set, Union
 
 from prometheus_client.core import REGISTRY, Counter, Gauge
 
@@ -199,11 +199,11 @@ async def run():
         _background_process_start_count.labels(desc).inc()
         _background_process_in_flight_count.labels(desc).inc()
 
-        with BackgroundProcessLoggingContext(desc, "%s-%i" % (desc, count)) as context:
+        with BackgroundProcessLoggingContext(desc, count) as context:
             try:
                 ctx = noop_context_manager()
                 if bg_start_span:
-                    ctx = start_active_span(desc, tags={"request_id": context.request})
+                    ctx = start_active_span(desc, tags={"request_id": str(context)})
                 with ctx:
                     return await maybe_awaitable(func(*args, **kwargs))
             except Exception:
@@ -242,13 +242,19 @@ class BackgroundProcessLoggingContext(LoggingContext):
     processes.
     """
 
-    __slots__ = ["_proc"]
+    __slots__ = ["_id", "_proc"]
 
-    def __init__(self, name: str, request: Optional[str] = None):
-        super().__init__(name, request=request)
+    def __init__(self, name: str, id: Optional[Union[int, str]] = None):
+        super().__init__(name)
+        self._id = id
 
         self._proc = _BackgroundProcess(name, self)
 
+    def __str__(self) -> str:
+        if self._id is not None:
+            return "%s-%s" % (self.name, self._id)
+        return "%s@%x" % (self.name, id(self))
+
     def start(self, rusage: "Optional[resource._RUsage]"):
         """Log context has started running (again)."""
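For illustration, roughly how the new lazy naming behaves (a sketch under the post-change API, not part of the diff):

from synapse.metrics.background_process_metrics import (
    BackgroundProcessLoggingContext,
)

# With an id, __str__ renders "<name>-<id>" on demand, so callers no longer
# pre-format a "%s-%i" string that may never be logged.
ctx = BackgroundProcessLoggingContext("pusher", 3)
assert str(ctx) == "pusher-3"

# Without an id, it falls back to the "<name>@<hex address>" form used by
# LoggingContext itself.
anon = BackgroundProcessLoggingContext("pusher")
assert str(anon).startswith("pusher@")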
5 changes: 3 additions & 2 deletions synapse/replication/tcp/protocol.py
@@ -184,8 +184,9 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
 
         # a logcontext which we use for processing incoming commands. We declare it as a
         # background process so that the CPU stats get reported to prometheus.
-        ctx_name = "replication-conn-%s" % self.conn_id
-        self._logging_context = BackgroundProcessLoggingContext(ctx_name, ctx_name)
+        self._logging_context = BackgroundProcessLoggingContext(
+            "replication-conn", self.conn_id
+        )
 
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
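A small sanity check (my sketch; "abc123" is a made-up conn_id) that this refactor leaves the rendered context name, and hence the log output, unchanged:

from synapse.metrics.background_process_metrics import (
    BackgroundProcessLoggingContext,
)

conn_id = "abc123"  # hypothetical connection id

old_style = "replication-conn-%s" % conn_id  # what the removed code built eagerly
new_style = str(BackgroundProcessLoggingContext("replication-conn", conn_id))

assert old_style == new_style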
23 changes: 11 additions & 12 deletions tests/crypto/test_keyring.py
@@ -16,6 +16,7 @@
 
 from mock import Mock
 
+import attr
 import canonicaljson
 import signedjson.key
 import signedjson.sign
@@ -68,6 +69,11 @@ def sign_response(self, res):
         signedjson.sign.sign_json(res, self.server_name, self.key)
 
 
+@attr.s(slots=True)
+class FakeRequest:
+    id = attr.ib()
+
+
 @logcontext_clean
 class KeyringTestCase(unittest.HomeserverTestCase):
     def check_context(self, val, expected):
@@ -89,7 +95,7 @@ def test_verify_json_objects_for_server_awaits_previous_requests(self):
         first_lookup_deferred = Deferred()
 
         async def first_lookup_fetch(keys_to_fetch):
-            self.assertEquals(current_context().request, "context_11")
+            self.assertEquals(current_context().request.id, "context_11")
             self.assertEqual(keys_to_fetch, {"server10": {get_key_id(key1): 0}})
 
             await make_deferred_yieldable(first_lookup_deferred)
@@ -102,9 +108,7 @@ async def first_lookup_fetch(keys_to_fetch):
         mock_fetcher.get_keys.side_effect = first_lookup_fetch
 
         async def first_lookup():
-            with LoggingContext("context_11") as context_11:
-                context_11.request = "context_11"
-
+            with LoggingContext("context_11", request=FakeRequest("context_11")):
                 res_deferreds = kr.verify_json_objects_for_server(
                     [("server10", json1, 0, "test10"), ("server11", {}, 0, "test11")]
                 )
@@ -130,7 +134,7 @@ async def first_lookup():
         # should block rather than start a second call
 
         async def second_lookup_fetch(keys_to_fetch):
-            self.assertEquals(current_context().request, "context_12")
+            self.assertEquals(current_context().request.id, "context_12")
             return {
                 "server10": {
                     get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
@@ -142,9 +146,7 @@ async def second_lookup_fetch(keys_to_fetch):
         second_lookup_state = [0]
 
         async def second_lookup():
-            with LoggingContext("context_12") as context_12:
-                context_12.request = "context_12"
-
+            with LoggingContext("context_12", request=FakeRequest("context_12")):
                 res_deferreds_2 = kr.verify_json_objects_for_server(
                     [("server10", json1, 0, "test")]
                 )
@@ -589,10 +591,7 @@ def get_key_id(key):
 
 @defer.inlineCallbacks
 def run_in_context(f, *args, **kwargs):
-    with LoggingContext("testctx") as ctx:
-        # we set the "request" prop to make it easier to follow what's going on in the
-        # logs.
-        ctx.request = "testctx"
+    with LoggingContext("testctx"):
         rv = yield f(*args, **kwargs)
     return rv
44 changes: 41 additions & 3 deletions tests/logging/test_terse_json.py
@@ -15,12 +15,16 @@
 
 import json
 import logging
-from io import StringIO
+from io import BytesIO, StringIO
 
+from mock import Mock
+
+from synapse.http.site import SynapseRequest
 from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
 from synapse.logging.context import LoggingContext, LoggingContextFilter
 
 from tests.logging import LoggerCleanupMixin
+from tests.server import FakeChannel
 from tests.unittest import TestCase
@@ -120,7 +124,41 @@ def test_with_context(self):
         handler.addFilter(LoggingContextFilter())
         logger = self.get_logger(handler)
 
-        with LoggingContext(request="test"):
+        with LoggingContext("name"):
             logger.info("Hello there, %s!", "wally")
 
         log = self.get_log_line()
+
+        # The terse logger should give us these keys.
+        expected_log_keys = [
+            "log",
+            "level",
+            "namespace",
+            "request",
+        ]
+        self.assertCountEqual(log.keys(), expected_log_keys)
+        self.assertEqual(log["log"], "Hello there, wally!")
+        self.assertTrue(log["request"].startswith("name@"))
+
+    def test_with_request_context(self):
+        """
+        Information from the logging context request should be added to the JSON response.
+        """
+        handler = logging.StreamHandler(self.output)
+        handler.setFormatter(JsonFormatter())
+        handler.addFilter(LoggingContextFilter())
+        logger = self.get_logger(handler)
+
+        # A full request isn't needed here.
+        site = Mock(spec=["site_tag"])
+        site.site_tag = "test-site"
+        request = SynapseRequest(FakeChannel(site, None))
+        # Call requestReceived to finish instantiating the object.
+        request.content = BytesIO()
+        request.process = Mock()
+        request.requestReceived(b"POST", b"/_matrix/client/versions", b"1.1")
+
+        with LoggingContext(request=request):
+            logger.info("Hello there, %s!", "wally")
+
+        log = self.get_log_line()
@@ -134,4 +172,4 @@
         ]
         self.assertCountEqual(log.keys(), expected_log_keys)
         self.assertEqual(log["log"], "Hello there, wally!")
-        self.assertEqual(log["request"], "test")
+        self.assertTrue(log["request"].startswith("POST-"))
2 changes: 1 addition & 1 deletion tests/unittest.py
@@ -471,7 +471,7 @@ def setup_test_homeserver(self, *args, **kwargs):
             kwargs["config"] = config_obj
 
         async def run_bg_updates():
-            with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
+            with LoggingContext("run_bg_updates"):
                 while not await stor.db_pool.updates.has_completed_background_updates():
                     await stor.db_pool.updates.do_next_background_update(1)

7 changes: 3 additions & 4 deletions tests/util/caches/test_descriptors.py
@@ -661,14 +661,13 @@ def fn(self, arg1, arg2):
 
             @descriptors.cachedList("fn", "args1")
             async def list_fn(self, args1, arg2):
-                assert current_context().request == "c1"
+                assert current_context().name == "c1"
                 # we want this to behave like an asynchronous function
                 await run_on_reactor()
-                assert current_context().request == "c1"
+                assert current_context().name == "c1"
                 return self.mock(args1, arg2)
 
-        with LoggingContext() as c1:
-            c1.request = "c1"
+        with LoggingContext("c1") as c1:
             obj = Cls()
             obj.mock.return_value = {10: "fish", 20: "chips"}
             d1 = obj.list_fn([10, 20], 2)