Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't double encode replication data #2103

Merged
merged 3 commits into from
Apr 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import sys
import logging
import gc
import ujson as json

logger = logging.getLogger("synapse.app.appservice")

Expand Down Expand Up @@ -290,8 +289,7 @@ def process_replication_rows(self, stream_name, token, rows):
# Parse the rows in the stream
for row in rows:
typ = row.type
content_js = row.data
content = json.loads(content_js)
content = row.data

if typ == send_queue.PRESENCE_TYPE:
destination = content["destination"]
Expand Down
4 changes: 1 addition & 3 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import logging
import contextlib
import gc
import ujson as json

logger = logging.getLogger("synapse.app.synchrotron")

Expand Down Expand Up @@ -215,9 +214,8 @@ def process_replication_rows(self, token, rows):
self._latest_room_serial = token

for row in rows:
typing = json.loads(row.user_ids)
self._room_serials[row.room_id] = token
self._room_typing[row.room_id] = typing
self._room_typing[row.room_id] = row.user_ids


class SynchrotronApplicationService(object):
Expand Down
19 changes: 9 additions & 10 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import synapse.metrics

from blist import sorteddict
import ujson


metrics = synapse.metrics.get_metrics_for(__name__)
Expand Down Expand Up @@ -258,10 +257,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
)

for (key, (dest, user_id)) in dest_user_ids:
rows.append((key, PRESENCE_TYPE, ujson.dumps({
rows.append((key, PRESENCE_TYPE, {
"destination": dest,
"state": self.presence_map[user_id].as_dict(),
})))
}))

# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
Expand All @@ -271,10 +270,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)

for (pos, (destination, edu_key)) in keyed_edus:
rows.append(
(pos, KEYED_EDU_TYPE, ujson.dumps({
(pos, KEYED_EDU_TYPE, {
"key": edu_key,
"edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
}))
})
)

# Fetch changed edus
Expand All @@ -284,7 +283,7 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
edus = set((k, self.edus[k]) for k in keys[i:j])

for (pos, edu) in edus:
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
rows.append((pos, EDU_TYPE, edu.get_internal_dict()))

# Fetch changed failures
keys = self.failures.keys()
Expand All @@ -293,10 +292,10 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
failures = set((k, self.failures[k]) for k in keys[i:j])

for (pos, (destination, failure)) in failures:
rows.append((pos, FAILURE_TYPE, ujson.dumps({
rows.append((pos, FAILURE_TYPE, {
"destination": destination,
"failure": failure,
})))
}))

# Fetch changed device messages
keys = self.device_messages.keys()
Expand All @@ -305,9 +304,9 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])

for (pos, destination) in device_messages:
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
rows.append((pos, DEVICE_MESSAGE_TYPE, {
"destination": destination,
})))
}))

# Sort rows based on pos
rows.sort()
Expand Down
4 changes: 1 addition & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import logging

from collections import namedtuple
import ujson as json

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -288,8 +287,7 @@ def get_all_typing_updates(self, last_id, current_id):
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
typing_bytes = json.dumps(list(typing), ensure_ascii=False)
rows.append((serial, room_id, typing_bytes))
rows.append((serial, room_id, list(typing)))
rows.sort()
return rows

Expand Down
104 changes: 76 additions & 28 deletions synapse/replication/tcp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,82 @@
MAX_EVENTS_BEHIND = 10000


EventStreamRow = namedtuple("EventStreamRow",
("event_id", "room_id", "type", "state_key", "redacts"))
BackfillStreamRow = namedtuple("BackfillStreamRow",
("event_id", "room_id", "type", "state_key", "redacts"))
PresenceStreamRow = namedtuple("PresenceStreamRow",
("user_id", "state", "last_active_ts",
"last_federation_update_ts", "last_user_sync_ts",
"status_msg", "currently_active"))
TypingStreamRow = namedtuple("TypingStreamRow",
("room_id", "user_ids"))
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
("room_id", "receipt_type", "user_id", "event_id",
"data"))
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
PushersStreamRow = namedtuple("PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted",))
CachesStreamRow = namedtuple("CachesStreamRow",
("cache_func", "keys", "invalidation_ts",))
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
("room_id", "visibility", "appservice_id",
"network_id",))
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
("user_id", "room_id", "data"))
AccountDataStreamRow = namedtuple("AccountDataStream",
("user_id", "room_id", "data_type", "data"))
EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
PresenceStreamRow = namedtuple("PresenceStreamRow", (
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
))
TypingStreamRow = namedtuple("TypingStreamRow", (
"room_id", # str
"user_ids", # list(str)
))
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
))
PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
"user_id", # str
))
PushersStreamRow = namedtuple("PushersStreamRow", (
"user_id", # str
"app_id", # str
"pushkey", # str
"deleted", # bool
))
CachesStreamRow = namedtuple("CachesStreamRow", (
"cache_func", # str
"keys", # list(str)
"invalidation_ts", # int
))
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
))
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
"user_id", # str
"destination", # str
))
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str
"data", # dict
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
"data", # dict
))
AccountDataStreamRow = namedtuple("AccountDataStream", (
"user_id", # str
"room_id", # str
"data_type", # str
"data", # dict
))


class Stream(object):
Expand Down