Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit 'c978f6c45' into anoa/dinsic_release_1_21_x
Browse files Browse the repository at this point in the history
* commit 'c978f6c45':
  Convert federation client to async/await. (#7975)
  • Loading branch information
anoadragon453 committed Oct 16, 2020
2 parents 6d02e38 + c978f6c commit 4ae35a3
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 221 deletions.
1 change: 1 addition & 0 deletions changelog.d/7975.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
16 changes: 9 additions & 7 deletions contrib/cmdclient/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,15 @@ def do_stream(self, line):

@defer.inlineCallbacks
def _do_event_stream(self, timeout):
res = yield self.http_client.get_json(
self._url() + "/events",
{
"access_token": self._tok(),
"timeout": str(timeout),
"from": self.event_stream_token,
},
res = yield defer.ensureDeferred(
self.http_client.get_json(
self._url() + "/events",
{
"access_token": self._tok(),
"timeout": str(timeout),
"from": self.event_stream_token,
},
)
)
print(json.dumps(res, indent=4))

Expand Down
60 changes: 32 additions & 28 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,18 +632,20 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server):
)

try:
query_response = yield self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
"server_keys": {
server_name: {
key_id: {"minimum_valid_until_ts": min_valid_ts}
for key_id, min_valid_ts in server_keys.items()
query_response = yield defer.ensureDeferred(
self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
data={
"server_keys": {
server_name: {
key_id: {"minimum_valid_until_ts": min_valid_ts}
for key_id, min_valid_ts in server_keys.items()
}
for server_name, server_keys in keys_to_fetch.items()
}
for server_name, server_keys in keys_to_fetch.items()
}
},
},
)
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve upon
Expand Down Expand Up @@ -792,23 +794,25 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids):

time_now_ms = self.clock.time_msec()
try:
response = yield self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
response = yield defer.ensureDeferred(
self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
)
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve
Expand Down
8 changes: 4 additions & 4 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def make_query(
and try the request anyway.
Returns:
a Deferred which will eventually yield a JSON object from the
a Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels(query_type).inc()
Expand All @@ -157,7 +157,7 @@ def query_client_keys(self, destination, content, timeout):
content (dict): The query content.
Returns:
a Deferred which will eventually yield a JSON object from the
an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_device_keys").inc()
Expand All @@ -180,7 +180,7 @@ def claim_client_keys(self, destination, content, timeout):
content (dict): The query content.
Returns:
a Deferred which will eventually yield a JSON object from the
an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
Expand Down Expand Up @@ -900,7 +900,7 @@ def get_public_rooms(
party instance
Returns:
Deferred[Dict[str, Any]]: The response from the remote server, or None if
Awaitable[Dict[str, Any]]: The response from the remote server, or None if
`remote_server` is the same as the local server_name
Raises:
Expand Down
19 changes: 6 additions & 13 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)

@defer.inlineCallbacks
def send_read_receipt(self, receipt: ReadReceipt):
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
Expand Down Expand Up @@ -330,9 +329,7 @@ def send_read_receipt(self, receipt: ReadReceipt):
room_id = receipt.room_id

# Work out which remote servers should be poked and poke them.
domains = yield defer.ensureDeferred(
self.state.get_current_hosts_in_room(room_id)
)
domains = await self.state.get_current_hosts_in_room(room_id)
domains = [
d
for d in domains
Expand Down Expand Up @@ -387,8 +384,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
queue.flush_read_receipts_for_room(room_id)

@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states: List[UserPresenceState]):
async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
Expand Down Expand Up @@ -423,7 +419,7 @@ def send_presence(self, states: List[UserPresenceState]):
if not states_map:
break

yield self._process_presence_inner(list(states_map.values()))
await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
Expand All @@ -450,14 +446,11 @@ def send_presence_to_destinations(
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states: List[UserPresenceState]):
async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
hosts_and_states = yield defer.ensureDeferred(
get_interested_remotes(self.store, states, self.state)
)
hosts_and_states = await get_interested_remotes(self.store, states, self.state)

for destinations, states in hosts_and_states:
for destination in destinations:
Expand Down
Loading

0 comments on commit 4ae35a3

Please sign in to comment.