diff --git a/packages/fetchai/connections/ledger/base.py b/packages/fetchai/connections/ledger/base.py index 63dfd19ad7..9824f99737 100644 --- a/packages/fetchai/connections/ledger/base.py +++ b/packages/fetchai/connections/ledger/base.py @@ -61,13 +61,19 @@ def __init__( self._api_configs = api_configs def api_config(self, ledger_id: str) -> Dict[str, str]: - """Get api config""" + """Get api config.""" config = {} # type: Dict[str, str] if self._api_configs is not None and ledger_id in self._api_configs: config = self._api_configs[ledger_id] return config - async def run_async(self, func: Callable[[Any], Task], *args): + async def run_async( + self, + func: Callable[[Any], Task], + api: LedgerApi, + message: Message, + dialogue: Dialogue, + ): """ Run a function in executor. @@ -76,10 +82,12 @@ async def run_async(self, func: Callable[[Any], Task], *args): :return: the return value of the function. """ try: - response = await self.loop.run_in_executor(self.executor, func, *args) + response = await self.loop.run_in_executor( + self.executor, func, api, message, dialogue + ) return response except Exception as e: # pylint: disable=broad-except - return self.get_error_message(e, *args) + return self.get_error_message(e, api, message, dialogue) def dispatch(self, envelope: Envelope) -> Task: """ @@ -106,7 +114,7 @@ def get_handler(self, performative: Any) -> Callable[[Any], Task]: :param performative: the message performative. :return: the method that will send the request. """ - handler = getattr(self, performative.value, lambda *args, **kwargs: None) + handler = getattr(self, performative.value, None) if handler is None: raise Exception("Performative not recognized.") return handler diff --git a/packages/fetchai/connections/ledger/connection.py b/packages/fetchai/connections/ledger/connection.py index e8914a824f..6ffcf8c3ad 100644 --- a/packages/fetchai/connections/ledger/connection.py +++ b/packages/fetchai/connections/ledger/connection.py @@ -47,10 +47,7 @@ class LedgerConnection(Connection): connection_id = CONNECTION_ID def __init__(self, **kwargs): - """ - Initialize a connection to interact with a ledger APIs. - - """ + """Initialize a connection to interact with a ledger APIs.""" super().__init__(**kwargs) self._ledger_dispatcher: Optional[LedgerApiRequestDispatcher] = None @@ -84,7 +81,7 @@ async def disconnect(self) -> None: """Tear down the connection.""" self.connection_status.is_connected = False for task in self.receiving_tasks: - if not task.cancelled(): + if not task.cancelled(): # pragma: nocover task.cancel() self._ledger_dispatcher = None self._contract_dispatcher = None @@ -129,7 +126,7 @@ async def receive(self, *args, **kwargs) -> Optional["Envelope"]: :return: the envelope received, or None. """ # if there are done tasks, return the result - if len(self.done_tasks) > 0: + if len(self.done_tasks) > 0: # pragma: nocover done_task = self.done_tasks.pop() return self._handle_done_task(done_task) diff --git a/packages/fetchai/connections/ledger/connection.yaml b/packages/fetchai/connections/ledger/connection.yaml index 45c7e9256d..d632ff983b 100644 --- a/packages/fetchai/connections/ledger/connection.yaml +++ b/packages/fetchai/connections/ledger/connection.yaml @@ -6,10 +6,10 @@ license: Apache-2.0 aea_version: '>=0.5.0, <0.6.0' fingerprint: __init__.py: QmZvYZ5ECcWwqiNGh8qNTg735wu51HqaLxTSifUxkQ4KGj - base.py: QmZecsNSNpct1Zrs7HsJPQJN2buKJCirz6Z7nYH2FQbJFH - connection.py: QmP6kzX6pnsT44tu3bH9PC486mxcTnZ8CR6SngqxtrjHnb - contract_dispatcher.py: QmPtV5PxCP3YCtyA4EeGijqgpNwqPp3xvNZvtvk1nkhRJk - ledger_dispatcher.py: QmUk2J1FokJR6iLQYfyZbSSvR5y5g3ozYq7H6yQcv7YqmJ + base.py: QmegtTTPWhrnpHNiFpSm3TMcH9MTQLaWDvogL9dVvKPwXR + connection.py: QmTPj9CGkDtPMT7bXXDQi3i8zoRvSJvPVr6fyK2giPjmW1 + contract_dispatcher.py: QmSkA75HLriYkKXd7wcFqchSkrQsP8RxHK1be5qtXTpgwz + ledger_dispatcher.py: QmaETup4DzFYVkembK2yZL6TfbNDL13fdr6i29CPubG3CN fingerprint_ignore_patterns: [] protocols: - fetchai/contract_api:0.1.0 diff --git a/packages/fetchai/connections/ledger/contract_dispatcher.py b/packages/fetchai/connections/ledger/contract_dispatcher.py index 40e477f765..aebdac3b85 100644 --- a/packages/fetchai/connections/ledger/contract_dispatcher.py +++ b/packages/fetchai/connections/ledger/contract_dispatcher.py @@ -55,7 +55,8 @@ def __init__(self, **kwargs) -> None: @staticmethod def role_from_first_message(message: Message) -> BaseDialogue.Role: - """Infer the role of the agent from an incoming/outgoing first message + """ + Infer the role of the agent from an incoming/outgoing first message. :param message: an incoming/outgoing first message :return: The role of the agent @@ -156,7 +157,7 @@ def get_state( ) response.counterparty = message.counterparty dialogue.update(response) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except # pragma: nocover response = self.get_error_message(e, api, message, dialogue) return response @@ -187,7 +188,7 @@ def get_deploy_transaction( ) response.counterparty = message.counterparty dialogue.update(response) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except # pragma: nocover response = self.get_error_message(e, api, message, dialogue) return response @@ -218,7 +219,7 @@ def get_raw_transaction( ) response.counterparty = message.counterparty dialogue.update(response) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except # pragma: nocover response = self.get_error_message(e, api, message, dialogue) return response @@ -249,6 +250,6 @@ def get_raw_message( ) response.counterparty = message.counterparty dialogue.update(response) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except # pragma: nocover response = self.get_error_message(e, api, message, dialogue) return response diff --git a/packages/fetchai/connections/ledger/ledger_dispatcher.py b/packages/fetchai/connections/ledger/ledger_dispatcher.py index e35d671658..a5b6566060 100644 --- a/packages/fetchai/connections/ledger/ledger_dispatcher.py +++ b/packages/fetchai/connections/ledger/ledger_dispatcher.py @@ -55,7 +55,8 @@ def __init__(self, **kwargs) -> None: @staticmethod def role_from_first_message(message: Message) -> BaseDialogue.Role: - """Infer the role of the agent from an incoming/outgoing first message + """ + Infer the role of the agent from an incoming/outgoing first message. :param message: an incoming/outgoing first message :return: The role of the agent @@ -211,18 +212,18 @@ def get_transaction_receipt( time.sleep(self.TIMEOUT) transaction = api.get_transaction(message.transaction_digest.body) attempts += 1 - if not is_settled: + if not is_settled: # pragma: nocover response = self.get_error_message( ValueError("Transaction not settled within timeout"), api, message, dialogue, ) - elif transaction_receipt is None: + elif transaction_receipt is None: # pragma: nocover response = self.get_error_message( ValueError("No transaction_receipt returned"), api, message, dialogue ) - elif transaction is None: + elif transaction is None: # pragma: nocover response = self.get_error_message( ValueError("No tx returned"), api, message, dialogue ) @@ -255,7 +256,7 @@ def send_signed_transaction( transaction_digest = api.send_signed_transaction( message.signed_transaction.body ) - if transaction_digest is None: + if transaction_digest is None: # pragma: nocover response = self.get_error_message( ValueError("No transaction_digest returned"), api, message, dialogue ) diff --git a/packages/hashes.csv b/packages/hashes.csv index 6b13e7a8ec..91d641df00 100644 --- a/packages/hashes.csv +++ b/packages/hashes.csv @@ -21,7 +21,7 @@ fetchai/agents/weather_station,QmfD44aXS4TmcZFMASb8vDxYK6eNFsQMkSTBmTdcqzGPhc fetchai/connections/gym,QmZNEJvgi9n5poswQrHav3fvSv5vA1nbxxkTzWENCoCdrc fetchai/connections/http_client,QmXQrA6gA4hMEMkMQsEp1MQwDEqRw5BnnqR4gCrP5xqVD2 fetchai/connections/http_server,QmPMSyX1iaWM7mWqFtW8LnSyR9r88RzYbGtyYmopT6tshC -fetchai/connections/ledger,QmezMgaJkk9wbQ4nzURERnNJdrzkQyvV5PiieH6uGbVzc3 +fetchai/connections/ledger,QmWDietq5YFkZAYpyyrmq7AFbTVN4skDWKsG4CXrc5uZ37 fetchai/connections/local,QmVcTEJxGbWbtXi2fLN5eJA6XuEAneaNd83UJPugrtb9xU fetchai/connections/oef,QmfHVVcwUb8SqGYHs51iH5ymK5xJvxCCtShEmc9cw4FNVZ fetchai/connections/p2p_client,QmbwCDuAB1eq6JikqeAAqpqjVhxevGNeWCLqRD67Uvqiaz diff --git a/tests/test_packages/test_connections/test_ledger/test_contract_api.py b/tests/test_packages/test_connections/test_ledger/test_contract_api.py index e3b3d212fa..9688b7b0a4 100644 --- a/tests/test_packages/test_connections/test_ledger/test_contract_api.py +++ b/tests/test_packages/test_connections/test_ledger/test_contract_api.py @@ -16,7 +16,6 @@ # limitations under the License. # # ------------------------------------------------------------------------------ - """This module contains the tests of the ledger API connection for the contract APIs.""" import asyncio from pathlib import Path @@ -24,7 +23,7 @@ import pytest -from aea.connections.base import Connection +from aea.connections.base import Connection, ConnectionStatus from aea.crypto.ethereum import EthereumCrypto from aea.crypto.fetchai import FetchAICrypto from aea.crypto.wallet import CryptoStore @@ -32,7 +31,10 @@ from aea.identity.base import Identity from aea.mail.base import Envelope -from packages.fetchai.connections.ledger.contract_dispatcher import ContractApiDialogues +from packages.fetchai.connections.ledger.contract_dispatcher import ( + ContractApiDialogues, + ContractApiRequestDispatcher, +) from packages.fetchai.protocols.contract_api import ContractApiMessage from tests.conftest import ETHEREUM_ADDRESS_ONE, ROOT_DIR @@ -40,6 +42,7 @@ @pytest.fixture() async def ledger_apis_connection(request): + """Create connection.""" identity = Identity("name", FetchAICrypto().address) crypto_store = CryptoStore() directory = Path(ROOT_DIR, "packages", "fetchai", "connections", "ledger") @@ -243,3 +246,43 @@ async def test_erc1155_get_state(erc1155_contract, ledger_apis_connection): result = response_message.state.body.get("balance", None) expected_result = {token_id: 0} assert result is not None and result == expected_result + + +@pytest.mark.asyncio +async def test_run_async(): + """Test run async error handled.""" + # for pydocstyle + def _raise(): + raise Exception("Expected") + + contract_api_dialogues = ContractApiDialogues() + message = ContractApiMessage( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, + dialogue_reference=contract_api_dialogues.new_self_initiated_dialogue_reference(), + ledger_id=EthereumCrypto.identifier, + contract_id="fetchai/erc1155:0.6.0", + contract_address="test addr", + callable="get_create_batch_transaction", + kwargs=ContractApiMessage.Kwargs( + { + "deployer_address": "test_addr", + "token_ids": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + } + ), + ) + message.counterparty = "test" + dialogue = contract_api_dialogues.update(message) + api = None + msg = await ContractApiRequestDispatcher(ConnectionStatus()).run_async( + _raise, api, message, dialogue + ) + assert msg.performative == ContractApiMessage.Performative.ERROR + + +@pytest.mark.asyncio +async def test_get_handler(): + """Test failed to get handler.""" + with pytest.raises(Exception, match="Performative not recognized."): + ContractApiRequestDispatcher(ConnectionStatus()).get_handler( + ContractApiMessage.Performative.ERROR + ) diff --git a/tests/test_packages/test_connections/test_ledger/test_ledger_api.py b/tests/test_packages/test_connections/test_ledger/test_ledger_api.py index fbdf5c623b..3958f80084 100644 --- a/tests/test_packages/test_connections/test_ledger/test_ledger_api.py +++ b/tests/test_packages/test_connections/test_ledger/test_ledger_api.py @@ -16,16 +16,17 @@ # limitations under the License. # # ------------------------------------------------------------------------------ - """This module contains the tests of the ledger API connection module.""" import asyncio import logging from pathlib import Path from typing import cast +from unittest.mock import Mock, patch import pytest -from aea.connections.base import Connection +from aea.configurations.base import ProtocolId +from aea.connections.base import Connection, ConnectionStatus from aea.crypto.cosmos import CosmosCrypto from aea.crypto.ethereum import EthereumApi, EthereumCrypto from aea.crypto.fetchai import FetchAICrypto @@ -40,9 +41,15 @@ from aea.identity.base import Identity from aea.mail.base import Envelope -from packages.fetchai.connections.ledger.ledger_dispatcher import LedgerApiDialogues +from packages.fetchai.connections.ledger.connection import LedgerConnection +from packages.fetchai.connections.ledger.contract_dispatcher import ContractApiDialogues +from packages.fetchai.connections.ledger.ledger_dispatcher import ( + LedgerApiDialogues, + LedgerApiRequestDispatcher, +) from packages.fetchai.protocols.ledger_api.message import LedgerApiMessage + from tests.conftest import ( COSMOS_ADDRESS_ONE, COSMOS_TESTNET_CONFIG, @@ -69,6 +76,7 @@ @pytest.fixture() async def ledger_apis_connection(request): + """Make a connection.""" identity = Identity("name", FetchAICrypto().address) crypto_store = CryptoStore() directory = Path(ROOT_DIR, "packages", "fetchai", "connections", "ledger") @@ -277,3 +285,97 @@ async def test_send_signed_transaction_ethereum(ledger_apis_connection: Connecti # is_settled = api.is_transaction_settled(tx_receipt) # await asyncio.sleep(4.0) # assert is_settled, "Transaction not settled." + + +@pytest.mark.asyncio +async def test_unsupported_protocol(ledger_apis_connection: LedgerConnection): + """Test fail on protocol not supported.""" + envelope = Envelope( + to=str(ledger_apis_connection.connection_id), + sender="test", + protocol_id=ProtocolId.from_str("author/package_name:0.1.0"), + message=b"message", + ) + with pytest.raises(ValueError): + ledger_apis_connection._schedule_request(envelope) + + +@pytest.mark.asyncio +async def test_new_message_wait_flag(ledger_apis_connection: LedgerConnection): + """Test wait for new message.""" + task = asyncio.ensure_future(ledger_apis_connection.receive()) + await asyncio.sleep(0.1) + assert not task.done() + task.cancel() + + +@pytest.mark.asyncio +async def test_no_balance(): + """Test no balance.""" + dispatcher = LedgerApiRequestDispatcher(ConnectionStatus()) + mock_api = Mock() + contract_api_dialogues = ContractApiDialogues() + message = LedgerApiMessage( + performative=LedgerApiMessage.Performative.GET_BALANCE, + dialogue_reference=contract_api_dialogues.new_self_initiated_dialogue_reference(), + ledger_id=EthereumCrypto.identifier, + address="test", + ) + message.counterparty = "test" + dialogue = contract_api_dialogues.update(message) + mock_api.get_balance.return_value = None + msg = dispatcher.get_balance(mock_api, message, dialogue) + + assert msg.performative == LedgerApiMessage.Performative.ERROR + + +@pytest.mark.asyncio +async def test_no_raw_tx(): + """Test no raw tx returned.""" + dispatcher = LedgerApiRequestDispatcher(ConnectionStatus()) + mock_api = Mock() + contract_api_dialogues = ContractApiDialogues() + message = LedgerApiMessage( + performative=LedgerApiMessage.Performative.GET_RAW_TRANSACTION, + dialogue_reference=contract_api_dialogues.new_self_initiated_dialogue_reference(), + terms=Terms( + ledger_id=EthereumCrypto.identifier, + sender_address="1111", + counterparty_address="22222", + amount_by_currency_id={"ETH": -1}, + quantities_by_good_id={"some_service_id": 1}, + is_sender_payable_tx_fee=True, + nonce="", + fee_by_currency_id={"ETH": 10}, + chain_id=3, + ), + ) + message.counterparty = "test" + dialogue = contract_api_dialogues.update(message) + mock_api.get_transfer_transaction.return_value = None + msg = dispatcher.get_raw_transaction(mock_api, message, dialogue) + + assert msg.performative == LedgerApiMessage.Performative.ERROR + + +@pytest.mark.asyncio +async def test_attempts_get_transaction_receipt(): + """Test retry and sleep.""" + dispatcher = LedgerApiRequestDispatcher(ConnectionStatus()) + dispatcher.connection_status.is_connected = True + mock_api = Mock() + contract_api_dialogues = ContractApiDialogues() + message = LedgerApiMessage( + performative=LedgerApiMessage.Performative.GET_TRANSACTION_RECEIPT, + dialogue_reference=contract_api_dialogues.new_self_initiated_dialogue_reference(), + transaction_digest=TransactionDigest("asdad", "sdfdsf"), + ) + message.counterparty = "test" + dialogue = contract_api_dialogues.update(message) + mock_api.get_transaction.return_value = None + mock_api.is_transaction_settled.return_value = True + with patch.object(dispatcher, "MAX_ATTEMPTS", 2): + with patch.object(dispatcher, "TIMEOUT", 0.001): + msg = dispatcher.get_transaction_receipt(mock_api, message, dialogue) + + assert msg.performative == LedgerApiMessage.Performative.ERROR