diff --git a/docs/middleware.rst b/docs/middleware.rst index 82a171da83..ca29dee3b0 100644 --- a/docs/middleware.rst +++ b/docs/middleware.rst @@ -368,3 +368,25 @@ which uses a prototype PoA for it's development mode and the Rinkeby test networ Unfortunately, it does deviate from the yellow paper specification, which constrains the ``extraData`` field in each block to a maximum of 32-bytes. Geth's PoA uses more than 32 bytes, so this middleware modifies the block data a bit before returning it. + +.. _local-filter: + +Locally Managed Log and Block Filters +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This middleware provides an alternative to Ethereum node-managed filters. When used, Log and +Block filter logic is handled locally while using the same web3 filter API. Filter results are +retrieved using JSON-RPC endpoints that don't rely on server state. + +.. code-block:: python + + >>> from web3 import Web3, EthereumTesterProvider + >>> w3 = Web3(EthereumTesterProvider()) + >>> from web3.middleware import local_filter_middleware + >>> w3.middleware_stack.add(local_filter_middleware) + + # Normal block and log filter APIs behave as before.
# ---- tests/core/filtering/conftest.py ----
# NOTE: ``pytest`` is imported above the part of conftest.py visible here.
from web3 import Web3
from web3.middleware import (
    local_filter_middleware,
)
from web3.providers.eth_tester import (
    EthereumTesterProvider,
)


@pytest.fixture()
def tester_snapshot(web3):
    """Take a snapshot of the tester chain behind the ``web3`` fixture."""
    tester = web3.providers[0].ethereum_tester
    return tester.take_snapshot()


@pytest.fixture(
    scope='function',
    params=[True, False],
    ids=["local_filter_middleware", "node_based_filter"])
def web3(request):
    """Web3 instance on a fresh ``EthereumTesterProvider``.

    Parametrized so every test runs twice: once with
    ``local_filter_middleware`` installed and once with node-based filters.
    """
    w3 = Web3(EthereumTesterProvider())
    if request.param:
        w3.middleware_stack.add(local_filter_middleware)
    return w3


# ---- tests/core/filtering/test_basic_filter_tests.py ----
def test_filtering_sequential_blocks_with_bounded_range(
        web3,
        emitter,
        Emitter,
        wait_for_transaction):
    """A filter bounded by ``toBlock`` reports one entry per emitted event."""
    event_count = 100

    builder = emitter.events.LogNoArguments.build_filter()
    builder.fromBlock = "latest"

    initial_block_number = web3.eth.blockNumber

    builder.toBlock = initial_block_number + event_count
    filter_ = builder.deploy(web3)

    for _ in range(event_count):
        emitter.functions.logNoArgs(which=1).transact()

    assert web3.eth.blockNumber == initial_block_number + event_count
    assert len(filter_.get_new_entries()) == event_count


def test_filtering_starting_block_range(
        web3,
        emitter,
        Emitter,
        wait_for_transaction):
    """Events emitted before the filter is deployed are not reported."""
    event_count = 10

    # Events emitted before the filter exists.
    for _ in range(event_count):
        emitter.functions.logNoArgs(which=1).transact()

    builder = emitter.events.LogNoArguments.build_filter()
    filter_ = builder.deploy(web3)
    initial_block_number = web3.eth.blockNumber

    # Events emitted after the filter was deployed.
    for _ in range(event_count):
        emitter.functions.logNoArgs(which=1).transact()

    assert web3.eth.blockNumber == initial_block_number + event_count
    assert len(filter_.get_new_entries()) == event_count
# ---- tests/core/filtering/test_basic_filter_tests.py ----
def test_requesting_results_with_no_new_blocks(web3, emitter):
    """Polling a freshly deployed filter yields no entries."""
    filter_ = emitter.events.LogNoArguments.build_filter().deploy(web3)
    new_entries = filter_.get_new_entries()
    assert len(new_entries) == 0


# ---- tests/core/filtering/test_contract_data_filters.py ----
def clear_chain_state(web3, snapshot):
    """Revert the tester chain behind ``web3`` to ``snapshot``.

    Hypothesis doesn't allow function scoped fixtures to re-run between test
    runs, so chain state needs to be explicitly cleared.
    """
    tester = web3.providers[0].ethereum_tester
    tester.revert_to_snapshot(snapshot)
