Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3709 from matrix-org/rav/logcontext_for_replicati…
Browse files Browse the repository at this point in the history
…on_commands

Logcontexts for replication command handlers
  • Loading branch information
richvdh authored Aug 17, 2018
2 parents c144252 + d9efd87 commit 3cef867
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelog.d/3709.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Logcontexts for replication command handlers
3 changes: 2 additions & 1 deletion synapse/app/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ def __init__(self, hs):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()

@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)

if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
Expand Down
3 changes: 2 additions & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ def __init__(self, hs):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)

@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(FederationSenderReplicationHandler, self).on_rdata(
yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
Expand Down
3 changes: 2 additions & 1 deletion synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ def __init__(self, hs):

self.pusher_pool = hs.get_pusherpool()

@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)

@defer.inlineCallbacks
Expand Down
3 changes: 2 additions & 1 deletion synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,9 @@ def __init__(self, hs):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()

@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)

def get_streams_to_replicate(self):
Expand Down
3 changes: 2 additions & 1 deletion synapse/app/user_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ def __init__(self, hs):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()

@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
super(UserDirectoryReplicationHandler, self).on_rdata(
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ def on_rdata(self, stream_name, token, rows):
Can be overriden in subclasses to handle more.
"""
logger.info("Received rdata %s -> %s", stream_name, token)
self.store.process_replication_rows(stream_name, token, rows)
return self.store.process_replication_rows(stream_name, token, rows)

def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
the slave store.
Can be overriden in subclasses to handle more.
"""
self.store.process_replication_rows(stream_name, token, [])
return self.store.process_replication_rows(stream_name, token, [])

def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting
Expand Down
12 changes: 12 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ def to_line(self):
"""
return self.data

def get_logcontext_id(self):
"""Get a suitable string for the logcontext when processing this command"""

# by default, we just use the command name.
return self.NAME


class ServerCommand(Command):
"""Sent by the server on new connection and includes the server_name.
Expand Down Expand Up @@ -116,6 +122,9 @@ def to_line(self):
_json_encoder.encode(self.row),
))

def get_logcontext_id(self):
return "RDATA-" + self.stream_name


class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
Expand Down Expand Up @@ -190,6 +199,9 @@ def from_line(cls, line):
def to_line(self):
return " ".join((self.stream_name, str(self.token),))

def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name


class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
Expand Down
42 changes: 29 additions & 13 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
from twisted.python.failure import Failure

from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string

from .commands import (
Expand Down Expand Up @@ -222,7 +224,11 @@ def lineReceived(self, line):

# Now lets try and call on_<CMD_NAME> function
try:
getattr(self, "on_%s" % (cmd_name,))(cmd)
run_as_background_process(
"replication-" + cmd.get_logcontext_id(),
getattr(self, "on_%s" % (cmd_name,)),
cmd,
)
except Exception:
logger.exception("[%s] Failed to handle line: %r", self.id(), line)

Expand Down Expand Up @@ -387,7 +393,7 @@ def on_NAME(self, cmd):
self.name = cmd.data

def on_USER_SYNC(self, cmd):
self.streamer.on_user_sync(
return self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
)

Expand All @@ -397,22 +403,33 @@ def on_REPLICATE(self, cmd):

if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
for stream in iterkeys(self.streamer.streams_by_name):
self.subscribe_to_stream(stream, token)
deferreds = [
run_in_background(
self.subscribe_to_stream,
stream, token,
)
for stream in iterkeys(self.streamer.streams_by_name)
]

return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
self.subscribe_to_stream(stream_name, token)
return self.subscribe_to_stream(stream_name, token)

def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
return self.streamer.federation_ack(cmd.token)

def on_REMOVE_PUSHER(self, cmd):
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
return self.streamer.on_remove_pusher(
cmd.app_id, cmd.push_key, cmd.user_id,
)

def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
return self.streamer.on_user_ip(
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
cmd.last_seen,
)
Expand Down Expand Up @@ -542,14 +559,13 @@ def on_RDATA(self, cmd):
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)

self.handler.on_rdata(stream_name, cmd.token, rows)
return self.handler.on_rdata(stream_name, cmd.token, rows)

def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
return self.handler.on_position(cmd.stream_name, cmd.token)

def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
return self.handler.on_sync(cmd.data)

def replicate(self, stream_name, token):
"""Send the subscription request to the server
Expand Down

0 comments on commit 3cef867

Please sign in to comment.