This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Let go of the event loop more often #1992

Merged 8 commits on Aug 24, 2020
4 changes: 2 additions & 2 deletions docker/Dockerfile
@@ -9,8 +9,8 @@ COPY . /usr/src/app
RUN apt-get update
RUN apt-get -y install libsnappy-dev

RUN pip install -e .[dev] --no-cache-dir
RUN pip install -U trinity --no-cache-dir
RUN pip install -e .[dev] --no-cache-dir --use-feature=2020-resolver
RUN pip install -U trinity --no-cache-dir --use-feature=2020-resolver

RUN echo "Type \`trinity\` to boot or \`trinity --help\` for an overview of commands"

2 changes: 1 addition & 1 deletion docker/beacon.Dockerfile
@@ -8,7 +8,7 @@ RUN apt-get update
RUN apt-get -y install libsnappy-dev gcc g++ cmake

RUN git clone https://github.com/ethereum/trinity.git .
RUN pip install -e .[eth2-dev] --no-cache-dir
RUN pip install -e .[eth2-dev] --no-cache-dir --use-feature=2020-resolver

RUN echo "Type \`trinity-beacon\` to boot or \`trinity-beacon --help\` for an overview of commands"

1 change: 1 addition & 0 deletions newsfragments/1992.performance.rst
@@ -0,0 +1 @@
More cooperative asyncio coroutines in a few places.
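
The pattern behind this changelog entry is the standard asyncio trick of awaiting `asyncio.sleep(0)`: it suspends the current coroutine for exactly one event-loop cycle, giving other pending tasks a chance to run. A minimal, self-contained sketch of the idea (the task names here are illustrative, not Trinity code):

```python
import asyncio


async def busy_worker(iterations: int) -> None:
    for i in range(iterations):
        _ = i * i  # stand-in for cheap, non-blocking per-item work
        # Let go of the event loop once per iteration; without this, the
        # heartbeat task below would not run until the whole loop finished.
        await asyncio.sleep(0)


async def heartbeat() -> None:
    for _ in range(3):
        print("heartbeat")
        await asyncio.sleep(0.01)


async def main() -> None:
    await asyncio.gather(busy_worker(100_000), heartbeat())


asyncio.run(main())
```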
8 changes: 5 additions & 3 deletions p2p/multiplexer.py
@@ -294,10 +294,12 @@ async def _stream_protocol_messages(self,
try:
# We use an optimistic strategy here of using
# `get_nowait()` to reduce the number of times we yield to
# the event loop. Since this is an async generator it will
# yield to the loop each time it returns a value so we
# don't have to worry about this blocking other processes.
# the event loop.
yield msg_queue.get_nowait()

# Manually release the event loop if it won't happen during
# the queue get().
await asyncio.sleep(0)
except asyncio.QueueEmpty:
yield await msg_queue.get()

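The comment in this hunk describes an optimistic strategy: pull messages with the non-blocking `get_nowait()` while the queue has items, explicitly hand control back with `asyncio.sleep(0)` since that fast path never awaits, and only fall back to an awaiting `get()` once the queue is empty. A standalone sketch of the pattern, using hypothetical names rather than the multiplexer's actual API:

```python
import asyncio
from typing import AsyncIterator


async def stream_items(queue: "asyncio.Queue[bytes]") -> AsyncIterator[bytes]:
    """Yield items from ``queue``, preferring the non-blocking fast path."""
    while True:
        try:
            # Fast path: grab an item without suspending if one is ready.
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            # Slow path: the queue is drained, so genuinely wait for the next item.
            item = await queue.get()
        else:
            # get_nowait() never yields to the loop, so release it manually to
            # keep other tasks responsive while draining a backed-up queue.
            await asyncio.sleep(0)
        yield item
```

A consumer would simply do `async for msg in stream_items(msg_queue): ...`; the fast path keeps per-message overhead low while the explicit sleep keeps the generator cooperative.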
1 change: 1 addition & 0 deletions p2p/peer.py
@@ -328,6 +328,7 @@ async def _handle_subscriber_message(self,
for subscriber in self._subscribers:
self.logger.debug2("Adding %s msg to queue of %s", type(cmd), subscriber)
subscriber.add_msg(subscriber_msg)
await asyncio.sleep(0)

async def disconnect(self, reason: DisconnectReason) -> None:
"""
6 changes: 6 additions & 0 deletions setup.py
@@ -55,6 +55,9 @@
"libp2p==0.1.5",
# The direct dependency resolves a version conflict between multiaddr and libp2p
"base58>=1.0.3,<2.0.0",
# Temporary patch to match a py-trie pin. After it's loosened in py-evm, drop the
# typing-extensions requirement altogether.
"typing-extensions==3.7.4.2",
],
'test': [
"async-timeout>=3.0.1,<4",
@@ -118,6 +121,9 @@
"asks>=2.3.6,<3", # validator client
"anyio>1.3,<1.4",
"eth-keyfile", # validator client
# Temporary patch to match a py-trie pin. After it's loosened in py-evm, drop the
# typing-extensions requirement altogether.
"typing-extensions==3.7.4.2",
],
'eth2-extra': [
"milagro-bls-binding==1.3.0",
4 changes: 3 additions & 1 deletion trinity/sync/beam/backfill.py
@@ -795,7 +795,9 @@ def _get_eligible_fog(self) -> fog.HexaryTrieFog:
Return the Trie Fog that can be searched, ignoring any nodes that are currently
being requested.
"""
return self._trie_fog.mark_all_complete(self._active_prefixes)
# Must pass in a copy of prefixes, so the set doesn't get modified while
# mark_all_complete is iterating over it.
return self._trie_fog.mark_all_complete(self._active_prefixes.copy())

def next_path_to_explore(self, starting_index: Nibbles) -> Nibbles:
return self._get_eligible_fog().nearest_unknown(starting_index)
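Context for the `.copy()` above: `self._active_prefixes` can be mutated by other coroutines while `mark_all_complete` is iterating over it, and mutating a `set` mid-iteration raises `RuntimeError: Set changed size during iteration`. A minimal reproduction and the snapshot-based fix, with generic names rather than the fog/prefix types:

```python
prefixes = {(0,), (1,), (2,)}

# Failure mode: the set changes size while an iteration over it is in flight.
try:
    for prefix in prefixes:
        prefixes.discard((1,))  # simulates another coroutine retiring a prefix
except RuntimeError as exc:
    print(exc)  # Set changed size during iteration

# Fix: iterate over a snapshot, leaving the live set free to change underneath.
prefixes = {(0,), (1,), (2,)}
for prefix in prefixes.copy():
    prefixes.discard((1,))
```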
20 changes: 17 additions & 3 deletions trinity/sync/beam/chain.py
@@ -4,6 +4,7 @@
AsyncIterator,
Iterable,
Sequence,
Set,
Tuple,
)

@@ -22,6 +23,7 @@
from eth.constants import GENESIS_PARENT_HASH
from eth.typing import BlockRange
from eth_typing import (
Address,
BlockNumber,
Hash32,
)
@@ -865,6 +867,15 @@ def _log_stats(self) -> None:
mean_stats,
)

def _extract_relevant_accounts(
self,
header: BlockHeaderAPI,
transactions: Tuple[SignedTransactionAPI, ...]) -> Set[Address]:

senders = [transaction.sender for transaction in transactions]
recipients = [transaction.to for transaction in transactions if transaction.to]
return set(senders + recipients + [header.coinbase])

async def _request_address_nodes(
self,
header: BlockHeaderAPI,
@@ -877,9 +888,12 @@ async def _request_address_nodes(
:param urgent: are these addresses needed immediately? If False, they should queue
up behind the urgent trie nodes.
"""
senders = [transaction.sender for transaction in transactions]
recipients = [transaction.to for transaction in transactions if transaction.to]
addresses = set(senders + recipients + [header.coinbase])
addresses = await asyncio.get_event_loop().run_in_executor(
None,
self._extract_relevant_accounts,
header,
transactions,
)
collected_nodes = await self._state_downloader.download_accounts(
addresses,
parent_state_root,
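The change above offloads `_extract_relevant_accounts` to the default executor because extracting senders typically means recovering them from the transaction signatures, which is CPU-bound and would otherwise stall the event loop for the whole transaction list. A generic sketch of the offloading pattern; the dict-shaped transactions and the `recover_accounts` helper are illustrative, not Trinity's types:

```python
import asyncio
from typing import Sequence, Set


def recover_accounts(transactions: Sequence[dict]) -> Set[str]:
    # Placeholder for CPU-bound work such as recovering each sender
    # address from its transaction signature.
    senders = {tx["sender"] for tx in transactions}
    recipients = {tx["to"] for tx in transactions if tx["to"]}
    return senders | recipients


async def request_address_nodes(transactions: Sequence[dict]) -> Set[str]:
    loop = asyncio.get_event_loop()
    # Run the extraction on the default thread-pool executor so the event
    # loop keeps servicing other coroutines while the CPU work happens.
    return await loop.run_in_executor(None, recover_accounts, transactions)


txs = [{"sender": "0xaa", "to": "0xbb"}, {"sender": "0xcc", "to": None}]
print(asyncio.run(request_address_nodes(txs)))
```

Passing `None` as the executor uses a thread pool, which mainly pays off when the underlying work releases the GIL (as C-backed crypto usually does) or is at least long enough that moving it off the loop keeps other coroutines responsive.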
60 changes: 43 additions & 17 deletions trinity/sync/beam/importer.py
@@ -88,11 +88,15 @@ class BeamStats:
def num_nodes(self) -> int:
return self.num_account_nodes + self.num_bytecodes + self.num_storage_nodes

def __str__(self) -> str:
@property
def avg_rtt(self) -> float:
if self.num_nodes:
avg_rtt = self.data_pause_time / self.num_nodes
return self.data_pause_time / self.num_nodes
else:
avg_rtt = 0
return 0

def __str__(self) -> str:
avg_rtt = self.avg_rtt

wait_time = humanize_seconds(self.data_pause_time)

@@ -432,22 +436,44 @@ def transaction_applied_hook(
num_transactions = len(transactions)

now = time.monotonic()
if now - self.last_log_time > MIN_GAS_LOG_WAIT:
if urgent:
if urgent:
# The currently-importing block
if transaction_index == num_transactions - 1:
logger = self.logger.info
log_header = "Beamed"
elif now - self.last_log_time > MIN_GAS_LOG_WAIT:
logger = self.logger.info
log_header = "Beaming"
else:
logger = self.logger.debug

logger(
"Beaming: #%d txn %d/%d, gas: %s/%s (%.1f%%)",
base_header.block_number,
transaction_index + 1,
num_transactions,
f"{partial_header.gas_used:,d}",
f"{base_header.gas_used:,d}",
100 * partial_header.gas_used / base_header.gas_used,
)
self.last_log_time = now
# Logged an update too recently, skipping...
return
else:
# Don't log anything for preview executions, for now
return

beam_stats = self.get_beam_stats()
fraction_complete = partial_header.gas_used / base_header.gas_used
if fraction_complete:
total_est = beam_stats.data_pause_time / fraction_complete
est_time = humanize_seconds(total_est - beam_stats.data_pause_time)
else:
est_time = "?"

logger(
"%s: #%d txn %d/%d, rtt: %.3fs, wait: %s, nodes: %d, gas: %s/%s (%.1f%%) ETA %s",
log_header,
base_header.block_number,
transaction_index + 1,
num_transactions,
beam_stats.avg_rtt,
humanize_seconds(beam_stats.data_pause_time),
beam_stats.num_nodes,
f"{partial_header.gas_used:,d}",
f"{base_header.gas_used:,d}",
100 * fraction_complete,
est_time,
)
self.last_log_time = now

return PausingVM

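The ETA in the new log line comes from a simple proportional estimate: if waiting `data_pause_time` seconds on data got the block through `fraction_complete` of its gas, the projected total wait is `data_pause_time / fraction_complete`, and the ETA is whatever remains. A small worked sketch of that arithmetic with made-up numbers (the function name is illustrative):

```python
def remaining_wait_estimate(data_pause_time: float, gas_used: int, total_gas: int) -> float:
    """Project the remaining data wait, assuming it scales linearly with gas executed."""
    fraction_complete = gas_used / total_gas
    if not fraction_complete:
        raise ValueError("no progress yet; the estimate is undefined (logged as '?')")
    total_est = data_pause_time / fraction_complete
    return total_est - data_pause_time


# 12s of data waits to execute 2.5M of 10M gas: 48s projected total, so 36s to go.
print(remaining_wait_estimate(12.0, 2_500_000, 10_000_000))  # 36.0
```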