diff --git a/newsfragments/2960.docs.rst b/newsfragments/2960.docs.rst new file mode 100644 index 0000000000..97566c5761 --- /dev/null +++ b/newsfragments/2960.docs.rst @@ -0,0 +1 @@ +Completed docstrings for `ContractFunction` and `AsyncContractFunction` classes diff --git a/newsfragments/2960.misc.rst b/newsfragments/2960.misc.rst new file mode 100644 index 0000000000..0405ed3e8b --- /dev/null +++ b/newsfragments/2960.misc.rst @@ -0,0 +1 @@ +Put related contract classes in similar order in their respective files diff --git a/web3/contract/async_contract.py b/web3/contract/async_contract.py index 689249e9c1..3eb37be398 100644 --- a/web3/contract/async_contract.py +++ b/web3/contract/async_contract.py @@ -93,194 +93,131 @@ from web3 import AsyncWeb3 # noqa: F401 -class AsyncContractFunctions(BaseContractFunctions): - def __init__( +class AsyncContractEvent(BaseContractEvent): + # mypy types + w3: "AsyncWeb3" + + @combomethod + async def get_logs( self, - abi: ABI, - w3: "AsyncWeb3", - address: Optional[ChecksumAddress] = None, - decode_tuples: Optional[bool] = False, - ) -> None: - super().__init__(abi, w3, AsyncContractFunction, address, decode_tuples) + argument_filters: Optional[Dict[str, Any]] = None, + fromBlock: Optional[BlockIdentifier] = None, + toBlock: Optional[BlockIdentifier] = None, + block_hash: Optional[HexBytes] = None, + ) -> Awaitable[Iterable[EventData]]: + """Get events for this contract instance using eth_getLogs API. - def __getattr__(self, function_name: str) -> "AsyncContractFunction": - if self.abi is None: - raise NoABIFound( - "There is no ABI found for this contract.", - ) - if "_functions" not in self.__dict__: - raise NoABIFunctionsFound( - "The abi for this contract contains no function definitions. ", - "Are you sure you provided the correct contract abi?", - ) - elif function_name not in self.__dict__["_functions"]: - raise ABIFunctionNotFound( - f"The function '{function_name}' was not found in this contract's abi.", - " Are you sure you provided the correct contract abi?", - ) - else: - return super().__getattribute__(function_name) + This is a stateless method, as opposed to createFilter. + It can be safely called against nodes which do not provide + eth_newFilter API, like Infura nodes. + If there are many events, + like ``Transfer`` events for a popular token, + the Ethereum node might be overloaded and timeout + on the underlying JSON-RPC call. -class AsyncContractEvents(BaseContractEvents): - def __init__( - self, abi: ABI, w3: "AsyncWeb3", address: Optional[ChecksumAddress] = None - ) -> None: - super().__init__(abi, w3, AsyncContractEvent, address) + Example - how to get all ERC-20 token transactions + for the latest 10 blocks: + .. code-block:: python -class AsyncContract(BaseContract): - functions: AsyncContractFunctions = None - caller: "AsyncContractCaller" = None + from = max(mycontract.web3.eth.block_number - 10, 1) + to = mycontract.web3.eth.block_number - # mypy types - w3: "AsyncWeb3" + events = mycontract.events.Transfer.getLogs(fromBlock=from, toBlock=to) - #: Instance of :class:`ContractEvents` presenting available Event ABIs - events: AsyncContractEvents = None + for e in events: + print(e["args"]["from"], + e["args"]["to"], + e["args"]["value"]) - def __init__(self, address: Optional[ChecksumAddress] = None) -> None: - """Create a new smart contract proxy object. + The returned processed log values will look like: - :param address: Contract address as 0x hex string""" + .. code-block:: python - if self.w3 is None: - raise AttributeError( - "The `Contract` class has not been initialized. Please use the " - "`web3.contract` interface to create your contract class." + ( + AttributeDict({ + 'args': AttributeDict({}), + 'event': 'LogNoArguments', + 'logIndex': 0, + 'transactionIndex': 0, + 'transactionHash': HexBytes('...'), + 'address': '0xF2E246BB76DF876Cef8b38ae84130F4F55De395b', + 'blockHash': HexBytes('...'), + 'blockNumber': 3 + }), + AttributeDict(...), + ... ) - if address: - self.address = normalize_address_no_ens(address) + See also: :func:`web3.middleware.filter.local_filter_middleware`. - if not self.address: - raise TypeError( - "The address argument is required to instantiate a contract." + :param argument_filters: + :param fromBlock: block number or "latest", defaults to "latest" + :param toBlock: block number or "latest". Defaults to "latest" + :param blockHash: block hash. blockHash cannot be set at the + same time as fromBlock or toBlock + :yield: Tuple of :class:`AttributeDict` instances + """ + abi = self._get_event_abi() + # Call JSON-RPC API + logs = await self.w3.eth.get_logs( + self._get_event_filter_params( + abi, argument_filters, fromBlock, toBlock, block_hash ) - self.functions = AsyncContractFunctions( - self.abi, self.w3, self.address, decode_tuples=self.decode_tuples - ) - self.caller = AsyncContractCaller( - self.abi, self.w3, self.address, decode_tuples=self.decode_tuples - ) - self.events = AsyncContractEvents(self.abi, self.w3, self.address) - self.fallback = AsyncContract.get_fallback_function( - self.abi, self.w3, AsyncContractFunction, self.address - ) - self.receive = AsyncContract.get_receive_function( - self.abi, self.w3, AsyncContractFunction, self.address ) - @classmethod - def factory( - cls, w3: "AsyncWeb3", class_name: Optional[str] = None, **kwargs: Any - ) -> "AsyncContract": - kwargs["w3"] = w3 - - normalizers = { - "abi": normalize_abi, - "address": normalize_address_no_ens, - "bytecode": normalize_bytecode, - "bytecode_runtime": normalize_bytecode, - } - - contract = cast( - AsyncContract, - PropertyCheckingFactory( - class_name or cls.__name__, - (cls,), - kwargs, - normalizers=normalizers, - ), - ) - contract.functions = AsyncContractFunctions( - contract.abi, contract.w3, decode_tuples=contract.decode_tuples - ) - contract.caller = AsyncContractCaller( - contract.abi, - contract.w3, - contract.address, - decode_tuples=contract.decode_tuples, - ) - contract.events = AsyncContractEvents(contract.abi, contract.w3) - contract.fallback = AsyncContract.get_fallback_function( - contract.abi, - contract.w3, - AsyncContractFunction, - ) - contract.receive = AsyncContract.get_receive_function( - contract.abi, - contract.w3, - AsyncContractFunction, + # Convert raw binary data to Python proxy objects as described by ABI + return tuple( # type: ignore + get_event_data(self.w3.codec, abi, entry) for entry in logs ) - return contract - @classmethod - def constructor(cls, *args: Any, **kwargs: Any) -> "AsyncContractConstructor": + @combomethod + async def create_filter( + self, + *, # PEP 3102 + argument_filters: Optional[Dict[str, Any]] = None, + fromBlock: Optional[BlockIdentifier] = None, + toBlock: BlockIdentifier = "latest", + address: Optional[ChecksumAddress] = None, + topics: Optional[Sequence[Any]] = None, + ) -> AsyncLogFilter: """ - :param args: The contract constructor arguments as positional arguments - :param kwargs: The contract constructor arguments as keyword arguments - :return: a contract constructor object + Create filter object that tracks logs emitted by this contract event. """ - if cls.bytecode is None: - raise ValueError( - "Cannot call constructor on a contract that does not have " - "'bytecode' associated with it" - ) - - return AsyncContractConstructor(cls.w3, cls.abi, cls.bytecode, *args, **kwargs) - - @combomethod - def find_functions_by_identifier( - cls, - contract_abi: ABI, - w3: "AsyncWeb3", - address: ChecksumAddress, - callable_check: Callable[..., Any], - ) -> List["AsyncContractFunction"]: - return cast( - List[AsyncContractFunction], - find_functions_by_identifier( - contract_abi, w3, address, callable_check, AsyncContractFunction - ), + filter_builder = AsyncEventFilterBuilder(self._get_event_abi(), self.w3.codec) + self._set_up_filter_builder( + argument_filters, + fromBlock, + toBlock, + address, + topics, + filter_builder, ) + log_filter = await filter_builder.deploy(self.w3) + log_filter.log_entry_formatter = get_event_data( + self.w3.codec, self._get_event_abi() + ) + log_filter.builder = filter_builder - @combomethod - def get_function_by_identifier( - cls, fns: Sequence["AsyncContractFunction"], identifier: str - ) -> "AsyncContractFunction": - return get_function_by_identifier(fns, identifier) - - -class AsyncContractConstructor(BaseContractConstructor): - # mypy types - w3: "AsyncWeb3" - - @combomethod - async def transact(self, transaction: Optional[TxParams] = None) -> HexBytes: - return await self.w3.eth.send_transaction(self._get_transaction(transaction)) + return log_filter @combomethod - async def build_transaction( - self, transaction: Optional[TxParams] = None - ) -> TxParams: - """ - Build the transaction dictionary without sending - """ - built_transaction = self._build_transaction(transaction) - return await async_fill_transaction_defaults(self.w3, built_transaction) + def build_filter(self) -> AsyncEventFilterBuilder: + builder = AsyncEventFilterBuilder( + self._get_event_abi(), + self.w3.codec, + formatter=get_event_data(self.w3.codec, self._get_event_abi()), + ) + builder.address = self.address + return builder - @combomethod - async def estimate_gas( - self, - transaction: Optional[TxParams] = None, - block_identifier: Optional[BlockIdentifier] = None, - ) -> int: - transaction = self._estimate_gas(transaction) - return await self.w3.eth.estimate_gas( - transaction, block_identifier=block_identifier - ) +class AsyncContractEvents(BaseContractEvents): + def __init__( + self, abi: ABI, w3: "AsyncWeb3", address: Optional[ChecksumAddress] = None + ) -> None: + super().__init__(abi, w3, AsyncContractEvent, address) class AsyncContractFunction(BaseContractFunction): @@ -333,6 +270,9 @@ async def call( addr = contract.functions.owner().call() :param transaction: Dictionary of transaction info for web3 interface + :param block_identifier TODO + :param state_override TODO + :param ccip_read_enabled TODO :return: ``Caller`` object that has contract public functions and variables exposed as Python methods """ @@ -435,124 +375,156 @@ def get_receive_function( return cast(AsyncContractFunction, NonExistentReceiveFunction()) -class AsyncContractEvent(BaseContractEvent): - # mypy types - w3: "AsyncWeb3" - - @combomethod - async def get_logs( +class AsyncContractFunctions(BaseContractFunctions): + def __init__( self, - argument_filters: Optional[Dict[str, Any]] = None, - fromBlock: Optional[BlockIdentifier] = None, - toBlock: Optional[BlockIdentifier] = None, - block_hash: Optional[HexBytes] = None, - ) -> Awaitable[Iterable[EventData]]: - """Get events for this contract instance using eth_getLogs API. - - This is a stateless method, as opposed to createFilter. - It can be safely called against nodes which do not provide - eth_newFilter API, like Infura nodes. - - If there are many events, - like ``Transfer`` events for a popular token, - the Ethereum node might be overloaded and timeout - on the underlying JSON-RPC call. + abi: ABI, + w3: "AsyncWeb3", + address: Optional[ChecksumAddress] = None, + decode_tuples: Optional[bool] = False, + ) -> None: + super().__init__(abi, w3, AsyncContractFunction, address, decode_tuples) - Example - how to get all ERC-20 token transactions - for the latest 10 blocks: + def __getattr__(self, function_name: str) -> "AsyncContractFunction": + if self.abi is None: + raise NoABIFound( + "There is no ABI found for this contract.", + ) + if "_functions" not in self.__dict__: + raise NoABIFunctionsFound( + "The abi for this contract contains no function definitions. ", + "Are you sure you provided the correct contract abi?", + ) + elif function_name not in self.__dict__["_functions"]: + raise ABIFunctionNotFound( + f"The function '{function_name}' was not found in this contract's abi.", + " Are you sure you provided the correct contract abi?", + ) + else: + return super().__getattribute__(function_name) - .. code-block:: python - from = max(mycontract.web3.eth.block_number - 10, 1) - to = mycontract.web3.eth.block_number +class AsyncContract(BaseContract): + functions: AsyncContractFunctions = None + caller: "AsyncContractCaller" = None - events = mycontract.events.Transfer.getLogs(fromBlock=from, toBlock=to) + # mypy types + w3: "AsyncWeb3" - for e in events: - print(e["args"]["from"], - e["args"]["to"], - e["args"]["value"]) + #: Instance of :class:`ContractEvents` presenting available Event ABIs + events: AsyncContractEvents = None - The returned processed log values will look like: + def __init__(self, address: Optional[ChecksumAddress] = None) -> None: + """Create a new smart contract proxy object. - .. code-block:: python + :param address: Contract address as 0x hex string""" - ( - AttributeDict({ - 'args': AttributeDict({}), - 'event': 'LogNoArguments', - 'logIndex': 0, - 'transactionIndex': 0, - 'transactionHash': HexBytes('...'), - 'address': '0xF2E246BB76DF876Cef8b38ae84130F4F55De395b', - 'blockHash': HexBytes('...'), - 'blockNumber': 3 - }), - AttributeDict(...), - ... + if self.w3 is None: + raise AttributeError( + "The `Contract` class has not been initialized. Please use the " + "`web3.contract` interface to create your contract class." ) - See also: :func:`web3.middleware.filter.local_filter_middleware`. + if address: + self.address = normalize_address_no_ens(address) - :param argument_filters: - :param fromBlock: block number or "latest", defaults to "latest" - :param toBlock: block number or "latest". Defaults to "latest" - :param blockHash: block hash. blockHash cannot be set at the - same time as fromBlock or toBlock - :yield: Tuple of :class:`AttributeDict` instances - """ - abi = self._get_event_abi() - # Call JSON-RPC API - logs = await self.w3.eth.get_logs( - self._get_event_filter_params( - abi, argument_filters, fromBlock, toBlock, block_hash + if not self.address: + raise TypeError( + "The address argument is required to instantiate a contract." ) + self.functions = AsyncContractFunctions( + self.abi, self.w3, self.address, decode_tuples=self.decode_tuples + ) + self.caller = AsyncContractCaller( + self.abi, self.w3, self.address, decode_tuples=self.decode_tuples + ) + self.events = AsyncContractEvents(self.abi, self.w3, self.address) + self.fallback = AsyncContract.get_fallback_function( + self.abi, self.w3, AsyncContractFunction, self.address + ) + self.receive = AsyncContract.get_receive_function( + self.abi, self.w3, AsyncContractFunction, self.address ) - # Convert raw binary data to Python proxy objects as described by ABI - return tuple( # type: ignore - get_event_data(self.w3.codec, abi, entry) for entry in logs + @classmethod + def factory( + cls, w3: "AsyncWeb3", class_name: Optional[str] = None, **kwargs: Any + ) -> "AsyncContract": + kwargs["w3"] = w3 + + normalizers = { + "abi": normalize_abi, + "address": normalize_address_no_ens, + "bytecode": normalize_bytecode, + "bytecode_runtime": normalize_bytecode, + } + + contract = cast( + AsyncContract, + PropertyCheckingFactory( + class_name or cls.__name__, + (cls,), + kwargs, + normalizers=normalizers, + ), + ) + contract.functions = AsyncContractFunctions( + contract.abi, contract.w3, decode_tuples=contract.decode_tuples + ) + contract.caller = AsyncContractCaller( + contract.abi, + contract.w3, + contract.address, + decode_tuples=contract.decode_tuples, + ) + contract.events = AsyncContractEvents(contract.abi, contract.w3) + contract.fallback = AsyncContract.get_fallback_function( + contract.abi, + contract.w3, + AsyncContractFunction, + ) + contract.receive = AsyncContract.get_receive_function( + contract.abi, + contract.w3, + AsyncContractFunction, ) + return contract - @combomethod - async def create_filter( - self, - *, # PEP 3102 - argument_filters: Optional[Dict[str, Any]] = None, - fromBlock: Optional[BlockIdentifier] = None, - toBlock: BlockIdentifier = "latest", - address: Optional[ChecksumAddress] = None, - topics: Optional[Sequence[Any]] = None, - ) -> AsyncLogFilter: + @classmethod + def constructor(cls, *args: Any, **kwargs: Any) -> "AsyncContractConstructor": """ - Create filter object that tracks logs emitted by this contract event. + :param args: The contract constructor arguments as positional arguments + :param kwargs: The contract constructor arguments as keyword arguments + :return: a contract constructor object """ - filter_builder = AsyncEventFilterBuilder(self._get_event_abi(), self.w3.codec) - self._set_up_filter_builder( - argument_filters, - fromBlock, - toBlock, - address, - topics, - filter_builder, - ) - log_filter = await filter_builder.deploy(self.w3) - log_filter.log_entry_formatter = get_event_data( - self.w3.codec, self._get_event_abi() - ) - log_filter.builder = filter_builder + if cls.bytecode is None: + raise ValueError( + "Cannot call constructor on a contract that does not have " + "'bytecode' associated with it" + ) - return log_filter + return AsyncContractConstructor(cls.w3, cls.abi, cls.bytecode, *args, **kwargs) @combomethod - def build_filter(self) -> AsyncEventFilterBuilder: - builder = AsyncEventFilterBuilder( - self._get_event_abi(), - self.w3.codec, - formatter=get_event_data(self.w3.codec, self._get_event_abi()), + def find_functions_by_identifier( + cls, + contract_abi: ABI, + w3: "AsyncWeb3", + address: ChecksumAddress, + callable_check: Callable[..., Any], + ) -> List["AsyncContractFunction"]: + return cast( + List[AsyncContractFunction], + find_functions_by_identifier( + contract_abi, w3, address, callable_check, AsyncContractFunction + ), ) - builder.address = self.address - return builder + + @combomethod + def get_function_by_identifier( + cls, fns: Sequence["AsyncContractFunction"], identifier: str + ) -> "AsyncContractFunction": + return get_function_by_identifier(fns, identifier) class AsyncContractCaller(BaseContractCaller): @@ -619,3 +591,34 @@ def __call__( ccip_read_enabled=ccip_read_enabled, decode_tuples=self.decode_tuples, ) + + +class AsyncContractConstructor(BaseContractConstructor): + # mypy types + w3: "AsyncWeb3" + + @combomethod + async def transact(self, transaction: Optional[TxParams] = None) -> HexBytes: + return await self.w3.eth.send_transaction(self._get_transaction(transaction)) + + @combomethod + async def build_transaction( + self, transaction: Optional[TxParams] = None + ) -> TxParams: + """ + Build the transaction dictionary without sending + """ + built_transaction = self._build_transaction(transaction) + return await async_fill_transaction_defaults(self.w3, built_transaction) + + @combomethod + async def estimate_gas( + self, + transaction: Optional[TxParams] = None, + block_identifier: Optional[BlockIdentifier] = None, + ) -> int: + transaction = self._estimate_gas(transaction) + + return await self.w3.eth.estimate_gas( + transaction, block_identifier=block_identifier + ) diff --git a/web3/contract/base_contract.py b/web3/contract/base_contract.py index b74d93c335..a82344e2a3 100644 --- a/web3/contract/base_contract.py +++ b/web3/contract/base_contract.py @@ -125,350 +125,201 @@ from .contract import ContractFunction # noqa: F401 -class BaseContract: - """Base class for Contract proxy classes. - - First you need to create your Contract classes using - :meth:`web3.eth.Eth.contract` that takes compiled Solidity contract - ABI definitions as input. The created class object will be a subclass of - this base class. - - After you have your Contract proxy class created you can interact with - smart contracts - - * Create a Contract proxy object for an existing deployed smart contract by - its address using :meth:`__init__` +class BaseContractEvent: + """Base class for contract events - * Deploy a new smart contract using :py:meth:`Contract.constructor.transact()` + An event accessed via the api `contract.events.myEvents(*args, **kwargs)` + is a subclass of this class. """ - # set during class construction - w3: Union["Web3", "AsyncWeb3"] = None - - # instance level properties address: ChecksumAddress = None + event_name: str = None + w3: Union["Web3", "AsyncWeb3"] = None + contract_abi: ABI = None + abi: ABIEvent = None - # class properties (overridable at instance level) - abi: ABI = None - - asm = None - ast = None + def __init__(self, *argument_names: Tuple[str]) -> None: + if argument_names is None: + # https://github.com/python/mypy/issues/6283 + self.argument_names = tuple() # type: ignore + else: + self.argument_names = argument_names - bytecode = None - bytecode_runtime = None - clone_bin = None + self.abi = self._get_event_abi() - decode_tuples = None - dev_doc = None - interface = None - metadata = None - opcodes = None - src_map = None - src_map_runtime = None - user_doc = None + @classmethod + def _get_event_abi(cls) -> ABIEvent: + return find_matching_event_abi(cls.contract_abi, event_name=cls.event_name) - # Public API - # @combomethod - def encodeABI( - cls, - fn_name: str, - args: Optional[Any] = None, - kwargs: Optional[Any] = None, - data: Optional[HexStr] = None, - ) -> HexStr: - """ - Encodes the arguments using the Ethereum ABI for the contract function - that matches the given name and arguments.. - - :param data: defaults to function selector - """ - fn_abi, fn_selector, fn_arguments = get_function_info( - fn_name, - cls.w3.codec, - contract_abi=cls.abi, - args=args, - kwargs=kwargs, - ) + def process_receipt( + self, txn_receipt: TxReceipt, errors: EventLogErrorFlags = WARN + ) -> Iterable[EventData]: + return self._parse_logs(txn_receipt, errors) - if data is None: - data = fn_selector + @to_tuple + def _parse_logs( + self, txn_receipt: TxReceipt, errors: EventLogErrorFlags + ) -> Iterable[EventData]: + try: + errors.name + except AttributeError: + raise AttributeError( + f"Error flag must be one of: {EventLogErrorFlags.flag_options()}" + ) - return encode_abi(cls.w3, fn_abi, fn_arguments, data) + for log in txn_receipt["logs"]: + try: + rich_log = get_event_data(self.w3.codec, self.abi, log) + except (MismatchedABI, LogTopicError, InvalidEventABI, TypeError) as e: + if errors == DISCARD: + continue + elif errors == IGNORE: + # type ignores b/c rich_log set on 1092 conflicts with mutated types + new_log = MutableAttributeDict(log) # type: ignore + new_log["errors"] = e + rich_log = AttributeDict(new_log) # type: ignore + elif errors == STRICT: + raise e + else: + warnings.warn( + f"The log with transaction hash: {log['transactionHash']!r} " + f"and logIndex: {log['logIndex']} encountered the following " + f"error during processing: {type(e).__name__}({e}). It has " + "been discarded." + ) + continue + yield rich_log @combomethod - def all_functions( - self, - ) -> "BaseContractFunction": - return self.find_functions_by_identifier( - self.abi, self.w3, self.address, lambda _: True - ) + def process_log(self, log: HexStr) -> EventData: + return get_event_data(self.w3.codec, self.abi, log) @combomethod - def get_function_by_signature(self, signature: str) -> "BaseContractFunction": - if " " in signature: - raise ValueError( - "Function signature should not contain any spaces. " - f"Found spaces in input: {signature}" + def _get_event_filter_params( + self, + abi: ABIEvent, + argument_filters: Optional[Dict[str, Any]] = None, + fromBlock: Optional[BlockIdentifier] = None, + toBlock: Optional[BlockIdentifier] = None, + blockHash: Optional[HexBytes] = None, + ) -> FilterParams: + if not self.address: + raise TypeError( + "This method can be only called on " + "an instated contract with an address" ) - def callable_check(fn_abi: ABIFunction) -> bool: - return abi_to_signature(fn_abi) == signature + if argument_filters is None: + argument_filters = dict() - fns = self.find_functions_by_identifier( - self.abi, self.w3, self.address, callable_check - ) - return self.get_function_by_identifier(fns, "signature") + _filters = dict(**argument_filters) - @combomethod - def find_functions_by_name(self, fn_name: str) -> "BaseContractFunction": - def callable_check(fn_abi: ABIFunction) -> bool: - return fn_abi["name"] == fn_name + blkhash_set = blockHash is not None + blknum_set = fromBlock is not None or toBlock is not None + if blkhash_set and blknum_set: + raise Web3ValidationError( + "blockHash cannot be set at the same time as fromBlock or toBlock" + ) - return self.find_functions_by_identifier( - self.abi, self.w3, self.address, callable_check + # Construct JSON-RPC raw filter presentation based on human readable + # Python descriptions. Namely, convert event names to their keccak signatures + data_filter_set, event_filter_params = construct_event_filter_params( + abi, + self.w3.codec, + contract_address=self.address, + argument_filters=_filters, + fromBlock=fromBlock, + toBlock=toBlock, + address=self.address, ) - @combomethod - def get_function_by_name(self, fn_name: str) -> "BaseContractFunction": - fns = self.find_functions_by_name(fn_name) - return self.get_function_by_identifier(fns, "name") + if blockHash is not None: + event_filter_params["blockHash"] = blockHash - @combomethod - def get_function_by_selector( - self, selector: Union[bytes, int, HexStr] - ) -> "BaseContractFunction": - def callable_check(fn_abi: ABIFunction) -> bool: - # typed dict cannot be used w/ a normal Dict - # https://github.com/python/mypy/issues/4976 - return encode_hex(function_abi_to_4byte_selector(fn_abi)) == to_4byte_hex(selector) # type: ignore # noqa: E501 + return event_filter_params - fns = self.find_functions_by_identifier( - self.abi, self.w3, self.address, callable_check - ) - return self.get_function_by_identifier(fns, "selector") + @classmethod + def factory(cls, class_name: str, **kwargs: Any) -> PropertyCheckingFactory: + return PropertyCheckingFactory(class_name, (cls,), kwargs) - @combomethod - def decode_function_input( - self, data: HexStr - ) -> Tuple["BaseContractFunction", Dict[str, Any]]: - # type ignored b/c expects data arg to be HexBytes - data = HexBytes(data) # type: ignore - func = self.get_function_by_selector(data[:4]) - arguments = decode_transaction_data( - func.abi, data, normalizers=BASE_RETURN_NORMALIZERS - ) - return func, arguments + @staticmethod + def check_for_forbidden_api_filter_arguments( + event_abi: ABIEvent, _filters: Dict[str, Any] + ) -> None: + name_indexed_inputs = {_input["name"]: _input for _input in event_abi["inputs"]} + + for filter_name, filter_value in _filters.items(): + _input = name_indexed_inputs[filter_name] + if is_array_type(_input["type"]): + raise TypeError( + "createFilter no longer supports array type filter arguments. " + "see the build_filter method for filtering array type filters." + ) + if is_list_like(filter_value) and is_dynamic_sized_type(_input["type"]): + raise TypeError( + "createFilter no longer supports setting filter argument options " + "for dynamic sized types. See the build_filter method for setting " + "filters with the match_any method." + ) @combomethod - def find_functions_by_args(self, *args: Any) -> "BaseContractFunction": - def callable_check(fn_abi: ABIFunction) -> bool: - return check_if_arguments_can_be_encoded( - fn_abi, self.w3.codec, args=args, kwargs={} + def _set_up_filter_builder( + self, + argument_filters: Optional[Dict[str, Any]] = None, + fromBlock: Optional[BlockIdentifier] = None, + toBlock: BlockIdentifier = "latest", + address: Optional[ChecksumAddress] = None, + topics: Optional[Sequence[Any]] = None, + filter_builder: Union[EventFilterBuilder, AsyncEventFilterBuilder] = None, + ) -> None: + if fromBlock is None: + raise TypeError( + "Missing mandatory keyword argument to create_filter: fromBlock" ) - return self.find_functions_by_identifier( - self.abi, self.w3, self.address, callable_check - ) + if argument_filters is None: + argument_filters = dict() - @combomethod - def get_function_by_args(self, *args: Any) -> "BaseContractFunction": - fns = self.find_functions_by_args(*args) - return self.get_function_by_identifier(fns, "args") + _filters = dict(**argument_filters) - # - # Private Helpers - # - _return_data_normalizers: Tuple[Callable[..., Any], ...] = tuple() + event_abi = self._get_event_abi() - @classmethod - def _prepare_transaction( - cls, - fn_name: str, - fn_args: Optional[Any] = None, - fn_kwargs: Optional[Any] = None, - transaction: Optional[TxParams] = None, - ) -> TxParams: - return prepare_transaction( - cls.address, - cls.w3, - fn_identifier=fn_name, - contract_abi=cls.abi, - transaction=transaction, - fn_args=fn_args, - fn_kwargs=fn_kwargs, - ) - - @classmethod - def _find_matching_fn_abi( - cls, - fn_identifier: Optional[str] = None, - args: Optional[Any] = None, - kwargs: Optional[Any] = None, - ) -> ABIFunction: - return find_matching_fn_abi( - cls.abi, cls.w3.codec, fn_identifier=fn_identifier, args=args, kwargs=kwargs - ) - - @classmethod - def _find_matching_event_abi( - cls, - event_name: Optional[str] = None, - argument_names: Optional[Sequence[str]] = None, - ) -> ABIEvent: - return find_matching_event_abi( - abi=cls.abi, event_name=event_name, argument_names=argument_names - ) - - @combomethod - def _encode_constructor_data( - cls, args: Optional[Any] = None, kwargs: Optional[Any] = None - ) -> HexStr: - constructor_abi = get_constructor_abi(cls.abi) - - if constructor_abi: - if args is None: - args = tuple() - if kwargs is None: - kwargs = {} - - arguments = merge_args_and_kwargs(constructor_abi, args, kwargs) - - deploy_data = add_0x_prefix( - encode_abi(cls.w3, constructor_abi, arguments, data=cls.bytecode) - ) - else: - if args is not None or kwargs is not None: - msg = "Constructor args were provided, but no constructor function was provided." # noqa: E501 - raise TypeError(msg) - - deploy_data = to_hex(cls.bytecode) - - return deploy_data + self.check_for_forbidden_api_filter_arguments(event_abi, _filters) - @combomethod - def find_functions_by_identifier( - cls, - contract_abi: ABI, - w3: Union["Web3", "AsyncWeb3"], - address: ChecksumAddress, - callable_check: Callable[..., Any], - ) -> List[Any]: - raise NotImplementedError( - "This method should be implemented in the inherited class" + _, event_filter_params = construct_event_filter_params( + self._get_event_abi(), + self.w3.codec, + contract_address=self.address, + argument_filters=_filters, + fromBlock=fromBlock, + toBlock=toBlock, + address=address, + topics=topics, ) - @combomethod - def get_function_by_identifier( - cls, fns: Sequence["BaseContractFunction"], identifier: str - ) -> "BaseContractFunction": - raise NotImplementedError( - "This method should be implemented in the inherited class" + filter_builder.address = cast( + ChecksumAddress, event_filter_params.get("address") ) + filter_builder.fromBlock = event_filter_params.get("fromBlock") + filter_builder.toBlock = event_filter_params.get("toBlock") + match_any_vals = { + arg: value + for arg, value in _filters.items() + if not is_array_type(filter_builder.args[arg].arg_type) + and is_list_like(value) + } + for arg, value in match_any_vals.items(): + filter_builder.args[arg].match_any(*value) - @staticmethod - def get_fallback_function( - abi: ABI, - w3: Union["Web3", "AsyncWeb3"], - function_type: Type["BaseContractFunction"], - address: Optional[ChecksumAddress] = None, - ) -> "BaseContractFunction": - if abi and fallback_func_abi_exists(abi): - return function_type.factory( - "fallback", - w3=w3, - contract_abi=abi, - address=address, - function_identifier=FallbackFn, - )() - - return cast(function_type, NonExistentFallbackFunction()) # type: ignore - - @staticmethod - def get_receive_function( - abi: ABI, - w3: Union["Web3", "AsyncWeb3"], - function_type: Type["BaseContractFunction"], - address: Optional[ChecksumAddress] = None, - ) -> "BaseContractFunction": - if abi and receive_func_abi_exists(abi): - return function_type.factory( - "receive", - w3=w3, - contract_abi=abi, - address=address, - function_identifier=ReceiveFn, - )() - - return cast(function_type, NonExistentReceiveFunction()) # type: ignore - - -class NonExistentFallbackFunction: - @staticmethod - def _raise_exception() -> NoReturn: - raise FallbackNotFound("No fallback function was found in the contract ABI.") - - def __getattr__(self, attr: Any) -> Callable[[], None]: - return self._raise_exception - - -class NonExistentReceiveFunction: - @staticmethod - def _raise_exception() -> NoReturn: - raise FallbackNotFound("No receive function was found in the contract ABI.") - - def __getattr__(self, attr: Any) -> Callable[[], None]: - return self._raise_exception - - -class BaseContractFunctions: - """Class containing contract function objects""" - - def __init__( - self, - abi: ABI, - w3: Union["Web3", "AsyncWeb3"], - contract_function_class: Union[ - Type["ContractFunction"], Type["AsyncContractFunction"] - ], - address: Optional[ChecksumAddress] = None, - decode_tuples: Optional[bool] = False, - ) -> None: - self.abi = abi - self.w3 = w3 - self.address = address - - if self.abi: - self._functions = filter_by_type("function", self.abi) - for func in self._functions: - setattr( - self, - func["name"], - contract_function_class.factory( - func["name"], - w3=self.w3, - contract_abi=self.abi, - address=self.address, - decode_tuples=decode_tuples, - function_identifier=func["name"], - ), - ) - - def __iter__(self) -> Generator[str, None, None]: - if not hasattr(self, "_functions") or not self._functions: - return - - for func in self._functions: - yield func["name"] - - def __getitem__(self, function_name: str) -> ABIFunction: - return getattr(self, function_name) - - def __hasattr__(self, function_name: str) -> bool: - try: - return function_name in self.__dict__["_functions"] - except ABIFunctionNotFound: - return False + match_single_vals = { + arg: value + for arg, value in _filters.items() + if not is_array_type(filter_builder.args[arg].arg_type) + and not is_list_like(value) + } + for arg, value in match_single_vals.items(): + filter_builder.args[arg].match_single(value) class BaseContractEvents: @@ -547,120 +398,27 @@ def __hasattr__(self, event_name: str) -> bool: return False -class BaseContractConstructor: - """ - Class for contract constructor API. - """ +class BaseContractFunction: + """Base class for contract functions - def __init__( - self, - w3: Union["Web3", "AsyncWeb3"], - abi: ABI, - bytecode: HexStr, - *args: Any, - **kwargs: Any, - ) -> None: - self.w3 = w3 - self.abi = abi - self.bytecode = bytecode - self.data_in_transaction = self._encode_data_in_transaction(*args, **kwargs) + A function accessed via the api `contract.functions.myMethod(*args, **kwargs)` + is a subclass of this class. + """ - @combomethod - def _encode_data_in_transaction(self, *args: Any, **kwargs: Any) -> HexStr: - constructor_abi = get_constructor_abi(self.abi) + address: ChecksumAddress = None + function_identifier: FunctionIdentifier = None + w3: Union["Web3", "AsyncWeb3"] = None + contract_abi: ABI = None + abi: ABIFunction = None + transaction: TxParams = None + arguments: Tuple[Any, ...] = None + decode_tuples: Optional[bool] = False + args: Any = None + kwargs: Any = None - if constructor_abi: - if not args: - args = tuple() - if not kwargs: - kwargs = {} - - arguments = merge_args_and_kwargs(constructor_abi, args, kwargs) - data = add_0x_prefix( - encode_abi(self.w3, constructor_abi, arguments, data=self.bytecode) - ) - else: - data = to_hex(self.bytecode) - - return data - - @combomethod - def _estimate_gas(self, transaction: Optional[TxParams] = None) -> TxParams: - if transaction is None: - estimate_gas_transaction: TxParams = {} - else: - estimate_gas_transaction = cast(TxParams, dict(**transaction)) - self.check_forbidden_keys_in_transaction( - estimate_gas_transaction, ["data", "to"] - ) - - if self.w3.eth.default_account is not empty: - # type ignored b/c check prevents an empty default_account - estimate_gas_transaction.setdefault( - "from", self.w3.eth.default_account # type: ignore - ) - - estimate_gas_transaction["data"] = self.data_in_transaction - - return estimate_gas_transaction - - def _get_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: - if transaction is None: - transact_transaction: TxParams = {} - else: - transact_transaction = cast(TxParams, dict(**transaction)) - self.check_forbidden_keys_in_transaction( - transact_transaction, ["data", "to"] - ) - - if self.w3.eth.default_account is not empty: - # type ignored b/c check prevents an empty default_account - transact_transaction.setdefault( - "from", self.w3.eth.default_account # type: ignore - ) - - transact_transaction["data"] = self.data_in_transaction - - return transact_transaction - - @combomethod - def _build_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: - built_transaction = self._get_transaction(transaction) - built_transaction["to"] = Address(b"") - return built_transaction - - @staticmethod - def check_forbidden_keys_in_transaction( - transaction: TxParams, forbidden_keys: Optional[Collection[str]] = None - ) -> None: - keys_found = transaction.keys() & forbidden_keys - if keys_found: - raise ValueError( - f"Cannot set '{', '.join(keys_found)}' field(s) in transaction" - ) - - -class BaseContractFunction: - """Base class for contract functions - - A function accessed via the api `contract.functions.myMethod(*args, **kwargs)` - is a subclass of this class. - """ - - address: ChecksumAddress = None - function_identifier: FunctionIdentifier = None - w3: Union["Web3", "AsyncWeb3"] = None - contract_abi: ABI = None - abi: ABIFunction = None - transaction: TxParams = None - arguments: Tuple[Any, ...] = None - decode_tuples: Optional[bool] = False - args: Any = None - kwargs: Any = None - - def __init__(self, abi: Optional[ABIFunction] = None) -> None: - self.abi = abi - self.fn_name = type(self).__name__ + def __init__(self, abi: Optional[ABIFunction] = None) -> None: + self.abi = abi + self.fn_name = type(self).__name__ def _set_function_info(self) -> None: if not self.abi: @@ -822,201 +580,332 @@ def factory( return PropertyCheckingFactory(class_name, (cls,), kwargs)(kwargs.get("abi")) -class BaseContractEvent: - """Base class for contract events +class BaseContractFunctions: + """Class containing contract function objects""" - An event accessed via the api `contract.events.myEvents(*args, **kwargs)` - is a subclass of this class. + def __init__( + self, + abi: ABI, + w3: Union["Web3", "AsyncWeb3"], + contract_function_class: Union[ + Type["ContractFunction"], Type["AsyncContractFunction"] + ], + address: Optional[ChecksumAddress] = None, + decode_tuples: Optional[bool] = False, + ) -> None: + self.abi = abi + self.w3 = w3 + self.address = address + + if self.abi: + self._functions = filter_by_type("function", self.abi) + for func in self._functions: + setattr( + self, + func["name"], + contract_function_class.factory( + func["name"], + w3=self.w3, + contract_abi=self.abi, + address=self.address, + decode_tuples=decode_tuples, + function_identifier=func["name"], + ), + ) + + def __iter__(self) -> Generator[str, None, None]: + if not hasattr(self, "_functions") or not self._functions: + return + + for func in self._functions: + yield func["name"] + + def __getitem__(self, function_name: str) -> ABIFunction: + return getattr(self, function_name) + + def __hasattr__(self, function_name: str) -> bool: + try: + return function_name in self.__dict__["_functions"] + except ABIFunctionNotFound: + return False + + +class BaseContract: + """Base class for Contract proxy classes. + + First you need to create your Contract classes using + :meth:`web3.eth.Eth.contract` that takes compiled Solidity contract + ABI definitions as input. The created class object will be a subclass of + this base class. + + After you have your Contract proxy class created you can interact with + smart contracts + + * Create a Contract proxy object for an existing deployed smart contract by + its address using :meth:`__init__` + + * Deploy a new smart contract using :py:meth:`Contract.constructor.transact()` """ - address: ChecksumAddress = None - event_name: str = None + # set during class construction w3: Union["Web3", "AsyncWeb3"] = None - contract_abi: ABI = None - abi: ABIEvent = None - def __init__(self, *argument_names: Tuple[str]) -> None: - if argument_names is None: - # https://github.com/python/mypy/issues/6283 - self.argument_names = tuple() # type: ignore - else: - self.argument_names = argument_names + # instance level properties + address: ChecksumAddress = None - self.abi = self._get_event_abi() + # class properties (overridable at instance level) + abi: ABI = None - @classmethod - def _get_event_abi(cls) -> ABIEvent: - return find_matching_event_abi(cls.contract_abi, event_name=cls.event_name) + asm = None + ast = None + bytecode = None + bytecode_runtime = None + clone_bin = None + + decode_tuples = None + dev_doc = None + interface = None + metadata = None + opcodes = None + src_map = None + src_map_runtime = None + user_doc = None + + # Public API + # @combomethod - def process_receipt( - self, txn_receipt: TxReceipt, errors: EventLogErrorFlags = WARN - ) -> Iterable[EventData]: - return self._parse_logs(txn_receipt, errors) + def encodeABI( + cls, + fn_name: str, + args: Optional[Any] = None, + kwargs: Optional[Any] = None, + data: Optional[HexStr] = None, + ) -> HexStr: + """ + Encodes the arguments using the Ethereum ABI for the contract function + that matches the given name and arguments.. - @to_tuple - def _parse_logs( - self, txn_receipt: TxReceipt, errors: EventLogErrorFlags - ) -> Iterable[EventData]: - try: - errors.name - except AttributeError: - raise AttributeError( - f"Error flag must be one of: {EventLogErrorFlags.flag_options()}" - ) + :param data: defaults to function selector + """ + fn_abi, fn_selector, fn_arguments = get_function_info( + fn_name, + cls.w3.codec, + contract_abi=cls.abi, + args=args, + kwargs=kwargs, + ) - for log in txn_receipt["logs"]: - try: - rich_log = get_event_data(self.w3.codec, self.abi, log) - except (MismatchedABI, LogTopicError, InvalidEventABI, TypeError) as e: - if errors == DISCARD: - continue - elif errors == IGNORE: - # type ignores b/c rich_log set on 1092 conflicts with mutated types - new_log = MutableAttributeDict(log) # type: ignore - new_log["errors"] = e - rich_log = AttributeDict(new_log) # type: ignore - elif errors == STRICT: - raise e - else: - warnings.warn( - f"The log with transaction hash: {log['transactionHash']!r} " - f"and logIndex: {log['logIndex']} encountered the following " - f"error during processing: {type(e).__name__}({e}). It has " - "been discarded." - ) - continue - yield rich_log + if data is None: + data = fn_selector - @combomethod - def process_log(self, log: HexStr) -> EventData: - return get_event_data(self.w3.codec, self.abi, log) + return encode_abi(cls.w3, fn_abi, fn_arguments, data) @combomethod - def _get_event_filter_params( + def all_functions( self, - abi: ABIEvent, - argument_filters: Optional[Dict[str, Any]] = None, - fromBlock: Optional[BlockIdentifier] = None, - toBlock: Optional[BlockIdentifier] = None, - blockHash: Optional[HexBytes] = None, - ) -> FilterParams: - if not self.address: - raise TypeError( - "This method can be only called on " - "an instated contract with an address" + ) -> "BaseContractFunction": + return self.find_functions_by_identifier( + self.abi, self.w3, self.address, lambda _: True + ) + + @combomethod + def get_function_by_signature(self, signature: str) -> "BaseContractFunction": + if " " in signature: + raise ValueError( + "Function signature should not contain any spaces. " + f"Found spaces in input: {signature}" ) - if argument_filters is None: - argument_filters = dict() + def callable_check(fn_abi: ABIFunction) -> bool: + return abi_to_signature(fn_abi) == signature - _filters = dict(**argument_filters) + fns = self.find_functions_by_identifier( + self.abi, self.w3, self.address, callable_check + ) + return self.get_function_by_identifier(fns, "signature") - blkhash_set = blockHash is not None - blknum_set = fromBlock is not None or toBlock is not None - if blkhash_set and blknum_set: - raise Web3ValidationError( - "blockHash cannot be set at the same time as fromBlock or toBlock" + @combomethod + def find_functions_by_name(self, fn_name: str) -> "BaseContractFunction": + def callable_check(fn_abi: ABIFunction) -> bool: + return fn_abi["name"] == fn_name + + return self.find_functions_by_identifier( + self.abi, self.w3, self.address, callable_check + ) + + @combomethod + def get_function_by_name(self, fn_name: str) -> "BaseContractFunction": + fns = self.find_functions_by_name(fn_name) + return self.get_function_by_identifier(fns, "name") + + @combomethod + def get_function_by_selector( + self, selector: Union[bytes, int, HexStr] + ) -> "BaseContractFunction": + def callable_check(fn_abi: ABIFunction) -> bool: + # typed dict cannot be used w/ a normal Dict + # https://github.com/python/mypy/issues/4976 + return encode_hex(function_abi_to_4byte_selector(fn_abi)) == to_4byte_hex(selector) # type: ignore # noqa: E501 + + fns = self.find_functions_by_identifier( + self.abi, self.w3, self.address, callable_check + ) + return self.get_function_by_identifier(fns, "selector") + + @combomethod + def decode_function_input( + self, data: HexStr + ) -> Tuple["BaseContractFunction", Dict[str, Any]]: + # type ignored b/c expects data arg to be HexBytes + data = HexBytes(data) # type: ignore + func = self.get_function_by_selector(data[:4]) + arguments = decode_transaction_data( + func.abi, data, normalizers=BASE_RETURN_NORMALIZERS + ) + return func, arguments + + @combomethod + def find_functions_by_args(self, *args: Any) -> "BaseContractFunction": + def callable_check(fn_abi: ABIFunction) -> bool: + return check_if_arguments_can_be_encoded( + fn_abi, self.w3.codec, args=args, kwargs={} ) - # Construct JSON-RPC raw filter presentation based on human readable - # Python descriptions. Namely, convert event names to their keccak signatures - data_filter_set, event_filter_params = construct_event_filter_params( - abi, - self.w3.codec, - contract_address=self.address, - argument_filters=_filters, - fromBlock=fromBlock, - toBlock=toBlock, - address=self.address, + return self.find_functions_by_identifier( + self.abi, self.w3, self.address, callable_check + ) + + @combomethod + def get_function_by_args(self, *args: Any) -> "BaseContractFunction": + fns = self.find_functions_by_args(*args) + return self.get_function_by_identifier(fns, "args") + + # + # Private Helpers + # + _return_data_normalizers: Tuple[Callable[..., Any], ...] = tuple() + + @classmethod + def _prepare_transaction( + cls, + fn_name: str, + fn_args: Optional[Any] = None, + fn_kwargs: Optional[Any] = None, + transaction: Optional[TxParams] = None, + ) -> TxParams: + return prepare_transaction( + cls.address, + cls.w3, + fn_identifier=fn_name, + contract_abi=cls.abi, + transaction=transaction, + fn_args=fn_args, + fn_kwargs=fn_kwargs, + ) + + @classmethod + def _find_matching_fn_abi( + cls, + fn_identifier: Optional[str] = None, + args: Optional[Any] = None, + kwargs: Optional[Any] = None, + ) -> ABIFunction: + return find_matching_fn_abi( + cls.abi, cls.w3.codec, fn_identifier=fn_identifier, args=args, kwargs=kwargs + ) + + @classmethod + def _find_matching_event_abi( + cls, + event_name: Optional[str] = None, + argument_names: Optional[Sequence[str]] = None, + ) -> ABIEvent: + return find_matching_event_abi( + abi=cls.abi, event_name=event_name, argument_names=argument_names ) - if blockHash is not None: - event_filter_params["blockHash"] = blockHash - - return event_filter_params - - @classmethod - def factory(cls, class_name: str, **kwargs: Any) -> PropertyCheckingFactory: - return PropertyCheckingFactory(class_name, (cls,), kwargs) + @combomethod + def _encode_constructor_data( + cls, args: Optional[Any] = None, kwargs: Optional[Any] = None + ) -> HexStr: + constructor_abi = get_constructor_abi(cls.abi) - @staticmethod - def check_for_forbidden_api_filter_arguments( - event_abi: ABIEvent, _filters: Dict[str, Any] - ) -> None: - name_indexed_inputs = {_input["name"]: _input for _input in event_abi["inputs"]} + if constructor_abi: + if args is None: + args = tuple() + if kwargs is None: + kwargs = {} - for filter_name, filter_value in _filters.items(): - _input = name_indexed_inputs[filter_name] - if is_array_type(_input["type"]): - raise TypeError( - "createFilter no longer supports array type filter arguments. " - "see the build_filter method for filtering array type filters." - ) - if is_list_like(filter_value) and is_dynamic_sized_type(_input["type"]): - raise TypeError( - "createFilter no longer supports setting filter argument options " - "for dynamic sized types. See the build_filter method for setting " - "filters with the match_any method." - ) + arguments = merge_args_and_kwargs(constructor_abi, args, kwargs) - @combomethod - def _set_up_filter_builder( - self, - argument_filters: Optional[Dict[str, Any]] = None, - fromBlock: Optional[BlockIdentifier] = None, - toBlock: BlockIdentifier = "latest", - address: Optional[ChecksumAddress] = None, - topics: Optional[Sequence[Any]] = None, - filter_builder: Union[EventFilterBuilder, AsyncEventFilterBuilder] = None, - ) -> None: - if fromBlock is None: - raise TypeError( - "Missing mandatory keyword argument to create_filter: fromBlock" + deploy_data = add_0x_prefix( + encode_abi(cls.w3, constructor_abi, arguments, data=cls.bytecode) ) + else: + if args is not None or kwargs is not None: + msg = "Constructor args were provided, but no constructor function was provided." # noqa: E501 + raise TypeError(msg) - if argument_filters is None: - argument_filters = dict() - - _filters = dict(**argument_filters) - - event_abi = self._get_event_abi() + deploy_data = to_hex(cls.bytecode) - self.check_for_forbidden_api_filter_arguments(event_abi, _filters) + return deploy_data - _, event_filter_params = construct_event_filter_params( - self._get_event_abi(), - self.w3.codec, - contract_address=self.address, - argument_filters=_filters, - fromBlock=fromBlock, - toBlock=toBlock, - address=address, - topics=topics, + @combomethod + def find_functions_by_identifier( + cls, + contract_abi: ABI, + w3: Union["Web3", "AsyncWeb3"], + address: ChecksumAddress, + callable_check: Callable[..., Any], + ) -> List[Any]: + raise NotImplementedError( + "This method should be implemented in the inherited class" ) - filter_builder.address = cast( - ChecksumAddress, event_filter_params.get("address") + @combomethod + def get_function_by_identifier( + cls, fns: Sequence["BaseContractFunction"], identifier: str + ) -> "BaseContractFunction": + raise NotImplementedError( + "This method should be implemented in the inherited class" ) - filter_builder.fromBlock = event_filter_params.get("fromBlock") - filter_builder.toBlock = event_filter_params.get("toBlock") - match_any_vals = { - arg: value - for arg, value in _filters.items() - if not is_array_type(filter_builder.args[arg].arg_type) - and is_list_like(value) - } - for arg, value in match_any_vals.items(): - filter_builder.args[arg].match_any(*value) - match_single_vals = { - arg: value - for arg, value in _filters.items() - if not is_array_type(filter_builder.args[arg].arg_type) - and not is_list_like(value) - } - for arg, value in match_single_vals.items(): - filter_builder.args[arg].match_single(value) + @staticmethod + def get_fallback_function( + abi: ABI, + w3: Union["Web3", "AsyncWeb3"], + function_type: Type["BaseContractFunction"], + address: Optional[ChecksumAddress] = None, + ) -> "BaseContractFunction": + if abi and fallback_func_abi_exists(abi): + return function_type.factory( + "fallback", + w3=w3, + contract_abi=abi, + address=address, + function_identifier=FallbackFn, + )() + + return cast(function_type, NonExistentFallbackFunction()) # type: ignore + + @staticmethod + def get_receive_function( + abi: ABI, + w3: Union["Web3", "AsyncWeb3"], + function_type: Type["BaseContractFunction"], + address: Optional[ChecksumAddress] = None, + ) -> "BaseContractFunction": + if abi and receive_func_abi_exists(abi): + return function_type.factory( + "receive", + w3=w3, + contract_abi=abi, + address=address, + function_identifier=ReceiveFn, + )() + + return cast(function_type, NonExistentReceiveFunction()) # type: ignore class BaseContractCaller: @@ -1101,3 +990,114 @@ def call_function( block_identifier=block_identifier, ccip_read_enabled=ccip_read_enabled, ) + + +class BaseContractConstructor: + """ + Class for contract constructor API. + """ + + def __init__( + self, + w3: Union["Web3", "AsyncWeb3"], + abi: ABI, + bytecode: HexStr, + *args: Any, + **kwargs: Any, + ) -> None: + self.w3 = w3 + self.abi = abi + self.bytecode = bytecode + self.data_in_transaction = self._encode_data_in_transaction(*args, **kwargs) + + @combomethod + def _encode_data_in_transaction(self, *args: Any, **kwargs: Any) -> HexStr: + constructor_abi = get_constructor_abi(self.abi) + + if constructor_abi: + if not args: + args = tuple() + if not kwargs: + kwargs = {} + + arguments = merge_args_and_kwargs(constructor_abi, args, kwargs) + data = add_0x_prefix( + encode_abi(self.w3, constructor_abi, arguments, data=self.bytecode) + ) + else: + data = to_hex(self.bytecode) + + return data + + @combomethod + def _estimate_gas(self, transaction: Optional[TxParams] = None) -> TxParams: + if transaction is None: + estimate_gas_transaction: TxParams = {} + else: + estimate_gas_transaction = cast(TxParams, dict(**transaction)) + self.check_forbidden_keys_in_transaction( + estimate_gas_transaction, ["data", "to"] + ) + + if self.w3.eth.default_account is not empty: + # type ignored b/c check prevents an empty default_account + estimate_gas_transaction.setdefault( + "from", self.w3.eth.default_account # type: ignore + ) + + estimate_gas_transaction["data"] = self.data_in_transaction + + return estimate_gas_transaction + + def _get_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: + if transaction is None: + transact_transaction: TxParams = {} + else: + transact_transaction = cast(TxParams, dict(**transaction)) + self.check_forbidden_keys_in_transaction( + transact_transaction, ["data", "to"] + ) + + if self.w3.eth.default_account is not empty: + # type ignored b/c check prevents an empty default_account + transact_transaction.setdefault( + "from", self.w3.eth.default_account # type: ignore + ) + + transact_transaction["data"] = self.data_in_transaction + + return transact_transaction + + @combomethod + def _build_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: + built_transaction = self._get_transaction(transaction) + built_transaction["to"] = Address(b"") + return built_transaction + + @staticmethod + def check_forbidden_keys_in_transaction( + transaction: TxParams, forbidden_keys: Optional[Collection[str]] = None + ) -> None: + keys_found = transaction.keys() & forbidden_keys + if keys_found: + raise ValueError( + f"Cannot set '{', '.join(keys_found)}' field(s) in transaction" + ) + + +class NonExistentFallbackFunction: + @staticmethod + def _raise_exception() -> NoReturn: + raise FallbackNotFound("No fallback function was found in the contract ABI.") + + def __getattr__(self, attr: Any) -> Callable[[], None]: + return self._raise_exception + + +class NonExistentReceiveFunction: + @staticmethod + def _raise_exception() -> NoReturn: + raise FallbackNotFound("No receive function was found in the contract ABI.") + + def __getattr__(self, attr: Any) -> Callable[[], None]: + return self._raise_exception diff --git a/web3/contract/contract.py b/web3/contract/contract.py index 904c4a4ec3..abb2a0ddc6 100644 --- a/web3/contract/contract.py +++ b/web3/contract/contract.py @@ -91,42 +91,6 @@ from web3 import Web3 # noqa: F401 -class ContractFunctions(BaseContractFunctions): - def __init__( - self, - abi: ABI, - w3: "Web3", - address: Optional[ChecksumAddress] = None, - decode_tuples: Optional[bool] = False, - ) -> None: - super().__init__(abi, w3, ContractFunction, address, decode_tuples) - - def __getattr__(self, function_name: str) -> "ContractFunction": - if self.abi is None: - raise NoABIFound( - "There is no ABI found for this contract.", - ) - if "_functions" not in self.__dict__: - raise NoABIFunctionsFound( - "The abi for this contract contains no function definitions. ", - "Are you sure you provided the correct contract abi?", - ) - elif function_name not in self.__dict__["_functions"]: - raise ABIFunctionNotFound( - f"The function '{function_name}' was not found in this contract's abi.", - " Are you sure you provided the correct contract abi?", - ) - else: - return super().__getattribute__(function_name) - - -class ContractEvents(BaseContractEvents): - def __init__( - self, abi: ABI, w3: "Web3", address: Optional[ChecksumAddress] = None - ) -> None: - super().__init__(abi, w3, ContractEvent, address) - - class ContractEvent(BaseContractEvent): # mypy types w3: "Web3" @@ -245,160 +209,11 @@ def build_filter(self) -> EventFilterBuilder: return builder -class Contract(BaseContract): - # mypy types - w3: "Web3" - functions: ContractFunctions = None - caller: "ContractCaller" = None - - # Instance of :class:`ContractEvents` presenting available Event ABIs - events: ContractEvents = None - - def __init__(self, address: Optional[ChecksumAddress] = None) -> None: - """Create a new smart contract proxy object. - :param address: Contract address as 0x hex string""" - _w3 = self.w3 - if _w3 is None: - raise AttributeError( - "The `Contract` class has not been initialized. Please use the " - "`web3.contract` interface to create your contract class." - ) - - if address: - self.address = normalize_address(cast("ENS", _w3.ens), address) - - if not self.address: - raise TypeError( - "The address argument is required to instantiate a contract." - ) - - self.functions = ContractFunctions( - self.abi, _w3, self.address, decode_tuples=self.decode_tuples - ) - self.caller = ContractCaller( - self.abi, _w3, self.address, decode_tuples=self.decode_tuples - ) - self.events = ContractEvents(self.abi, _w3, self.address) - self.fallback = Contract.get_fallback_function( - self.abi, - _w3, - ContractFunction, - self.address, - ) - self.receive = Contract.get_receive_function( - self.abi, - _w3, - ContractFunction, - self.address, - ) - - @classmethod - def factory( - cls, w3: "Web3", class_name: Optional[str] = None, **kwargs: Any - ) -> "Contract": - kwargs["w3"] = w3 - - normalizers = { - "abi": normalize_abi, - "address": partial(normalize_address, w3.ens), - "bytecode": normalize_bytecode, - "bytecode_runtime": normalize_bytecode, - } - - contract = cast( - Contract, - PropertyCheckingFactory( - class_name or cls.__name__, - (cls,), - kwargs, - normalizers=normalizers, - ), - ) - contract.functions = ContractFunctions( - contract.abi, contract.w3, decode_tuples=contract.decode_tuples - ) - contract.caller = ContractCaller( - contract.abi, - contract.w3, - contract.address, - decode_tuples=contract.decode_tuples, - ) - contract.events = ContractEvents(contract.abi, contract.w3) - contract.fallback = Contract.get_fallback_function( - contract.abi, - contract.w3, - ContractFunction, - ) - contract.receive = Contract.get_receive_function( - contract.abi, - contract.w3, - ContractFunction, - ) - - return contract - - @classmethod - def constructor(cls, *args: Any, **kwargs: Any) -> "ContractConstructor": - """ - :param args: The contract constructor arguments as positional arguments - :param kwargs: The contract constructor arguments as keyword arguments - :return: a contract constructor object - """ - if cls.bytecode is None: - raise ValueError( - "Cannot call constructor on a contract that does not have " - "'bytecode' associated with it" - ) - - return ContractConstructor(cls.w3, cls.abi, cls.bytecode, *args, **kwargs) - - @combomethod - def find_functions_by_identifier( - cls, - contract_abi: ABI, - w3: "Web3", - address: ChecksumAddress, - callable_check: Callable[..., Any], - ) -> List["ContractFunction"]: - return cast( - List["ContractFunction"], - find_functions_by_identifier( - contract_abi, w3, address, callable_check, ContractFunction - ), - ) - - @combomethod - def get_function_by_identifier( - cls, fns: Sequence["ContractFunction"], identifier: str - ) -> "ContractFunction": - return get_function_by_identifier(fns, identifier) - - -class ContractConstructor(BaseContractConstructor): - # mypy types - w3: "Web3" - - @combomethod - def transact(self, transaction: Optional[TxParams] = None) -> HexBytes: - return self.w3.eth.send_transaction(self._get_transaction(transaction)) - - @combomethod - def build_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: - """ - Build the transaction dictionary without sending - """ - built_transaction = self._build_transaction(transaction) - return fill_transaction_defaults(self.w3, built_transaction) - - @combomethod - def estimate_gas( - self, - transaction: Optional[TxParams] = None, - block_identifier: Optional[BlockIdentifier] = None, - ) -> int: - transaction = self._estimate_gas(transaction) - - return self.w3.eth.estimate_gas(transaction, block_identifier=block_identifier) +class ContractEvents(BaseContractEvents): + def __init__( + self, abi: ABI, w3: "Web3", address: Optional[ChecksumAddress] = None + ) -> None: + super().__init__(abi, w3, ContractEvent, address) class ContractFunction(BaseContractFunction): @@ -451,6 +266,9 @@ def call( addr = contract.functions.owner().call() :param transaction: Dictionary of transaction info for web3 interface + :param block_identifier: TODO + :param state_override TODO + :param ccip_read_enabled TODO :return: ``Caller`` object that has contract public functions and variables exposed as Python methods """ @@ -551,6 +369,164 @@ def get_receive_function( return cast(ContractFunction, NonExistentReceiveFunction()) +class ContractFunctions(BaseContractFunctions): + def __init__( + self, + abi: ABI, + w3: "Web3", + address: Optional[ChecksumAddress] = None, + decode_tuples: Optional[bool] = False, + ) -> None: + super().__init__(abi, w3, ContractFunction, address, decode_tuples) + + def __getattr__(self, function_name: str) -> "ContractFunction": + if self.abi is None: + raise NoABIFound( + "There is no ABI found for this contract.", + ) + if "_functions" not in self.__dict__: + raise NoABIFunctionsFound( + "The abi for this contract contains no function definitions. ", + "Are you sure you provided the correct contract abi?", + ) + elif function_name not in self.__dict__["_functions"]: + raise ABIFunctionNotFound( + f"The function '{function_name}' was not found in this contract's abi.", + " Are you sure you provided the correct contract abi?", + ) + else: + return super().__getattribute__(function_name) + + +class Contract(BaseContract): + # mypy types + w3: "Web3" + functions: ContractFunctions = None + caller: "ContractCaller" = None + + # Instance of :class:`ContractEvents` presenting available Event ABIs + events: ContractEvents = None + + def __init__(self, address: Optional[ChecksumAddress] = None) -> None: + """Create a new smart contract proxy object. + :param address: Contract address as 0x hex string""" + _w3 = self.w3 + if _w3 is None: + raise AttributeError( + "The `Contract` class has not been initialized. Please use the " + "`web3.contract` interface to create your contract class." + ) + + if address: + self.address = normalize_address(cast("ENS", _w3.ens), address) + + if not self.address: + raise TypeError( + "The address argument is required to instantiate a contract." + ) + + self.functions = ContractFunctions( + self.abi, _w3, self.address, decode_tuples=self.decode_tuples + ) + self.caller = ContractCaller( + self.abi, _w3, self.address, decode_tuples=self.decode_tuples + ) + self.events = ContractEvents(self.abi, _w3, self.address) + self.fallback = Contract.get_fallback_function( + self.abi, + _w3, + ContractFunction, + self.address, + ) + self.receive = Contract.get_receive_function( + self.abi, + _w3, + ContractFunction, + self.address, + ) + + @classmethod + def factory( + cls, w3: "Web3", class_name: Optional[str] = None, **kwargs: Any + ) -> "Contract": + kwargs["w3"] = w3 + + normalizers = { + "abi": normalize_abi, + "address": partial(normalize_address, w3.ens), + "bytecode": normalize_bytecode, + "bytecode_runtime": normalize_bytecode, + } + + contract = cast( + Contract, + PropertyCheckingFactory( + class_name or cls.__name__, + (cls,), + kwargs, + normalizers=normalizers, + ), + ) + contract.functions = ContractFunctions( + contract.abi, contract.w3, decode_tuples=contract.decode_tuples + ) + contract.caller = ContractCaller( + contract.abi, + contract.w3, + contract.address, + decode_tuples=contract.decode_tuples, + ) + contract.events = ContractEvents(contract.abi, contract.w3) + contract.fallback = Contract.get_fallback_function( + contract.abi, + contract.w3, + ContractFunction, + ) + contract.receive = Contract.get_receive_function( + contract.abi, + contract.w3, + ContractFunction, + ) + + return contract + + @classmethod + def constructor(cls, *args: Any, **kwargs: Any) -> "ContractConstructor": + """ + :param args: The contract constructor arguments as positional arguments + :param kwargs: The contract constructor arguments as keyword arguments + :return: a contract constructor object + """ + if cls.bytecode is None: + raise ValueError( + "Cannot call constructor on a contract that does not have " + "'bytecode' associated with it" + ) + + return ContractConstructor(cls.w3, cls.abi, cls.bytecode, *args, **kwargs) + + @combomethod + def find_functions_by_identifier( + cls, + contract_abi: ABI, + w3: "Web3", + address: ChecksumAddress, + callable_check: Callable[..., Any], + ) -> List["ContractFunction"]: + return cast( + List["ContractFunction"], + find_functions_by_identifier( + contract_abi, w3, address, callable_check, ContractFunction + ), + ) + + @combomethod + def get_function_by_identifier( + cls, fns: Sequence["ContractFunction"], identifier: str + ) -> "ContractFunction": + return get_function_by_identifier(fns, identifier) + + class ContractCaller(BaseContractCaller): # mypy types w3: "Web3" @@ -611,3 +587,30 @@ def __call__( ccip_read_enabled=ccip_read_enabled, decode_tuples=self.decode_tuples, ) + + +class ContractConstructor(BaseContractConstructor): + # mypy types + w3: "Web3" + + @combomethod + def transact(self, transaction: Optional[TxParams] = None) -> HexBytes: + return self.w3.eth.send_transaction(self._get_transaction(transaction)) + + @combomethod + def build_transaction(self, transaction: Optional[TxParams] = None) -> TxParams: + """ + Build the transaction dictionary without sending + """ + built_transaction = self._build_transaction(transaction) + return fill_transaction_defaults(self.w3, built_transaction) + + @combomethod + def estimate_gas( + self, + transaction: Optional[TxParams] = None, + block_identifier: Optional[BlockIdentifier] = None, + ) -> int: + transaction = self._estimate_gas(transaction) + + return self.w3.eth.estimate_gas(transaction, block_identifier=block_identifier)