Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reduce the number of tests using TCP replication. (#13543)
Browse files Browse the repository at this point in the history
Uses Redis replication in additional test cases (instead of
TCP replication). A small step towards dropping TCP replication.
  • Loading branch information
clokep committed Aug 19, 2022
1 parent 3a245f6 commit f3fba49
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 77 deletions.
1 change: 1 addition & 0 deletions changelog.d/13543.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce the number of tests using legacy TCP replication.
4 changes: 2 additions & 2 deletions tests/handlers/test_room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from synapse.types import UserID, create_requester
from synapse.util import Clock

from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import make_request
from tests.test_utils import make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config
Expand Down Expand Up @@ -216,7 +216,7 @@ def test_remote_joins_contribute_to_rate_limit(self) -> None:
# - trying to remote-join again.


class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.client.login.register_servlets,
Expand Down
7 changes: 0 additions & 7 deletions tests/module_api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from tests.test_utils import simple_async_mock
from tests.test_utils.event_injection import inject_member_event
from tests.unittest import HomeserverTestCase, override_config
from tests.utils import USE_POSTGRES_FOR_TESTS


class ModuleApiTestCase(HomeserverTestCase):
Expand Down Expand Up @@ -738,11 +737,6 @@ def test_create_room(self) -> None:
class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
"""For testing ModuleApi functionality in a multi-worker setup"""

# Testing stream ID replication from the main to worker processes requires postgres
# (due to needing `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"

servlets = [
admin.register_servlets,
login.register_servlets,
Expand All @@ -752,7 +746,6 @@ class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):

def default_config(self):
conf = super().default_config()
conf["redis"] = {"enabled": "true"}
conf["stream_writers"] = {"presence": ["presence_writer"]}
conf["instance_map"] = {
"presence_writer": {"host": "testserv", "port": 1001},
Expand Down
90 changes: 31 additions & 59 deletions tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import (
ReplicationStreamProtocolFactory,
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
)
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.server import HomeServer

from tests import unittest
Expand Down Expand Up @@ -220,15 +220,34 @@ def assert_request_is_get_repl_stream_updates(
class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
"""Base class for tests running multiple workers.
Enables Redis, providing a fake Redis server.
Automatically handle HTTP replication requests from workers to master,
unlike `BaseStreamTestCase`.
"""

if not hiredis:
skip = "Requires hiredis"

if not USE_POSTGRES_FOR_TESTS:
# Redis replication only takes place on Postgres
skip = "Requires Postgres"

def default_config(self) -> Dict[str, Any]:
"""
Overrides the default config to enable Redis.
Even if the test only uses make_worker_hs, the main process needs Redis
enabled otherwise it won't create a Fake Redis server to listen on the
Redis port and accept fake TCP connections.
"""
base = super().default_config()
base["redis"] = {"enabled": True}
return base

def setUp(self):
super().setUp()

# build a replication server
self.server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = self.hs.get_replication_streamer()

# Fake in memory Redis server that servers can connect to.
Expand All @@ -247,15 +266,14 @@ def setUp(self):
# handling inbound HTTP requests to that instance.
self._hs_to_site = {self.hs: self.site}

if self.hs.config.redis.redis_enabled:
# Handle attempts to connect to fake redis server.
self.reactor.add_tcp_client_callback(
"localhost",
6379,
self.connect_any_redis_attempts,
)
# Handle attempts to connect to fake redis server.
self.reactor.add_tcp_client_callback(
"localhost",
6379,
self.connect_any_redis_attempts,
)

self.hs.get_replication_command_handler().start_replication(self.hs)
self.hs.get_replication_command_handler().start_replication(self.hs)

# When we see a connection attempt to the master replication listener we
# automatically set up the connection. This is so that tests don't
Expand Down Expand Up @@ -339,27 +357,6 @@ def make_worker_hs(
store = worker_hs.get_datastores().main
store.db_pool._db_pool = self.database_pool._db_pool

# Set up TCP replication between master and the new worker if we don't
# have Redis support enabled.
if not worker_hs.config.redis.redis_enabled:
repl_handler = ReplicationCommandHandler(worker_hs)
client = ClientReplicationStreamProtocol(
worker_hs,
"client",
"test",
self.clock,
repl_handler,
)
server = self.server_factory.buildProtocol(
IPv4Address("TCP", "127.0.0.1", 0)
)

client_transport = FakeTransport(server, self.reactor)
client.makeConnection(client_transport)

server_transport = FakeTransport(client, self.reactor)
server.makeConnection(server_transport)

# Set up a resource for the worker
resource = ReplicationRestResource(worker_hs)

Expand All @@ -378,8 +375,7 @@ def make_worker_hs(
reactor=self.reactor,
)

if worker_hs.config.redis.redis_enabled:
worker_hs.get_replication_command_handler().start_replication(worker_hs)
worker_hs.get_replication_command_handler().start_replication(worker_hs)

return worker_hs

Expand Down Expand Up @@ -582,27 +578,3 @@ def encode(self, obj):

def connectionLost(self, reason):
self._server.remove_subscriber(self)


class RedisMultiWorkerStreamTestCase(BaseMultiWorkerStreamTestCase):
"""
A test case that enables Redis, providing a fake Redis server.
"""

if not hiredis:
skip = "Requires hiredis"

if not USE_POSTGRES_FOR_TESTS:
# Redis replication only takes place on Postgres
skip = "Requires Postgres"

def default_config(self) -> Dict[str, Any]:
"""
Overrides the default config to enable Redis.
Even if the test only uses make_worker_hs, the main process needs Redis
enabled otherwise it won't create a Fake Redis server to listen on the
Redis port and accept fake TCP connections.
"""
base = super().default_config()
base["redis"] = {"enabled": True}
return base
4 changes: 2 additions & 2 deletions tests/replication/tcp/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.replication._base import BaseMultiWorkerStreamTestCase


class ChannelsTestCase(RedisMultiWorkerStreamTestCase):
class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
def test_subscribed_to_enough_redis_channels(self) -> None:
# The default main process is subscribed to the USER_IP channel.
self.assertCountEqual(
Expand Down
7 changes: 0 additions & 7 deletions tests/replication/test_sharded_event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@

from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import make_request
from tests.utils import USE_POSTGRES_FOR_TESTS

logger = logging.getLogger(__name__)


class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"""Checks event persisting sharding works"""

# Event persister sharding requires postgres (due to needing
# `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"

servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
Expand All @@ -50,7 +44,6 @@ def prepare(self, reactor, clock, hs):

def default_config(self):
conf = super().default_config()
conf["redis"] = {"enabled": "true"}
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
conf["instance_map"] = {
"worker1": {"host": "testserv", "port": 1001},
Expand Down

0 comments on commit f3fba49

Please sign in to comment.