Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix missing attributes on workers. #3582

Merged
merged 2 commits into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3582.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption
10 changes: 6 additions & 4 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def with_state(state_group, current_state_ids, prev_state_ids,

return context

def serialize(self, event):
@defer.inlineCallbacks
def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`

Expand All @@ -126,11 +127,12 @@ def serialize(self, event):
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None

return {
defer.returnValue({
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
Expand All @@ -140,7 +142,7 @@ def serialize(self, event):
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}
})

@staticmethod
def deserialize(store, input):
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,9 @@ def handle_new_client_event(
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
clock=self.hs.get_clock(),
store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
Expand Down
7 changes: 5 additions & 2 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@


@defer.inlineCallbacks
def send_event_to_master(clock, client, host, port, requester, event, context,
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master

Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
Expand All @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id,
)

serialized_context = yield context.serialize(event, store)

payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
Expand Down