Skip to content

Commit

Permalink
Subscribe to address topics (#263)
Browse files Browse the repository at this point in the history
* Subscribe to address topics

* Guardian address provider

* Multiple dimensions of topics

* remove hardcoded endpoint
  • Loading branch information
hweawer authored Oct 9, 2024
1 parent 06de9c8 commit 1b26811
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
allowed_guardians_provider=self.w3.lido.deposit_security_module.get_guardians,
)
)

Expand Down
1 change: 1 addition & 0 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, w3: Web3):
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser],
allowed_guardians_provider=self.w3.lido.deposit_security_module.get_guardians,
)
)

Expand Down
1 change: 1 addition & 0 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def prepare_transport_bus(self):
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
parsers_providers=[UnvetParser, PingParser],
allowed_guardians_provider=self.w3.lido.deposit_security_module.get_guardians,
)
)

Expand Down
13 changes: 11 additions & 2 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, List, Optional

from eth_typing import ChecksumAddress
from eth_utils import to_bytes
from metrics.metrics import ONCHAIN_TRANSPORT_FETCHED_MESSAGES, ONCHAIN_TRANSPORT_PROCESSED_MESSAGES, ONCHAIN_TRANSPORT_VALID_MESSAGES
from prometheus_client import Gauge
from schema import Schema
Expand Down Expand Up @@ -214,6 +215,11 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
)


def _32padding_address(address: ChecksumAddress) -> bytes:
address_bytes = to_bytes(hexstr=address)
return address_bytes.rjust(32, b'\0')


class OnchainTransportProvider(BaseMessageProvider):
STANDARD_OFFSET: int = 256

