Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split up replication.tcp.streams into smaller files #4953

Merged
merged 4 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/4953.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Split synapse.replication.tcp.streams into smaller files.

2 changes: 1 addition & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from synapse.util.metrics import Measure, measure_func

from .protocol import ServerReplicationStreamProtocol
from .streams import STREAMS_MAP, FederationStream
from .streams import STREAMS_MAP
from .streams.federation import FederationStream

stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
Expand Down
50 changes: 50 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.

Each stream is defined by the following information:

stream name: The name of the stream
row type: The type that is used to serialise/deserialse the row
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""

from . import _base, events, federation

STREAMS_MAP = {
stream.NAME: stream
for stream in (
events.EventsStream,
_base.BackfillStream,
_base.PresenceStream,
_base.TypingStream,
_base.ReceiptsStream,
_base.PushRulesStream,
_base.PushersStream,
_base.CachesStream,
_base.PublicRoomsStream,
_base.DeviceListsStream,
_base.ToDeviceStream,
federation.FederationStream,
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.CurrentStateDeltaStream,
_base.GroupServerStream,
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines all the valid streams that clients can subscribe to, and the format
of the rows returned by each stream.

Each stream is defined by the following information:

stream name: The name of the stream
row type: The type that is used to serialise/deserialse the row
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""
import itertools
import logging
from collections import namedtuple
Expand All @@ -34,14 +26,6 @@

MAX_EVENTS_BEHIND = 10000


EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
Expand Down Expand Up @@ -96,10 +80,6 @@
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
Expand Down Expand Up @@ -236,20 +216,6 @@ def update_function(self, from_token, current_token, limit=None):
raise NotImplementedError()


class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
ROW_TYPE = EventStreamRow

def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_events_token
self.update_function = store.get_all_new_forward_event_rows

super(EventsStream, self).__init__(hs)


class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
Expand Down Expand Up @@ -404,22 +370,6 @@ def __init__(self, hs):
super(ToDeviceStream, self).__init__(hs)


class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow

def __init__(self, hs):
federation_sender = hs.get_federation_sender()

self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows

super(FederationStream, self).__init__(hs)


class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
Expand Down Expand Up @@ -489,26 +439,3 @@ def __init__(self, hs):
self.update_function = store.get_all_groups_changes

super(GroupServerStream, self).__init__(hs)


STREAMS_MAP = {
stream.NAME: stream
for stream in (
EventsStream,
BackfillStream,
PresenceStream,
TypingStream,
ReceiptsStream,
PushRulesStream,
PushersStream,
CachesStream,
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
TagAccountDataStream,
AccountDataStream,
CurrentStateDeltaStream,
GroupServerStream,
)
}
40 changes: 40 additions & 0 deletions synapse/replication/tcp/streams/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple

from ._base import Stream

EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))


class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
ROW_TYPE = EventStreamRow

def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_events_token
self.update_function = store.get_all_new_forward_event_rows

super(EventsStream, self).__init__(hs)
39 changes: 39 additions & 0 deletions synapse/replication/tcp/streams/federation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple

from ._base import Stream

FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))


class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow

def __init__(self, hs):
federation_sender = hs.get_federation_sender()

self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows

super(FederationStream, self).__init__(hs)
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.replication.tcp.streams import ReceiptsStreamRow
from synapse.replication.tcp.streams._base import ReceiptsStreamRow

from tests.replication.tcp.streams._base import BaseStreamTestCase

Expand Down