# ---- tests/core/filtering/test_contract_topic_filters.py ----
def clear_chain_state(web3, snapshot):
    """Revert the tester chain behind ``web3`` to ``snapshot``.

    Hypothesis doesn't allow function scoped fixtures to re-run between test
    runs, so chain state needs to be explicitly cleared.
    """
    tester = web3.providers[0].ethereum_tester
    tester.revert_to_snapshot(snapshot)
# ---- tests/core/middleware/test_filter_middleware.py ----
import pytest

from web3 import Web3
from web3.middleware import (
    construct_result_generator_middleware,
    local_filter_middleware,
)
from web3.middleware.filter import (
    block_ranges,
    iter_latest_block_ranges,
)
from web3.providers.base import (
    BaseProvider,
)


class DummyProvider(BaseProvider):
    """Provider that rejects every request.

    Any call that is not answered by a middleware ends up here and raises.
    """

    def make_request(self, method, params):
        message = "Cannot make request for {0}:{1}".format(
            method,
            params,
        )
        raise NotImplementedError(message)


@pytest.fixture(scope='function')
def iter_block_number(start=0):
    """Primed generator standing in for the chain's current block number.

    ``next()`` returns the current number; ``send(n)`` sets it to ``n``.
    """
    def counter():
        current = start
        while True:
            override = (yield current)
            if override is not None:
                current = override

    generator = counter()
    # Prime the generator so that ``send`` can be used straight away.
    next(generator)
    return generator


@pytest.fixture(scope='function')
def result_generator_middleware(iter_block_number):
    """Middleware answering the RPC calls the filter middleware issues."""
    canned_block = type('block', (object,), {'hash': 'middleware'})
    handlers = {
        'eth_getLogs': lambda *_: ["middleware"],
        'eth_getBlockByNumber': lambda *_: canned_block,
        'net_version': lambda *_: 1,
        'eth_blockNumber': lambda *_: next(iter_block_number),
    }
    return construct_result_generator_middleware(handlers)


@pytest.fixture(scope='function')
def w3_base():
    """Web3 instance with a ``DummyProvider`` and no middlewares."""
    return Web3(providers=[DummyProvider()], middlewares=[])


@pytest.fixture(scope='function')
def w3(w3_base, result_generator_middleware):
    """``w3_base`` with the canned-result and local filter middlewares added."""
    stack = w3_base.middleware_stack
    stack.add(result_generator_middleware)
    stack.add(local_filter_middleware)
    return w3_base


@pytest.mark.parametrize("start, stop, expected", [
    (2, 7, [
        (2, 6),
        (7, 7)
    ]),
    (0, 12, [
        (0, 4),
        (5, 9),
        (10, 12)
    ]),
    (0, 15, [
        (0, 4),
        (5, 9),
        (10, 14),
        (15, 15)
    ]),
    (0, 0, [
        (0, 0),
    ]),
    (1, 1, [
        (1, 1),
    ]),
    (5, 0, TypeError),
])
def test_block_ranges(start, stop, expected):
    """``block_ranges`` splits inclusive ranges, or raises on bad bounds."""
    expects_error = isinstance(expected, type) and issubclass(expected, Exception)
    if expects_error:
        with pytest.raises(expected):
            block_ranges(start, stop)
        return

    produced = tuple(block_ranges(start, stop))
    # Tuple equality checks both the length and every pair.
    assert produced == tuple(expected)
# ---- tests/core/middleware/test_filter_middleware.py (continued) ----
@pytest.mark.parametrize("from_block,to_block,current_block,expected", [
    (0, 10, [10], [
        (0, 10),
    ]),
    (0, 55, [0, 19, 55], [
        (0, 0),
        (1, 19),
        (20, 55),
    ]),
])
def test_iter_latest_block_ranges(
        w3,
        iter_block_number,
        from_block,
        to_block,
        current_block,
        expected):
    """Each poll yields the range from the last seen block to the newest."""
    latest_block_ranges = iter_latest_block_ranges(w3, from_block, to_block)
    for mined_block, expected_range in zip(current_block, expected):
        # Move the fake chain forward before polling for the next range.
        iter_block_number.send(mined_block)
        assert next(latest_block_ranges) == expected_range