Expand All @@ -223,6 +229,7 @@ def __init__(
onchain_address: ChecksumAddress,
message_schema: Schema,
parsers_providers: list[Callable[[Web3], EventParser]],
allowed_guardians_provider: Callable[[], list[ChecksumAddress]],
):
super().__init__(message_schema)
self._onchain_address = onchain_address
Expand All @@ -234,20 +241,22 @@ def __init__(

self._w3 = w3
self._chain_id = self._w3.eth.chain_id
self._allowed_guardians_provider = allowed_guardians_provider
self._parsers: List[EventParser] = [provider(w3) for provider in parsers_providers]
self._topics = [self._w3.keccak(text=parser.message_abi) for parser in self._parsers]

def _fetch_messages(self) -> list:
latest_block_number = self._w3.eth.block_number
from_block = max(0, latest_block_number - self.STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block
# If block distance is 0, then skip fetching to avoid looping on a single block
if from_block == latest_block_number:
return []
event_ids = [self._w3.keccak(text=parser.message_abi) for parser in self._parsers]
addresses_with_padding = [_32padding_address(address) for address in self._allowed_guardians_provider()]
filter_params = FilterParams(
fromBlock=from_block,
toBlock=latest_block_number,
address=self._onchain_address,
topics=[self._topics],
topics=[event_ids, addresses_with_padding],
)
try:
logs = self._w3.eth.get_logs(filter_params)
Expand Down
16 changes: 8 additions & 8 deletions tests/transport/onchain_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, w3: Web3, data_bus_contract: DataBusContract):
self._data_bus = data_bus_contract

def send_deposit(self, deposit_mes: DepositMessage):
deposit_topic = self._w3.keccak(text=DepositParser.message_abi)
event_id = self._w3.keccak(text=DepositParser.message_abi)
deposit_root, nonce, block_number, block_hash, staking_module_id, app = (
deposit_mes['depositRoot'],
deposit_mes['nonce'],
Expand All @@ -39,11 +39,11 @@ def send_deposit(self, deposit_mes: DepositMessage):
types=[DepositParser.DEPOSIT_V1_DATA_SCHEMA],
args=[(block_number, block_hash, deposit_root, staking_module_id, nonce, self._DEFAULT_SIGNATURE, app)],
)
tx = self._data_bus.functions.sendMessage(deposit_topic, mes)
tx = self._data_bus.functions.sendMessage(event_id, mes)
return tx.transact()

def send_pause_v2(self, pause_mes: PauseMessage):
pause_topic = self._w3.keccak(text=PauseV2Parser.message_abi)
event_id = self._w3.keccak(text=PauseV2Parser.message_abi)
block_number, staking_module_id, app = (
pause_mes['blockNumber'],
pause_mes['stakingModuleId'],
Expand All @@ -53,7 +53,7 @@ def send_pause_v2(self, pause_mes: PauseMessage):
types=[PauseV2Parser.PAUSE_V2_DATA_SCHEMA],
args=[(block_number, self._DEFAULT_BLOCK_HASH, self._DEFAULT_SIGNATURE, staking_module_id, app)],
)
tx = self._data_bus.functions.sendMessage(pause_topic, mes)
tx = self._data_bus.functions.sendMessage(event_id, mes)
return tx.transact()

def send_pause_v3(self, pause_mes: PauseMessage):
Expand All @@ -66,7 +66,7 @@ def send_pause_v3(self, pause_mes: PauseMessage):
return tx.transact()

def send_unvet(self, unvet_mes: UnvetMessage):
unvet_topic = self._w3.keccak(text=UnvetParser.message_abi)
event_id = self._w3.keccak(text=UnvetParser.message_abi)
nonce, block_number, block_hash, staking_module_id, operator_ids, vetted_keys, version = (
unvet_mes['nonce'],
unvet_mes['blockNumber'],
Expand All @@ -80,12 +80,12 @@ def send_unvet(self, unvet_mes: UnvetMessage):
types=[UnvetParser.UNVET_V1_DATA_SCHEMA],
args=[(block_number, block_hash, staking_module_id, nonce, operator_ids, vetted_keys, self._DEFAULT_SIGNATURE, (version,))],
)
tx = self._data_bus.functions.sendMessage(unvet_topic, mes)
tx = self._data_bus.functions.sendMessage(event_id, mes)
return tx.transact()

def send_ping(self, ping_mes: PingMessage):
ping_topic = self._w3.keccak(text=PingParser.message_abi)
event_id = self._w3.keccak(text=PingParser.message_abi)
block_number, version = ping_mes['blockNumber'], (1).to_bytes(32)
mes = self._w3.codec.encode(types=[PingParser.PING_V1_DATA_SCHEMA], args=[(block_number, (version,))])
tx = self._data_bus.functions.sendMessage(ping_topic, mes)
tx = self._data_bus.functions.sendMessage(event_id, mes)
return tx.transact()
18 changes: 14 additions & 4 deletions tests/transport/test_data_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

from tests.transport.onchain_sender import OnchainTransportSender

_DEFAULT_GUARDIAN = '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266'


# Started with config: {
# NODE_HOST: 'http://127.0.0.1:8888',
Expand Down Expand Up @@ -80,14 +82,21 @@ def test_data_bus_provider(web3_transaction_integration):
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
allowed_guardians_provider=lambda: [Web3.to_checksum_address(_DEFAULT_GUARDIAN[:-1] + '7')],
)
messages = provider.get_messages()
assert not messages
provider = OnchainTransportProvider(
w3=web3_transaction_integration,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
allowed_guardians_provider=lambda: [Web3.to_checksum_address(_DEFAULT_GUARDIAN)],
)
messages = provider.get_messages()
assert messages
for mes in messages:
print(mes)


_DEFAULT_GUARDIAN = '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266'
assert messages


@pytest.mark.unit
Expand All @@ -103,6 +112,7 @@ def test_data_bus_mock_responses(web3_lido_unit):
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
allowed_guardians_provider=lambda: [Web3.to_checksum_address(_DEFAULT_GUARDIAN)],
)

for parser in provider._parsers:
Expand Down

0 comments on commit 1b26811

Please sign in to comment.