Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where Sliding Sync could get stuck when using workers #17438

Merged
merged 10 commits into from
Jul 15, 2024
13 changes: 13 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
represented by a default `stream` attribute and a map of instance name to
stream position of any writers that are ahead of the default stream
position.

The values in `instance_map` must be greater than the `stream` attribute.
"""

stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
Expand All @@ -472,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
kw_only=True,
)

def __attrs_post_init__(self) -> None:
# Enforce that all instances have a value greater than the min stream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

greater than or equal to*

likewise in test_instance_map_assertion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, its probably better to do a strict check.

# position.
for v in self.instance_map.values():
if v < self.stream:
raise ValueError(
"'instance_map' includes a stream position before the main 'stream' attribute"
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)

@classmethod
@abc.abstractmethod
async def parse(cls, store: "DataStore", string: str) -> "Self":
Expand Down Expand Up @@ -641,6 +652,8 @@ def __attrs_post_init__(self) -> None:
"Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
)

super().__attrs_post_init__()

@classmethod
async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
try:
Expand Down
17 changes: 5 additions & 12 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,9 @@ def test_instance_map(self) -> None:
parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
self.assertEqual(parsed_token, token)

@skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
def test_instance_map_behind(self) -> None:
"""Test for stream token with instance map, where instance map entries
are from before stream token."""
store = self.hs.get_datastores().main
def test_instance_map_assertion(self) -> None:
"""Test that we assert values in the instance map are greater than the
min stream position"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4}))

string_token = self.get_success(token.to_string(store))
self.assertEqual(string_token, "s5")

parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
self.assertEqual(parsed_token, RoomStreamToken(stream=5))
with self.assertRaises(ValueError):
RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4}))