def test_pending_block_filter_middleware(w3):
    """Pending filters are not handled locally and reach the dummy provider."""
    with pytest.raises(NotImplementedError):
        w3.eth.filter('pending')


def test_local_filter_middleware(w3, iter_block_number):
    """Block and log filters are served by the middleware under unique ids."""
    block_filter = w3.eth.filter('latest')
    iter_block_number.send(1)

    log_filter = w3.eth.filter(filter_params={'fromBlock': 'latest'})

    block_changes = w3.eth.getFilterChanges(block_filter.filter_id)
    assert block_changes == ["middleware"]

    iter_block_number.send(2)
    log_changes = w3.eth.getFilterChanges(log_filter.filter_id)
    assert log_changes == ["middleware"]

    all_logs = w3.eth.getFilterLogs(log_filter.filter_id)
    assert all_logs == ["middleware"]

    filter_ids = (
        block_filter.filter_id,
        log_filter.filter_id
    )

    # Test that all ids are str types
    assert all(isinstance(_filter_id, (str,)) for _filter_id in filter_ids)

    # Test that all ids are unique
    assert len(filter_ids) == len(set(filter_ids))
# ---- web3/middleware/filter.py (part 1: block-range helpers) ----
import os

# Maximum number of blocks covered by a single ``eth_getLogs`` request.
# ``os.environ`` values are always strings, so the override is converted to an
# int; it is later used as a ``range`` step, which rejects strings.
MAX_BLOCK_REQUEST = int(os.environ.get('WEB3_MAX_BLOCK_REQUEST', 50))


def segment_count(start, stop, step=5):
    """Creates a segment counting generator

    The generator returns tuple pairs of integers
    that correspond to segments in the provided range.

    :param start: The initial value of the counting range
    :param stop: The last value in the counting range
    :param step: Optional, the segment length. Default is 5.
    :type start: int
    :type stop: int
    :return: returns a generator object


    Example:

    >>> segment_counter = segment_count(start=0, stop=10, step=3)
    >>> next(segment_counter)
    (0, 3)
    >>> next(segment_counter)
    (3, 6)
    >>> next(segment_counter)
    (6, 9)
    >>> next(segment_counter) #  Remainder is also returned
    (9, 10)
    """
    return gen_bounded_segments(start, stop, step)


def gen_bounded_segments(start, stop, step):
    """Yield ``(low, high)`` pairs that tile ``start``..``stop`` in ``step`` chunks.

    Consecutive pairs share an endpoint. When the range is not a multiple of
    ``step`` the last pair is shorter, and a range no longer than ``step`` is
    returned as the single pair ``(start, stop)``.
    """
    # If the initial range is less than the step
    # just return (start, stop)
    if start + step >= stop:
        yield (start, stop)
        return

    yield from zip(
        range(start, stop - step + 1, step),
        range(start + step, stop + 1, step))

    # Handle the remainder
    remainder = (stop - start) % step
    if remainder:
        yield (stop - remainder, stop)


def block_ranges(start_block, last_block, step=5):
    """Returns 2-tuple ranges describing ranges of block from start_block to last_block

    Ranges do not overlap to facilitate use as ``toBlock``, ``fromBlock``
    json-rpc arguments, which are both inclusive.

    :param start_block: First block of the range (inclusive)
    :param last_block: Last block of the range (inclusive)
    :param step: Optional, the maximum number of blocks per range. Default is 5.
    :return: returns a generator of ``(from_block, to_block)`` tuples
    :raises TypeError: if ``start_block`` is greater than ``last_block``.
        The check runs when the function is called, not on first iteration.
    """
    if last_block is not None and start_block > last_block:
        # A single message: the original passed two separate strings, which
        # made the exception render as a tuple.
        raise TypeError(
            "Incompatible start and stop arguments. "
            "Start must be less than or equal to stop.")

    # ``segment_count`` treats ``stop`` as exclusive, so ask for one block more
    # and pull every upper bound back by one to get inclusive ranges.
    return (
        (from_block, to_block - 1)
        for from_block, to_block
        in segment_count(start_block, last_block + 1, step)
    )


