Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move additional tasks to the background worker, part 4 #8513

Merged
merged 7 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8513.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
3 changes: 2 additions & 1 deletion synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def send_emails():
"send_renewals", self._send_renewal_emails
)

self.clock.looping_call(send_emails, 30 * 60 * 1000)
if hs.config.run_background_tasks:
self.clock.looping_call(send_emails, 30 * 60 * 1000)

async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, hs: "HomeServer"):

# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)

self._account_validity_enabled = hs.config.account_validity.enabled
Expand Down
18 changes: 9 additions & 9 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,21 +402,23 @@ def __init__(self, hs: "HomeServer"):
self.config.block_events_without_consent_error
)

# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)

# Rooms which should be excluded from dummy insertion. (For instance,
# those without local users who can send events into the room).
#
# map from room id to time-of-last-attempt.
#
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]

# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
# The number of forward extremeities before a dummy event is sent.
self._dummy_events_threshold = hs.config.dummy_events_threshold

if (
not self.config.worker_app
self.config.run_background_tasks
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
Expand All @@ -431,8 +433,6 @@ def __init__(self, hs: "HomeServer"):

self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages

self._dummy_events_threshold = hs.config.dummy_events_threshold

async def create_event(
self,
requester: Requester,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max

if hs.config.retention_enabled:
if hs.config.run_background_tasks and hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
Expand Down
29 changes: 11 additions & 18 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
MAX_AVATAR_URL_LEN = 1000


class BaseProfileHandler(BaseHandler):
class ProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.

BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
ProfileHandler can be instantiated directly on workers and will
delegate to master when necessary.
"""

PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super().__init__(hs)

Expand All @@ -53,6 +55,11 @@ def __init__(self, hs):

self.user_directory_handler = hs.get_user_directory_handler()

if hs.config.run_background_tasks:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

async def get_profile(self, user_id):
target_user = UserID.from_string(user_id)

Expand Down Expand Up @@ -363,20 +370,6 @@ async def check_profile_query_allowed(self, target_user, requester=None):
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
raise


class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super().__init__(hs)

assert hs.config.worker_app is None

self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache
Expand Down
12 changes: 7 additions & 5 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.password_policy import PasswordPolicyHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.register import RegistrationHandler
Expand Down Expand Up @@ -191,7 +191,12 @@ class HomeServer(metaclass=abc.ABCMeta):
"""

REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
"account_validity",
"auth",
"deactivate_account",
"message",
"pagination",
"profile",
"stats",
]

Expand Down Expand Up @@ -462,10 +467,7 @@ def get_initial_sync_handler(self) -> InitialSyncHandler:

@cache_in_self
def get_profile_handler(self):
if self.config.worker_app:
return BaseProfileHandler(self)
else:
return MasterProfileHandler(self)
return ProfileHandler(self)

@cache_in_self
def get_event_creation_handler(self) -> EventCreationHandler:
Expand Down
82 changes: 41 additions & 41 deletions synapse/storage/databases/main/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,6 @@ async def set_profile_avatar_url(
desc="set_profile_avatar_url",
)


class ProfileStore(ProfileWorkerStore):
async def add_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> None:
"""Ensure we are caching the remote user's profiles.

This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
await self.db_pool.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)

async def update_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> int:
Expand All @@ -138,6 +117,31 @@ async def maybe_delete_remote_profile_cache(self, user_id):
desc="delete_remote_profile_cache",
)

async def is_subscribed_remote_profile_for_user(self, user_id):
"""Check whether we are interested in a remote user's profile.
"""
res = await self.db_pool.simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True

res = await self.db_pool.simple_select_one_onecol(
table="group_invites",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True

async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
) -> Dict[str, str]:
Expand All @@ -160,27 +164,23 @@ def _get_remote_profile_cache_entries_that_expire_txn(txn):
_get_remote_profile_cache_entries_that_expire_txn,
)

async def is_subscribed_remote_profile_for_user(self, user_id):
"""Check whether we are interested in a remote user's profile.
"""
res = await self.db_pool.simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True
class ProfileStore(ProfileWorkerStore):
async def add_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> None:
"""Ensure we are caching the remote user's profiles.

res = await self.db_pool.simple_select_one_onecol(
table="group_invites",
This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
await self.db_pool.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)

if res:
return True
52 changes: 26 additions & 26 deletions synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,32 @@ def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
values={"expiration_ts_ms": expiration_ts, "email_sent": False},
)

async def get_user_pending_deactivation(self) -> Optional[str]:
"""
Gets one user from the table of users waiting to be parted from all the rooms
they're in.
"""
return await self.db_pool.simple_select_one_onecol(
"users_pending_deactivation",
keyvalues={},
retcol="user_id",
allow_none=True,
desc="get_users_pending_deactivation",
)

async def del_user_pending_deactivation(self, user_id: str) -> None:
"""
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
# XXX: This should be simple_delete_one but we failed to put a unique index on
# the table, so somehow duplicate entries have ended up in it.
await self.db_pool.simple_delete(
"users_pending_deactivation",
keyvalues={"user_id": user_id},
desc="del_user_pending_deactivation",
)


class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down Expand Up @@ -1367,32 +1393,6 @@ async def add_user_pending_deactivation(self, user_id: str) -> None:
desc="add_user_pending_deactivation",
)

async def del_user_pending_deactivation(self, user_id: str) -> None:
"""
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
# XXX: This should be simple_delete_one but we failed to put a unique index on
# the table, so somehow duplicate entries have ended up in it.
await self.db_pool.simple_delete(
"users_pending_deactivation",
keyvalues={"user_id": user_id},
desc="del_user_pending_deactivation",
)

async def get_user_pending_deactivation(self) -> Optional[str]:
"""
Gets one user from the table of users waiting to be parted from all the rooms
they're in.
"""
return await self.db_pool.simple_select_one_onecol(
"users_pending_deactivation",
keyvalues={},
retcol="user_id",
allow_none=True,
desc="get_users_pending_deactivation",
)

async def validate_threepid_session(
self, session_id: str, client_secret: str, token: str, current_ts: int
) -> Optional[str]:
Expand Down
Loading