Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop user directory from failing if it encounters users not in the users table. #11053

Merged
merged 18 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10825.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an 'approximate difference' method to `StateFilter`.
1 change: 1 addition & 0 deletions changelog.d/10970.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix inconsistent behavior of `get_last_client_by_ip` when reporting data that has not been stored in the database yet.
1 change: 1 addition & 0 deletions changelog.d/10996.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.21.0 that causes opentracing and Prometheus metrics for replication requests to be measured incorrectly.
2 changes: 2 additions & 0 deletions changelog.d/11053.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug introduced in Synapse 1.45.0rc1 where the user directory would stop updating if it processed an event from a
user not in the `users` table.
8 changes: 8 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,14 @@ def err_back(result):
result.addCallbacks(call_back, err_back)

else:
if inspect.isawaitable(result):
logger.error(
"@trace may not have wrapped %s correctly! "
"The function is not async but returned a %s.",
func.__qualname__,
type(result).__name__,
)

scope.__exit__(None, None, None)

return result
Expand Down
154 changes: 78 additions & 76 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,85 +182,87 @@ def make_client(cls, hs):
)

@trace(opname="outgoing_replication_request")
@outgoing_gauge.track_inprogress()
async def send_request(*, instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result

return send_request

Expand Down
13 changes: 9 additions & 4 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,20 @@ async def get_last_client_ip_by_device(
"""
ret = await super().get_last_client_ip_by_device(user_id, device_id)

# Update what is retrieved from the database with data which is pending insertion.
# Update what is retrieved from the database with data which is pending
# insertion, as if it has already been stored in the database.
for key in self._batch_row_update:
uid, access_token, ip = key
uid, _access_token, ip = key
if uid == user_id:
user_agent, did, last_seen = self._batch_row_update[key]

if did is None:
# These updates don't make it to the `devices` table
continue

if not device_id or did == device_id:
ret[(user_id, device_id)] = {
ret[(user_id, did)] = {
"user_id": user_id,
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": did,
Expand Down
24 changes: 22 additions & 2 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
cast,
)

from synapse.api.errors import StoreError

if TYPE_CHECKING:
from synapse.server import HomeServer

Expand Down Expand Up @@ -383,7 +385,19 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Is this user one of them?
"""
# App service users aren't usually contactable, so exclude them.
# We're opting to exclude the appservice sender (user defined by the
# `sender_localpart` in the appservice registration) even though
# technically it could be DM-able. In the future, this could potentially
# be configurable per-appservice whether the appservice sender can be
# contacted.
if self.get_app_service_by_user_id(user) is not None:
return False

# We're opting to exclude appservice users (anyone matching the user
# namespace regex in the appservice registration) even though technically
# they could be DM-able. In the future, this could potentially
# be configurable per-appservice whether the appservice users can be
# contacted.
if self.get_if_app_services_interested_in_user(user):
# TODO we might want to make this configurable for each app service
return False
Expand All @@ -393,8 +407,14 @@ async def should_include_local_user_in_dir(self, user: str) -> bool:
return False

# Deactivated users aren't contactable, so should not appear in the user directory.
if await self.get_user_deactivated_status(user):
try:
if await self.get_user_deactivated_status(user):
return False
except StoreError:
# No such user in the users table. No need to do this when calling
# is_support_user---that returns False if the user is missing.
return False

return True

async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
Expand Down
Loading