def iter_latest_block(w3, to_block=None):
    """Returns a generator that dispenses the latest block, if
    any new blocks have been mined since last iteration.

    If there are no new blocks None is returned.

    If ``to_block`` is defined, ``StopIteration`` is raised
    once the latest block is past ``to_block``. ``None`` and ``'latest'``
    both mean "no upper bound".

    >>> new_blocks = iter_latest_block(w3, 10)
    >>> next(new_blocks)  # Latest block = 0
    0
    >>> next(new_blocks)  # No new blocks
    >>> next(new_blocks)  # Latest block = 1
    1
    >>> next(new_blocks)  # Latest block = 10
    10
    >>> next(new_blocks)  # Latest block = 11
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    StopIteration
    >>>
    """
    _last = None

    # Compare by value: ``is not 'latest'`` only works when both strings
    # happen to be the same interned object, which is not guaranteed for
    # values built at runtime (for example decoded from a request).
    is_bounded_range = (
        to_block is not None and
        to_block != 'latest'
    )

    while True:
        latest_block = w3.eth.blockNumber
        if is_bounded_range and latest_block > to_block:
            return
        # No new blocks since last iteration.
        if _last is not None and _last == latest_block:
            yield None
        else:
            yield latest_block
        _last = latest_block


def iter_latest_block_ranges(w3, from_block, to_block=None):
    """Returns an iterator unloading ranges of available blocks

    starting from `fromBlock` to the latest mined block,
    until reaching toBlock. ``(None, None)`` is yielded when there is nothing
    new to report, e.g.:


    >>> blocks_to_filter = iter_latest_block_ranges(w3, 0, 50)
    >>> next(blocks_to_filter)  # latest block number = 11
    (0, 11)
    >>> next(blocks_to_filter)  # latest block number = 45
    (12, 45)
    >>> next(blocks_to_filter)  # latest block number = 50
    (46, 50)
    """
    for latest_block in iter_latest_block(w3, to_block):
        # Either no new block was mined, or the chain has not reached
        # ``from_block`` yet.
        if latest_block is None or from_block > latest_block:
            yield (None, None)
        else:
            yield (from_block, latest_block)
            # The next range starts right after the block just reported.
            from_block = latest_block + 1
# ---- web3/middleware/filter.py (part 2: filters and middleware) ----
import itertools

from eth_utils import (
    apply_key_map,
    to_hex,
    to_list,
)

from web3._utils.toolz import (
    concat,
    valfilter,
)


def drop_items_with_none_value(params):
    """Return a copy of ``params`` without the entries whose value is ``None``."""
    return valfilter(lambda x: x is not None, params)


def get_logs_multipart(
        w3,
        startBlock,
        stopBlock,
        address,
        topics,
        max_blocks):
    """Used to break up requests to ``eth_getLogs``

    The getLog request is partitioned into multiple calls of the max number of blocks
    ``max_blocks``.

    :return: returns a generator with one ``getLogs`` result per partition
    """
    for from_block, to_block in block_ranges(startBlock, stopBlock, max_blocks):
        params = {
            'fromBlock': from_block,
            'toBlock': to_block,
            'address': address,
            'topics': topics
        }
        # ``address`` and ``topics`` are optional; leave them out of the
        # request entirely rather than sending ``None``.
        yield w3.eth.getLogs(
            drop_items_with_none_value(params))


class RequestLogs:
    """Locally managed log filter built on ``eth_getLogs``.

    ``filter_changes`` is a generator; every ``next()`` returns the logs of the
    blocks mined since the previous call. ``get_logs`` returns the logs of the
    filter's whole block range.
    """

    def __init__(
            self,
            w3,
            from_block=None,
            to_block=None,
            address=None,
            topics=None):

        self.address = address
        self.topics = topics
        self.w3 = w3
        if from_block is None or from_block == 'latest':
            # Start with the first block mined after the filter was created.
            self._from_block = w3.eth.blockNumber + 1
        else:
            self._from_block = from_block
        self._to_block = to_block
        self.filter_changes = self._get_filter_changes()

    @property
    def from_block(self):
        return self._from_block

    @property
    def to_block(self):
        """Upper block bound; open-ended filters resolve to the current block."""
        if self._to_block is None or self._to_block == 'latest':
            return self.w3.eth.blockNumber
        return self._to_block

    def _fetch_logs(self, start, stop):
        """Return the logs for blocks ``start``..``stop`` as one flat list."""
        return list(
            concat(
                get_logs_multipart(
                    self.w3,
                    start,
                    stop,
                    self.address,
                    self.topics,
                    max_blocks=MAX_BLOCK_REQUEST)))

    def _get_filter_changes(self):
        # Pass ``None`` for an open-ended filter. Resolving it through
        # ``self.to_block`` would pin the bound to the block height at the
        # first poll, and the generator would stop as soon as a newer block
        # was mined.
        if self._to_block is None or self._to_block == 'latest':
            upper_bound = None
        else:
            upper_bound = self._to_block

        for start, stop in iter_latest_block_ranges(self.w3, self.from_block, upper_bound):
            if start is None or stop is None:
                # Nothing new. Without the ``else`` the following poll fell
                # through and requested logs for a ``(None, None)`` range.
                yield []
            else:
                yield self._fetch_logs(start, stop)

    def get_logs(self):
        """Return every log in the filter's block range."""
        return self._fetch_logs(self.from_block, self.to_block)


FILTER_PARAMS_KEY_MAP = {
    'toBlock': 'to_block',
    'fromBlock': 'from_block'
}

NEW_FILTER_METHODS = {
    'eth_newBlockFilter',
    'eth_newFilter',
}

FILTER_CHANGES_METHODS = {
    'eth_getFilterChanges',
    'eth_getFilterLogs',
}


class RequestBlocks:
    """Locally managed block filter.

    ``filter_changes`` is a generator; every ``next()`` returns the hashes of
    the blocks mined since the previous call.
    """

    def __init__(self, w3):
        self.w3 = w3
        self.start_block = w3.eth.blockNumber + 1
        # One generator for the lifetime of the filter, as in ``RequestLogs``.
        # Building a new one per access restarted at ``start_block`` and
        # replayed every block since creation on each poll.
        self.filter_changes = self.get_filter_changes()

    def get_filter_changes(self):
        """Return a new generator of block-hash lists, starting at ``start_block``."""
        block_range_iter = iter_latest_block_ranges(
            self.w3,
            self.start_block,
            None)

        for block_range in block_range_iter:
            yield block_hashes_in_range(self.w3, block_range)


@to_list
def block_hashes_in_range(w3, block_range):
    """Return the hashes of the blocks in the inclusive ``block_range``.

    A range containing ``None`` means "no new blocks" and gives an empty list.
    """
    from_block, to_block = block_range
    if from_block is None or to_block is None:
        return
    for block_number in range(from_block, to_block + 1):
        yield getattr(w3.eth.getBlock(block_number), 'hash', None)


def local_filter_middleware(make_request, w3):
    """Middleware that manages log and block filters locally.

    ``eth_newFilter`` and ``eth_newBlockFilter`` create a local filter and
    return its id. ``eth_getFilterChanges`` and ``eth_getFilterLogs`` are
    answered from that filter. Every other method, and every filter id that
    was not created here, is passed on to ``make_request``.
    """
    filters = {}
    filter_id_counter = map(to_hex, itertools.count())

    def middleware(method, params):
        if method in NEW_FILTER_METHODS:

            filter_id = next(filter_id_counter)

            if method == 'eth_newFilter':
                _filter = RequestLogs(w3, **apply_key_map(FILTER_PARAMS_KEY_MAP, params[0]))

            elif method == 'eth_newBlockFilter':
                _filter = RequestBlocks(w3)

            else:
                raise NotImplementedError(method)

            filters[filter_id] = _filter
            return {'result': filter_id}

        elif method in FILTER_CHANGES_METHODS:
            filter_id = params[0]
            # Pass through to filters not created by middleware
            if filter_id not in filters:
                return make_request(method, params)
            _filter = filters[filter_id]
            if method == 'eth_getFilterChanges':
                # A bounded filter that has passed its ``toBlock`` is
                # exhausted; report "no changes" instead of letting
                # ``StopIteration`` escape to the caller.
                return {'result': next(_filter.filter_changes, [])}
            elif method == 'eth_getFilterLogs':
                return {'result': _filter.get_logs()}
            else:
                raise NotImplementedError(method)
        else:
            return make_request(method, params)

    return middleware