From 02518b1dc2f69759e5c25e6953f8ee24d722935d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Mon, 29 Jul 2024 17:16:58 -0400 Subject: [PATCH 01/17] add non blocker script --- ...over_entries_for_categories_non_blocker.py | 40 ++ storescraper/store.py | 408 +++++++++++------- 2 files changed, 293 insertions(+), 155 deletions(-) create mode 100644 storescraper/bin/discover_entries_for_categories_non_blocker.py diff --git a/storescraper/bin/discover_entries_for_categories_non_blocker.py b/storescraper/bin/discover_entries_for_categories_non_blocker.py new file mode 100644 index 000000000..924aae6a0 --- /dev/null +++ b/storescraper/bin/discover_entries_for_categories_non_blocker.py @@ -0,0 +1,40 @@ +import argparse +import json +import logging +import sys + +sys.path.append("../..") + +from storescraper.utils import get_store_class_by_name # noqa + + +def main(): + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + logging.basicConfig(level=logging.WARNING, stream=sys.stdout) + + parser = argparse.ArgumentParser( + description="Discovers the URLs of the given store and (optional) " "categories" + ) + parser.add_argument("store", type=str, help="The name of the store to be parsed") + parser.add_argument( + "--categories", type=str, nargs="*", help="Specific categories to be parsed" + ) + parser.add_argument( + "--extra_args", + type=json.loads, + nargs="?", + default={}, + help="Optional arguments to pass to the parser " + "(usually username/password) for private sites)", + ) + + args = parser.parse_args() + store = get_store_class_by_name(args.store) + + store.discover_entries_for_categories_non_blocker( + categories=args.categories, extra_args=args.extra_args + ) + + +if __name__ == "__main__": + main() diff --git a/storescraper/store.py b/storescraper/store.py index 9075ae5ef..7e7314998 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -1,16 +1,29 @@ import traceback from collections import defaultdict, OrderedDict -from celery import shared_task, group +from celery import chain, group, shared_task from celery.result import allow_join_result from celery.utils.log import get_task_logger - +import uuid from .product import Product from .utils import get_store_class_by_name, chunks logger = get_task_logger(__name__) +import logging +import logstash + +import redis + +# Conectar a Redis (puede necesitar ajustar la IP y el puerto según tu configuración) +client = redis.StrictRedis(host="localhost", port=6379, db=0, decode_responses=True) + + +logstash_logger = logging.getLogger("python-logstash-logger") +logstash_logger.setLevel(logging.INFO) +logstash_logger.addHandler(logstash.LogstashHandler("localhost", 5959)) + class StoreScrapError(Exception): def __init__(self, message): @@ -27,24 +40,30 @@ class Store: ########################################################################## @classmethod - def products(cls, categories=None, extra_args=None, - discover_urls_concurrency=None, - products_for_url_concurrency=None, use_async=None): + def products( + cls, + categories=None, + extra_args=None, + discover_urls_concurrency=None, + products_for_url_concurrency=None, + use_async=None, + ): sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency, products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async) + use_async=use_async, + ) - categories = sanitized_parameters['categories'] - discover_urls_concurrency = \ - 
sanitized_parameters['discover_urls_concurrency'] - products_for_url_concurrency = \ - sanitized_parameters['products_for_url_concurrency'] - use_async = sanitized_parameters['use_async'] + categories = sanitized_parameters["categories"] + discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] + products_for_url_concurrency = sanitized_parameters[ + "products_for_url_concurrency" + ] + use_async = sanitized_parameters["use_async"] - logger.info('Obtaining products from: {}'.format(cls.__name__)) - logger.info('Categories: {}'.format(', '.join(categories))) + logger.info("Obtaining products from: {}".format(cls.__name__)) + logger.info("Categories: {}".format(", ".join(categories))) extra_args = cls._extra_args_with_preflight(extra_args) @@ -52,72 +71,111 @@ def products(cls, categories=None, extra_args=None, categories=categories, extra_args=extra_args, discover_urls_concurrency=discover_urls_concurrency, - use_async=use_async + use_async=use_async, ) return cls.products_for_urls( discovered_entries, extra_args=extra_args, products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async + use_async=use_async, ) @classmethod - def products_for_keyword(cls, keyword, threshold, extra_args=None, - products_for_url_concurrency=None, - use_async=None): + def products_for_keyword( + cls, + keyword, + threshold, + extra_args=None, + products_for_url_concurrency=None, + use_async=None, + ): sanitized_parameters = cls.sanitize_parameters( products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async) + use_async=use_async, + ) - products_for_url_concurrency = \ - sanitized_parameters['products_for_url_concurrency'] - use_async = sanitized_parameters['use_async'] + products_for_url_concurrency = sanitized_parameters[ + "products_for_url_concurrency" + ] + use_async = sanitized_parameters["use_async"] extra_args = cls._extra_args_with_preflight(extra_args) - product_urls = cls.discover_urls_for_keyword( - keyword, - threshold, - extra_args) + product_urls = cls.discover_urls_for_keyword(keyword, threshold, extra_args) product_entries = OrderedDict() for url in product_urls: product_entries[url] = { - 'positions': [], - 'category': None, + "positions": [], + "category": None, } if extra_args is None: extra_args = {} - extra_args['source'] = 'keyword_search' + extra_args["source"] = "keyword_search" return cls.products_for_urls( product_entries, extra_args=extra_args, products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async + use_async=use_async, ) @classmethod - def discover_entries_for_categories(cls, categories=None, - extra_args=None, - discover_urls_concurrency=None, - use_async=True): + def discover_entries_for_categories_non_blocker( + cls, categories=None, extra_args=None, discover_urls_concurrency=None + ): + sanitized_parameters = cls.sanitize_parameters( + categories=categories, discover_urls_concurrency=discover_urls_concurrency + ) + + categories = sanitized_parameters["categories"] + discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] + extra_args = cls._extra_args_with_preflight(extra_args) + process_id = str(uuid.uuid4()) + extra_args["process_id"] = process_id + category_chunks = chunks(categories, discover_urls_concurrency) + task_groups = [] + + for category_chunk in category_chunks: + chunk_tasks = [ + cls.discover_entries_for_category_task.si( + cls.__name__, category, extra_args + ).set(queue="storescraper") + for category in category_chunk + ] + + 
task_groups.append(group(*chunk_tasks)) + + chain( + *task_groups, + cls.log_process_summary.si(process_id).set(queue="storescraper"), + )() + + @classmethod + @classmethod + def discover_entries_for_categories( + cls, + categories=None, + extra_args=None, + discover_urls_concurrency=None, + use_async=True, + ): sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency, - use_async=use_async) + use_async=use_async, + ) - categories = sanitized_parameters['categories'] - discover_urls_concurrency = \ - sanitized_parameters['discover_urls_concurrency'] - use_async = sanitized_parameters['use_async'] + categories = sanitized_parameters["categories"] + discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] + use_async = sanitized_parameters["use_async"] - logger.info('Discovering URLs for: {}'.format(cls.__name__)) + logger.info("Discovering URLs for: {}".format(cls.__name__)) entry_positions = defaultdict(lambda: list()) url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) @@ -129,15 +187,13 @@ def discover_entries_for_categories(cls, categories=None, for category_chunk in category_chunks: chunk_tasks = [] - logger.info('Discovering URLs for: {}'.format( - category_chunk)) + logger.info("Discovering URLs for: {}".format(category_chunk)) for category in category_chunk: task = cls.discover_entries_for_category_task.s( - cls.__name__, category, extra_args) - task.set( - queue='storescraper' + cls.__name__, category, extra_args ) + task.set(queue="storescraper") chunk_tasks.append(task) tasks_group = cls.create_celery_group(chunk_tasks) @@ -147,7 +203,7 @@ def discover_entries_for_categories(cls, categories=None, for idx, task_result in enumerate(task_results): category = category_chunk[idx] - logger.info('Discovered URLs for {}:'.format(category)) + logger.info("Discovered URLs for {}:".format(category)) for url, positions in task_result.items(): logger.info(url) logger.info(positions) @@ -155,27 +211,31 @@ def discover_entries_for_categories(cls, categories=None, if positions: for pos in positions: entry_positions[url].append( - (pos['section_name'], pos['value'])) - url_category_weights[url][category] += \ - pos['category_weight'] + (pos["section_name"], pos["value"]) + ) + url_category_weights[url][category] += pos[ + "category_weight" + ] else: # Legacy for implementations without position data url_category_weights[url][category] = 1 entry_positions[url] = [] else: - logger.info('Using sync method') + logger.info("Using sync method") for category in categories: for url, positions in cls.discover_entries_for_category( - category, extra_args).items(): - logger.info('Discovered URL: {} ({})'.format( - url, category)) + category, extra_args + ).items(): + logger.info("Discovered URL: {} ({})".format(url, category)) if positions: for pos in positions: entry_positions[url].append( - (pos['section_name'], pos['value'])) - url_category_weights[url][category] += \ - pos['category_weight'] + (pos["section_name"], pos["value"]) + ) + url_category_weights[url][category] += pos[ + "category_weight" + ] else: # Legacy for implementations without position data url_category_weights[url][category] = 1 @@ -183,8 +243,10 @@ def discover_entries_for_categories(cls, categories=None, discovered_entries = {} for url, positions in entry_positions.items(): - category, max_weight = max(url_category_weights[url].items(), - key=lambda x: x[1],) + category, max_weight = max( + url_category_weights[url].items(), + 
key=lambda x: x[1], + ) # Only include the url in the discovery set if it appears in a # weighted section, for example generic "Electrodomésticos" @@ -195,30 +257,36 @@ def discover_entries_for_categories(cls, categories=None, # relevant section if max_weight: discovered_entries[url] = { - 'positions': positions, - 'category': category, - 'category_weight': max_weight + "positions": positions, + "category": category, + "category_weight": max_weight, } return discovered_entries @classmethod - def products_for_urls(cls, discovered_entries, extra_args=None, - products_for_url_concurrency=None, - use_async=True): + def products_for_urls( + cls, + discovered_entries, + extra_args=None, + products_for_url_concurrency=None, + use_async=True, + ): sanitized_parameters = cls.sanitize_parameters( products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async) + use_async=use_async, + ) - products_for_url_concurrency = \ - sanitized_parameters['products_for_url_concurrency'] - use_async = sanitized_parameters['use_async'] + products_for_url_concurrency = sanitized_parameters[ + "products_for_url_concurrency" + ] + use_async = sanitized_parameters["use_async"] - logger.info('Retrieving products for: {}'.format(cls.__name__)) + logger.info("Retrieving products for: {}".format(cls.__name__)) logger.info(discovered_entries) for url, entry_metadata in discovered_entries.items(): - logger.info('{} ({})'.format(url, entry_metadata['category'])) + logger.info("{} ({})".format(url, entry_metadata["category"])) products = [] discovery_urls_without_products = [] @@ -226,22 +294,23 @@ def products_for_urls(cls, discovered_entries, extra_args=None, if use_async: discovery_entries_chunks = chunks( - list(discovered_entries.items()), products_for_url_concurrency) + list(discovered_entries.items()), products_for_url_concurrency + ) task_counter = 1 for discovery_entries_chunk in discovery_entries_chunks: chunk_tasks = [] for entry_url, entry_metadata in discovery_entries_chunk: - logger.info('Retrieving URL ({} / {}): {}'.format( - task_counter, len(discovered_entries), - entry_url)) + logger.info( + "Retrieving URL ({} / {}): {}".format( + task_counter, len(discovered_entries), entry_url + ) + ) task = cls.products_for_url_task.s( - cls.__name__, entry_url, - entry_metadata['category'], extra_args) - task.set( - queue='storescraper' + cls.__name__, entry_url, entry_metadata["category"], extra_args ) + task.set(queue="storescraper") chunk_tasks.append(task) task_counter += 1 @@ -255,35 +324,36 @@ def products_for_urls(cls, discovered_entries, extra_args=None, for serialized_product in task_result: product = Product.deserialize(serialized_product) if not product.positions: - product.positions = \ - discovery_entries_chunk[idx][1]['positions'] + product.positions = discovery_entries_chunk[idx][1][ + "positions" + ] - logger.info('{}\n'.format(product)) + logger.info("{}\n".format(product)) products.append(product) if not task_result: discovery_urls_without_products.append( - discovery_entries_chunk[idx][0]) + discovery_entries_chunk[idx][0] + ) else: - logger.info('Using sync method') + logger.info("Using sync method") for entry_url, entry_metadata in discovered_entries.items(): retrieved_products = cls.products_for_url( - entry_url, - entry_metadata['category'], - extra_args) + entry_url, entry_metadata["category"], extra_args + ) for product in retrieved_products: if not product.positions: - product.positions = entry_metadata['positions'] - logger.info('{}\n'.format(product)) + product.positions = 
entry_metadata["positions"] + logger.info("{}\n".format(product)) products.append(product) if not retrieved_products: discovery_urls_without_products.append(entry_url) return { - 'products': products, - 'discovery_urls_without_products': discovery_urls_without_products + "products": products, + "discovery_urls_without_products": discovery_urls_without_products, } ########################################################################## @@ -291,79 +361,101 @@ def products_for_urls(cls, discovered_entries, extra_args=None, ########################################################################## @staticmethod - @shared_task(autoretry_for=(StoreScrapError,), - max_retries=5, - default_retry_delay=5) - def discover_entries_for_category_task(store_class_name, category, - extra_args=None): + @shared_task( + autoretry_for=(StoreScrapError,), + max_retries=5, + default_retry_delay=5, + ) + def log_process_summary(process_id): + redis_urls = client.lrange(process_id, 0, -1) + logstash_logger.info( + f"Redis URLs: {process_id}", + extra={"urls": redis_urls}, + ) + + @staticmethod + @shared_task( + autoretry_for=(StoreScrapError,), + max_retries=5, + default_retry_delay=5, + ) + def discover_entries_for_category_task(store_class_name, category, extra_args=None): store = get_store_class_by_name(store_class_name) - logger.info('Discovering URLs') - logger.info('Store: ' + store.__name__) - logger.info('Category: ' + category) + logger.info("Discovering URLs") + logger.info("Store: " + store.__name__) + logger.info("Category: " + category) try: discovered_entries = store.discover_entries_for_category( - category, extra_args) + category, extra_args + ) except Exception: - error_message = 'Error discovering URLs from {}: {} - {}'.format( - store_class_name, - category, - traceback.format_exc()) + error_message = "Error discovering URLs from {}: {} - {}".format( + store_class_name, category, traceback.format_exc() + ) logger.error(error_message) raise StoreScrapError(error_message) + process_id = extra_args.get("process_id", None) + for url in discovered_entries.keys(): + if process_id: + client.rpush(process_id, url) + logger.info(url) + logstash_logger.info( + f"{store.__name__}: Discovered URL", + extra={"process_id": process_id or None, "url": url}, + ) return discovered_entries @staticmethod - @shared_task(autoretry_for=(StoreScrapError,), - max_retries=5, - default_retry_delay=5) - def products_for_url_task(store_class_name, url, category=None, - extra_args=None): + @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) + def products_for_url_task(store_class_name, url, category=None, extra_args=None): store = get_store_class_by_name(store_class_name) - logger.info('Obtaining products for URL') - logger.info('Store: ' + store.__name__) - logger.info('Category: {}'.format(category)) - logger.info('URL: ' + url) + logger.info("Obtaining products for URL") + logger.info("Store: " + store.__name__) + logger.info("Category: {}".format(category)) + logger.info("URL: " + url) try: - raw_products = store.products_for_url( - url, category, extra_args) + raw_products = store.products_for_url(url, category, extra_args) except Exception: - error_message = 'Error retrieving products from {}: {} - {}' \ - ''.format(store_class_name, url, - traceback.format_exc()) + error_message = "Error retrieving products from {}: {} - {}" "".format( + store_class_name, url, traceback.format_exc() + ) logger.error(error_message) raise StoreScrapError(error_message) serialized_products = 
[p.serialize() for p in raw_products] for idx, product in enumerate(serialized_products): - logger.info('{} - {}'.format(idx, product)) + logger.info("{} - {}".format(idx, product)) return serialized_products @staticmethod - @shared_task(autoretry_for=(StoreScrapError,), - max_retries=5, - default_retry_delay=5) - def products_for_urls_task(store_class_name, - discovery_entries, - extra_args=None, - products_for_url_concurrency=None, - use_async=True): + @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) + def products_for_urls_task( + store_class_name, + discovery_entries, + extra_args=None, + products_for_url_concurrency=None, + use_async=True, + ): store = get_store_class_by_name(store_class_name) result = store.products_for_urls( - discovery_entries, extra_args=extra_args, + discovery_entries, + extra_args=extra_args, products_for_url_concurrency=products_for_url_concurrency, - use_async=use_async) + use_async=use_async, + ) serialized_result = { - 'products': [p.serialize() for p in result['products']], - 'discovery_urls_without_products': - result['discovery_urls_without_products'] + "products": [p.serialize() for p in result["products"]], + "discovery_urls_without_products": result[ + "discovery_urls_without_products" + ], } return serialized_result @@ -374,23 +466,27 @@ def products_for_urls_task(store_class_name, @classmethod def categories(cls): - raise NotImplementedError('This method must be implemented by ' - 'subclasses of Store') + raise NotImplementedError( + "This method must be implemented by " "subclasses of Store" + ) @classmethod def discover_urls_for_category(cls, category, extra_args=None): - raise NotImplementedError('This method must be implemented by ' - 'subclasses of Store') + raise NotImplementedError( + "This method must be implemented by " "subclasses of Store" + ) @classmethod def products_for_url(cls, url, category=None, extra_args=None): - raise NotImplementedError('This method must be implemented by ' - 'subclasses of Store') + raise NotImplementedError( + "This method must be implemented by " "subclasses of Store" + ) @classmethod def discover_urls_for_keyword(cls, keyword, threshold, extra_args=None): - raise NotImplementedError('This method must be implemented by ' - 'subclasses of Store') + raise NotImplementedError( + "This method must be implemented by " "subclasses of Store" + ) @classmethod def discover_entries_for_category(cls, category, extra_args=None): @@ -422,30 +518,34 @@ def create_celery_group(cls, tasks): return g @classmethod - def sanitize_parameters(cls, categories=None, - discover_urls_concurrency=None, - products_for_url_concurrency=None, use_async=None): + def sanitize_parameters( + cls, + categories=None, + discover_urls_concurrency=None, + products_for_url_concurrency=None, + use_async=None, + ): if categories is None: categories = cls.categories() else: - categories = [category for category in cls.categories() - if category in categories] + categories = [ + category for category in cls.categories() if category in categories + ] if discover_urls_concurrency is None: discover_urls_concurrency = cls.preferred_discover_urls_concurrency if products_for_url_concurrency is None: - products_for_url_concurrency = \ - cls.preferred_products_for_url_concurrency + products_for_url_concurrency = cls.preferred_products_for_url_concurrency if use_async is None: use_async = cls.prefer_async return { - 'categories': categories, - 'discover_urls_concurrency': discover_urls_concurrency, - 
'products_for_url_concurrency': products_for_url_concurrency, - 'use_async': use_async, + "categories": categories, + "discover_urls_concurrency": discover_urls_concurrency, + "products_for_url_concurrency": products_for_url_concurrency, + "use_async": use_async, } ###################################################################### @@ -458,12 +558,10 @@ def _extra_args_with_preflight(cls, extra_args=None): # preflight from being called twice unnecesarily # If the preflight args have already been calculated, return - if extra_args is not None and 'preflight_done' in extra_args: + if extra_args is not None and "preflight_done" in extra_args: return extra_args - preflight_args = { - 'preflight_done': True - } + preflight_args = {"preflight_done": True} preflight_args.update(cls.preflight(extra_args)) if extra_args is not None: preflight_args.update(extra_args) From df2a4f317dc0f5d737ebdf280e237d0e20025249 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Thu, 1 Aug 2024 10:10:42 -0400 Subject: [PATCH 02/17] add logs tracking --- .gitignore | 1 + storescraper/bin/celeryconfig/defaults.py | 13 +-- storescraper/store.py | 97 ++++++++++++++++------- 3 files changed, 78 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 1a2ea1711..273444399 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ env **/__pycache__ .idea +*.crt \ No newline at end of file diff --git a/storescraper/bin/celeryconfig/defaults.py b/storescraper/bin/celeryconfig/defaults.py index 31d6f1ec0..f53398178 100644 --- a/storescraper/bin/celeryconfig/defaults.py +++ b/storescraper/bin/celeryconfig/defaults.py @@ -1,9 +1,10 @@ import sys -sys.path.append('../..') -broker_url = 'amqp://storescraper:storescraper@localhost/storescraper' -result_backend = 'rpc://' +sys.path.append("../..") -imports = ( - 'storescraper.store' -) +broker_url = "redis://localhost:6379/0" +result_backend = "redis://localhost:6379/0" + +imports = "storescraper.store" + +worker_hijack_root_logger = False diff --git a/storescraper/store.py b/storescraper/store.py index 7e7314998..6f8bd70e6 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -1,28 +1,30 @@ +import logging +import logstash +import time import traceback +import uuid from collections import defaultdict, OrderedDict from celery import chain, group, shared_task from celery.result import allow_join_result from celery.utils.log import get_task_logger -import uuid +from elasticsearch import Elasticsearch from .product import Product from .utils import get_store_class_by_name, chunks logger = get_task_logger(__name__) -import logging -import logstash - -import redis - -# Conectar a Redis (puede necesitar ajustar la IP y el puerto según tu configuración) -client = redis.StrictRedis(host="localhost", port=6379, db=0, decode_responses=True) - +ES = Elasticsearch( + "https://localhost:9200", + ca_certs="http_ca.crt", + http_auth=("elastic", "Mo+2hdOYBpIJi4NYG*KL"), +) +logging.getLogger("elastic_transport").setLevel(logging.WARNING) logstash_logger = logging.getLogger("python-logstash-logger") logstash_logger.setLevel(logging.INFO) -logstash_logger.addHandler(logstash.LogstashHandler("localhost", 5959)) +logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) class StoreScrapError(Exception): @@ -137,6 +139,10 @@ def discover_entries_for_categories_non_blocker( discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] extra_args = cls._extra_args_with_preflight(extra_args) process_id = 
str(uuid.uuid4()) + + print(f"Process ID: {process_id}") + print(f"Discovering URLs for: {cls.__name__}") + extra_args["process_id"] = process_id category_chunks = chunks(categories, discover_urls_concurrency) task_groups = [] @@ -148,15 +154,52 @@ def discover_entries_for_categories_non_blocker( ).set(queue="storescraper") for category in category_chunk ] - task_groups.append(group(*chunk_tasks)) chain( *task_groups, - cls.log_process_summary.si(process_id).set(queue="storescraper"), + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper"), )() + cls.fetch_es_logs(process_id) + @classmethod + def fetch_es_logs(cls, process_id): + timestamp = None + count = 0 + + while True: + query = { + "size": 100, + "query": { + "bool": { + "must": [ + {"match": {"@fields.process_id": process_id}}, + {"range": {"@timestamp": {"gt": timestamp}}}, + ] + } + }, + "sort": [{"@timestamp": {"order": "asc"}}], + } + + response = ES.search(index="logs-test", body=query) + + for hit in response["hits"]["hits"]: + if "url" in hit["_source"]["@fields"]: + count += 1 + print("*", end="", flush=True) + + if hit["_source"]["@message"] == f"{cls.__name__}: process finished": + print() + print( + f"{cls.__name__} process {process_id} finished with {count} URLs" + ) + return + + timestamp = hit["_source"]["@timestamp"] + + time.sleep(5) + @classmethod def discover_entries_for_categories( cls, @@ -361,16 +404,12 @@ def products_for_urls( ########################################################################## @staticmethod - @shared_task( - autoretry_for=(StoreScrapError,), - max_retries=5, - default_retry_delay=5, - ) - def log_process_summary(process_id): - redis_urls = client.lrange(process_id, 0, -1) + @shared_task + def finish_process(store, process_id): + time.sleep(1) logstash_logger.info( - f"Redis URLs: {process_id}", - extra={"urls": redis_urls}, + f"{store}: process finished", + extra={"process_id": process_id}, ) @staticmethod @@ -384,6 +423,10 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No logger.info("Discovering URLs") logger.info("Store: " + store.__name__) logger.info("Category: " + category) + + process_id = extra_args.get("process_id", None) + logger.info(f"Process ID: {process_id}") + try: discovered_entries = store.discover_entries_for_category( category, extra_args @@ -395,16 +438,16 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No logger.error(error_message) raise StoreScrapError(error_message) - process_id = extra_args.get("process_id", None) - for url in discovered_entries.keys(): - if process_id: - client.rpush(process_id, url) - logger.info(url) logstash_logger.info( f"{store.__name__}: Discovered URL", - extra={"process_id": process_id or None, "url": url}, + extra={ + "process_id": process_id or None, + "store": store.__name__, + "category": category, + "url": url, + }, ) return discovered_entries From a2801b1f699ca298bd1eeba7a3ddaf69478e8fc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Tue, 6 Aug 2024 18:01:26 -0400 Subject: [PATCH 03/17] add product scraper --- storescraper/store.py | 203 +++++++++++++++++++++++++++++++----------- 1 file changed, 151 insertions(+), 52 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 6f8bd70e6..c2f79afe0 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -10,8 +10,11 @@ from celery.utils.log import get_task_logger from elasticsearch import Elasticsearch +from datetime import datetime, 
timezone + from .product import Product from .utils import get_store_class_by_name, chunks +import redis logger = get_task_logger(__name__) @@ -26,6 +29,12 @@ logstash_logger.setLevel(logging.INFO) logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) +redis_client = redis.StrictRedis( + host="localhost", + port=6379, + db=0, +) + class StoreScrapError(Exception): def __init__(self, message): @@ -129,7 +138,11 @@ def products_for_keyword( @classmethod def discover_entries_for_categories_non_blocker( - cls, categories=None, extra_args=None, discover_urls_concurrency=None + cls, + categories=None, + extra_args=None, + discover_urls_concurrency=None, + scrape_products=True, ): sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency @@ -145,32 +158,69 @@ def discover_entries_for_categories_non_blocker( extra_args["process_id"] = process_id category_chunks = chunks(categories, discover_urls_concurrency) - task_groups = [] + task_group = [] for category_chunk in category_chunks: - chunk_tasks = [ - cls.discover_entries_for_category_task.si( + chunk_tasks = [] + + for category in category_chunk: + task = cls.discover_entries_for_category_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") - for category in category_chunk - ] - task_groups.append(group(*chunk_tasks)) - chain( - *task_groups, - cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper"), - )() + if scrape_products: + task = chain( + task, + cls.products_for_urls_task_non_blocker.s( + store_class_name=cls.__name__, + category=category, + extra_args=extra_args, + ).set(queue="storescraper"), + ) + + chunk_tasks.append(task) + + task_group.append(group(*chunk_tasks)) + + task_group.append( + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") + ) - cls.fetch_es_logs(process_id) + # for i in range(len(task_group) - 1): + # task_group[i].link(task_group[i + 1]) + + # task_group[0].apply_async() + + chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() + + cls.fetch_es_url_logs(process_id) + + @shared_task() + def nothing(): + pass + + def fetch_redis_data(cls, process_id): + while True: + url_count = redis_client.llen(f"{process_id}_urls") + print(f"Discovered URLs: {url_count}") + + if redis_client.exists(f"{process_id}_urls_finished"): + print(f"Process {process_id} finished: {url_count} URLs discovered") + return + + time.sleep(5) @classmethod - def fetch_es_logs(cls, process_id): + def fetch_es_url_logs(cls, process_id, include_category_counts=False): + products_count = 0 + urls_count = 0 + categories = {} + current_time = datetime.now(timezone.utc) timestamp = None - count = 0 while True: query = { - "size": 100, + "size": 1000, "query": { "bool": { "must": [ @@ -185,20 +235,35 @@ def fetch_es_logs(cls, process_id): response = ES.search(index="logs-test", body=query) for hit in response["hits"]["hits"]: - if "url" in hit["_source"]["@fields"]: - count += 1 - print("*", end="", flush=True) + if "product" in hit["_source"]["@fields"]: + products_count += 1 + elif "url" in hit["_source"]["@fields"]: + urls_count += 1 + + if include_category_counts: + category = hit["_source"]["@fields"]["category"] + categories[category] = categories.get(category, 0) + 1 if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - print() print( - f"{cls.__name__} process {process_id} finished with {count} URLs" + f"\n\n{cls.__name__} process {process_id} finished with 
{urls_count} URLs" ) + + if include_category_counts: + print("-" * 80) + + for category in categories: + print(f"{category}: {categories[category]}") + + print("\n") + return timestamp = hit["_source"]["@timestamp"] - time.sleep(5) + print(f"Discovered URLs: {urls_count} | Scraped products: {products_count}") + + time.sleep(10) @classmethod def discover_entries_for_categories( @@ -223,7 +288,7 @@ def discover_entries_for_categories( entry_positions = defaultdict(lambda: list()) url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) extra_args = cls._extra_args_with_preflight(extra_args) - + count = 0 if use_async: category_chunks = chunks(categories, discover_urls_concurrency) @@ -269,21 +334,11 @@ def discover_entries_for_categories( for url, positions in cls.discover_entries_for_category( category, extra_args ).items(): - logger.info("Discovered URL: {} ({})".format(url, category)) - - if positions: - for pos in positions: - entry_positions[url].append( - (pos["section_name"], pos["value"]) - ) - url_category_weights[url][category] += pos[ - "category_weight" - ] - else: - # Legacy for implementations without position data - url_category_weights[url][category] = 1 - entry_positions[url] = [] - + count += 1 + url_category_weights[url][category] = 1 + entry_positions[url] = [] + print(len(entry_positions), count) + exit() discovered_entries = {} for url, positions in entry_positions.items(): category, max_weight = max( @@ -298,12 +353,12 @@ def discover_entries_for_categories( # map generic sections positioning without considering their # products if they don't appear in a specifically mapped # relevant section - if max_weight: - discovered_entries[url] = { - "positions": positions, - "category": category, - "category_weight": max_weight, - } + + discovered_entries[url] = { + "positions": positions, + "category": category, + "category_weight": 1, + } return discovered_entries @@ -404,20 +459,17 @@ def products_for_urls( ########################################################################## @staticmethod - @shared_task + @shared_task() def finish_process(store, process_id): - time.sleep(1) + time.sleep(10) + redis_client.rpush(f"{process_id}_urls_finished", "finished") logstash_logger.info( f"{store}: process finished", extra={"process_id": process_id}, ) @staticmethod - @shared_task( - autoretry_for=(StoreScrapError,), - max_retries=5, - default_retry_delay=5, - ) + @shared_task() def discover_entries_for_category_task(store_class_name, category, extra_args=None): store = get_store_class_by_name(store_class_name) logger.info("Discovering URLs") @@ -456,10 +508,12 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) def products_for_url_task(store_class_name, url, category=None, extra_args=None): store = get_store_class_by_name(store_class_name) + """ logger.info("Obtaining products for URL") logger.info("Store: " + store.__name__) logger.info("Category: {}".format(category)) logger.info("URL: " + url) + """ try: raw_products = store.products_for_url(url, category, extra_args) @@ -472,8 +526,8 @@ def products_for_url_task(store_class_name, url, category=None, extra_args=None) serialized_products = [p.serialize() for p in raw_products] - for idx, product in enumerate(serialized_products): - logger.info("{} - {}".format(idx, product)) + # for idx, product in enumerate(serialized_products): + # logger.info("{} - {}".format(idx, product)) return serialized_products 
@@ -503,6 +557,51 @@ def products_for_urls_task( return serialized_result + @staticmethod + @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) + def products_for_urls_task_non_blocker( + discovered_entries, + store_class_name, + category, + products_for_url_concurrency=10, + extra_args=None, + ): + store = get_store_class_by_name(store_class_name) + # extra_args = cls._extra_args_with_preflight(extra_args) + discovery_entries_chunks = chunks(list(discovered_entries.items()), 6) + + for discovery_entries_chunk in discovery_entries_chunks: + chunk_tasks = [] + + for entry_url, _ in discovery_entries_chunk: + task = chain( + store.products_for_url_task.si( + store_class_name, entry_url, category, extra_args + ).set(queue="storescraper"), + store.log_product.s( + store_class_name=store_class_name, + category=category, + extra_args=extra_args, + ).set(queue="storescraper"), + ) + + chunk_tasks.append(task) + + tasks_group = group(chunk_tasks) + tasks_group.apply_async() + + @shared_task() + def log_product(product, store_class_name, category, extra_args): + logstash_logger.info( + f"{store_class_name}: Discovered product", + extra={ + "process_id": extra_args["process_id"], + "store": store_class_name, + "category": category, + "product": product, + }, + ) + ########################################################################## # Implementation dependant methods ########################################################################## From cb4f1a2fa61663d9dcc1f9702dec00c69e242365 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Thu, 8 Aug 2024 10:14:14 -0400 Subject: [PATCH 04/17] add dummy scraper and dummy data --- storescraper/bin/dummy-cell.json | 25 ++++++ storescraper/bin/dummy-groceries.json | 105 ++++++++++++++++++++++++ storescraper/bin/dummy-keyboard.json | 24 ++++++ storescraper/bin/dummy-microphone.json | 24 ++++++ storescraper/bin/dummy-monitor.json | 24 ++++++ storescraper/bin/dummy-motherboard.json | 24 ++++++ storescraper/bin/dummy-mouse.json | 24 ++++++ storescraper/bin/dummy-oven.json | 24 ++++++ storescraper/bin/dummy-printer.json | 24 ++++++ storescraper/bin/dummy-processor.json | 24 ++++++ storescraper/bin/dummy-ram.json | 104 +++++++++++++++++++++++ storescraper/bin/dummy-server.py | 10 +++ storescraper/bin/dummy-stove.json | 25 ++++++ storescraper/store.py | 3 +- storescraper/stores/__init__.py | 1 + storescraper/stores/dummy.py | 84 +++++++++++++++++++ 16 files changed, 547 insertions(+), 2 deletions(-) create mode 100644 storescraper/bin/dummy-cell.json create mode 100644 storescraper/bin/dummy-groceries.json create mode 100644 storescraper/bin/dummy-keyboard.json create mode 100644 storescraper/bin/dummy-microphone.json create mode 100644 storescraper/bin/dummy-monitor.json create mode 100644 storescraper/bin/dummy-motherboard.json create mode 100644 storescraper/bin/dummy-mouse.json create mode 100644 storescraper/bin/dummy-oven.json create mode 100644 storescraper/bin/dummy-printer.json create mode 100644 storescraper/bin/dummy-processor.json create mode 100644 storescraper/bin/dummy-ram.json create mode 100644 storescraper/bin/dummy-server.py create mode 100644 storescraper/bin/dummy-stove.json create mode 100644 storescraper/stores/dummy.py diff --git a/storescraper/bin/dummy-cell.json b/storescraper/bin/dummy-cell.json new file mode 100644 index 000000000..1107e4c93 --- /dev/null +++ b/storescraper/bin/dummy-cell.json @@ -0,0 +1,25 @@ +{ + "urls": [ + "https://dummy.com/producto-A1B2C3D", + 
"https://dummy.com/producto-4E5F6G7", + "https://dummy.com/producto-H8I9J0K", + "https://dummy.com/producto-L1M2N3O", + "https://dummy.com/producto-P4Q5R6S", + "https://dummy.com/producto-T7U8V9W", + "https://dummy.com/producto-X0Y1Z2A", + "https://dummy.com/producto-B3C4D5E", + "https://dummy.com/producto-F6G7H8I", + "https://dummy.com/producto-J9K0L1M", + "https://dummy.com/producto-N2O3P4Q", + "https://dummy.com/producto-R5S6T7U", + "https://dummy.com/producto-V8W9X0Y", + "https://dummy.com/producto-Z1A2B3C", + "https://dummy.com/producto-D4E5F6G", + "https://dummy.com/producto-H7I8J9K", + "https://dummy.com/producto-L0M1N2O", + "https://dummy.com/producto-P3Q4R5S", + "https://dummy.com/producto-T6U7V8W", + "https://dummy.com/producto-X9Y0Z1A" + ] + } + \ No newline at end of file diff --git a/storescraper/bin/dummy-groceries.json b/storescraper/bin/dummy-groceries.json new file mode 100644 index 000000000..3a1584305 --- /dev/null +++ b/storescraper/bin/dummy-groceries.json @@ -0,0 +1,105 @@ +{ + "urls": [ + "https://dummy.com/producto-abcde01", + "https://dummy.com/producto-fghij23", + "https://dummy.com/producto-klmno45", + "https://dummy.com/producto-pqrst67", + "https://dummy.com/producto-uvwxy89", + "https://dummy.com/producto-zabcd12", + "https://dummy.com/producto-efghi34", + "https://dummy.com/producto-jklmn56", + "https://dummy.com/producto-opqrs78", + "https://dummy.com/producto-tuvwx90", + "https://dummy.com/producto-yzabc23", + "https://dummy.com/producto-defgh45", + "https://dummy.com/producto-ijklm67", + "https://dummy.com/producto-nopqr89", + "https://dummy.com/producto-stuvw01", + "https://dummy.com/producto-xyzab23", + "https://dummy.com/producto-cdefg45", + "https://dummy.com/producto-hijkl67", + "https://dummy.com/producto-mnopq89", + "https://dummy.com/producto-rstuv01", + "https://dummy.com/producto-wxyza23", + "https://dummy.com/producto-bcdef45", + "https://dummy.com/producto-ghijk67", + "https://dummy.com/producto-lmnop89", + "https://dummy.com/producto-opqrs01", + "https://dummy.com/producto-tuvwx23", + "https://dummy.com/producto-yzabc45", + "https://dummy.com/producto-defgh67", + "https://dummy.com/producto-ijklm89", + "https://dummy.com/producto-nopqr01", + "https://dummy.com/producto-stuvw23", + "https://dummy.com/producto-xyzab45", + "https://dummy.com/producto-cdefg67", + "https://dummy.com/producto-hijkl89", + "https://dummy.com/producto-mnopq01", + "https://dummy.com/producto-rstuv23", + "https://dummy.com/producto-wxyza45", + "https://dummy.com/producto-bcdef67", + "https://dummy.com/producto-ghijk89", + "https://dummy.com/producto-lmnop01", + "https://dummy.com/producto-opqrs23", + "https://dummy.com/producto-tuvwx45", + "https://dummy.com/producto-yzabc67", + "https://dummy.com/producto-defgh89", + "https://dummy.com/producto-ijklm01", + "https://dummy.com/producto-nopqr23", + "https://dummy.com/producto-stuvw45", + "https://dummy.com/producto-xyzab67", + "https://dummy.com/producto-cdefg89", + "https://dummy.com/producto-hijkl01", + "https://dummy.com/producto-mnopq23", + "https://dummy.com/producto-rstuv45", + "https://dummy.com/producto-wxyza67", + "https://dummy.com/producto-bcdef89", + "https://dummy.com/producto-ghijk01", + "https://dummy.com/producto-lmnop23", + "https://dummy.com/producto-opqrs45", + "https://dummy.com/producto-tuvwx67", + "https://dummy.com/producto-yzabc89", + "https://dummy.com/producto-defgh01", + "https://dummy.com/producto-ijklm23", + "https://dummy.com/producto-nopqr45", + "https://dummy.com/producto-stuvw67", + 
"https://dummy.com/producto-xyzab89", + "https://dummy.com/producto-cdefg01", + "https://dummy.com/producto-hijkl23", + "https://dummy.com/producto-mnopq45", + "https://dummy.com/producto-rstuv67", + "https://dummy.com/producto-wxyza89", + "https://dummy.com/producto-bcdef01", + "https://dummy.com/producto-ghijk23", + "https://dummy.com/producto-lmnop45", + "https://dummy.com/producto-opqrs67", + "https://dummy.com/producto-tuvwx89", + "https://dummy.com/producto-yzabc01", + "https://dummy.com/producto-defgh23", + "https://dummy.com/producto-ijklm45", + "https://dummy.com/producto-nopqr67", + "https://dummy.com/producto-stuvw89", + "https://dummy.com/producto-xyzab01", + "https://dummy.com/producto-abcde01u", + "https://dummy.com/producto-fghij23u", + "https://dummy.com/producto-klmno45u", + "https://dummy.com/producto-pqrst67u", + "https://dummy.com/producto-uvwxy89u", + "https://dummy.com/producto-zabcd12u", + "https://dummy.com/producto-efghi34u", + "https://dummy.com/producto-jklmn56u", + "https://dummy.com/producto-opqrs78u", + "https://dummy.com/producto-tuvwx90u", + "https://dummy.com/producto-yzabc23u", + "https://dummy.com/producto-defgh45u", + "https://dummy.com/producto-ijklm67u", + "https://dummy.com/producto-nopqr89u", + "https://dummy.com/producto-stuvw01u", + "https://dummy.com/producto-xyzab23u", + "https://dummy.com/producto-cdefg45u", + "https://dummy.com/producto-hijkl67u", + "https://dummy.com/producto-mnopq89u", + "https://dummy.com/producto-rstuv01u" + ] + } + \ No newline at end of file diff --git a/storescraper/bin/dummy-keyboard.json b/storescraper/bin/dummy-keyboard.json new file mode 100644 index 000000000..517411107 --- /dev/null +++ b/storescraper/bin/dummy-keyboard.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-1234567", + "https://dummy.com/producto-2345678", + "https://dummy.com/producto-3456789", + "https://dummy.com/producto-4567890", + "https://dummy.com/producto-5678901", + "https://dummy.com/producto-6789012", + "https://dummy.com/producto-7890123", + "https://dummy.com/producto-8901234", + "https://dummy.com/producto-9012345", + "https://dummy.com/producto-1123456", + "https://dummy.com/producto-2234567", + "https://dummy.com/producto-3345678", + "https://dummy.com/producto-4456789", + "https://dummy.com/producto-5567890", + "https://dummy.com/producto-6678901", + "https://dummy.com/producto-7789012", + "https://dummy.com/producto-8890123", + "https://dummy.com/producto-9901234", + "https://dummy.com/producto-1012345", + "https://dummy.com/producto-2123456" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-microphone.json b/storescraper/bin/dummy-microphone.json new file mode 100644 index 000000000..a3cde8291 --- /dev/null +++ b/storescraper/bin/dummy-microphone.json @@ -0,0 +1,24 @@ +{ + "urls": [ +"https://dummy.com/producto-12345678", + "https://dummy.com/producto-23456789", + "https://dummy.com/producto-34567890", + "https://dummy.com/producto-45678901", + "https://dummy.com/producto-56789012", + "https://dummy.com/producto-67890123", + "https://dummy.com/producto-78901234", + "https://dummy.com/producto-89012345", + "https://dummy.com/producto-90123456", + "https://dummy.com/producto-11234567", + "https://dummy.com/producto-22345678", + "https://dummy.com/producto-33456789", + "https://dummy.com/producto-44567890", + "https://dummy.com/producto-55678901", + "https://dummy.com/producto-66789012", + "https://dummy.com/producto-77890123", + "https://dummy.com/producto-88901234", + 
"https://dummy.com/producto-99012345", + "https://dummy.com/producto-10123456", + "https://dummy.com/producto-21234567" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-monitor.json b/storescraper/bin/dummy-monitor.json new file mode 100644 index 000000000..0eed11ff8 --- /dev/null +++ b/storescraper/bin/dummy-monitor.json @@ -0,0 +1,24 @@ +{ + "urls": [ +"https://dummy.com/producto-123456789", + "https://dummy.com/producto-234567890", + "https://dummy.com/producto-345678901", + "https://dummy.com/producto-456789012", + "https://dummy.com/producto-567890123", + "https://dummy.com/producto-678901234", + "https://dummy.com/producto-789012345", + "https://dummy.com/producto-890123456", + "https://dummy.com/producto-901234567", + "https://dummy.com/producto-112345678", + "https://dummy.com/producto-223456789", + "https://dummy.com/producto-334567890", + "https://dummy.com/producto-445678901", + "https://dummy.com/producto-556789012", + "https://dummy.com/producto-667890123", + "https://dummy.com/producto-778901234", + "https://dummy.com/producto-889012345", + "https://dummy.com/producto-990123456", + "https://dummy.com/producto-101234567", + "https://dummy.com/producto-212345678" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-motherboard.json b/storescraper/bin/dummy-motherboard.json new file mode 100644 index 000000000..a09ea4a56 --- /dev/null +++ b/storescraper/bin/dummy-motherboard.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-19435", + "https://dummy.com/producto-42761", + "https://dummy.com/producto-68593", + "https://dummy.com/producto-78432", + "https://dummy.com/producto-34829", + "https://dummy.com/producto-59614", + "https://dummy.com/producto-12357", + "https://dummy.com/producto-47189", + "https://dummy.com/producto-23974", + "https://dummy.com/producto-65832", + "https://dummy.com/producto-73928", + "https://dummy.com/producto-80147", + "https://dummy.com/producto-96725", + "https://dummy.com/producto-14295", + "https://dummy.com/producto-27394", + "https://dummy.com/producto-48052", + "https://dummy.com/producto-61923", + "https://dummy.com/producto-53789", + "https://dummy.com/producto-81247", + "https://dummy.com/producto-96254" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-mouse.json b/storescraper/bin/dummy-mouse.json new file mode 100644 index 000000000..1c752417a --- /dev/null +++ b/storescraper/bin/dummy-mouse.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-5297", + "https://dummy.com/producto-6841", + "https://dummy.com/producto-3528", + "https://dummy.com/producto-7943", + "https://dummy.com/producto-4182", + "https://dummy.com/producto-6573", + "https://dummy.com/producto-9124", + "https://dummy.com/producto-3608", + "https://dummy.com/producto-7485", + "https://dummy.com/producto-5362", + "https://dummy.com/producto-6719", + "https://dummy.com/producto-8234", + "https://dummy.com/producto-5123", + "https://dummy.com/producto-4058", + "https://dummy.com/producto-7391", + "https://dummy.com/producto-6247", + "https://dummy.com/producto-8153", + "https://dummy.com/producto-4826", + "https://dummy.com/producto-5704", + "https://dummy.com/producto-2398" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-oven.json b/storescraper/bin/dummy-oven.json new file mode 100644 index 000000000..52100a8db --- /dev/null +++ b/storescraper/bin/dummy-oven.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-8423", + 
"https://dummy.com/producto-5831", + "https://dummy.com/producto-4267", + "https://dummy.com/producto-7154", + "https://dummy.com/producto-3942", + "https://dummy.com/producto-6589", + "https://dummy.com/producto-7325", + "https://dummy.com/producto-8196", + "https://dummy.com/producto-5412", + "https://dummy.com/producto-6743", + "https://dummy.com/producto-2587", + "https://dummy.com/producto-9134", + "https://dummy.com/producto-3876", + "https://dummy.com/producto-4719", + "https://dummy.com/producto-6842", + "https://dummy.com/producto-5237", + "https://dummy.com/producto-7394", + "https://dummy.com/producto-8916", + "https://dummy.com/producto-4672", + "https://dummy.com/producto-3185" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-printer.json b/storescraper/bin/dummy-printer.json new file mode 100644 index 000000000..9928e51d2 --- /dev/null +++ b/storescraper/bin/dummy-printer.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-7423", + "https://dummy.com/producto-5198", + "https://dummy.com/producto-6347", + "https://dummy.com/producto-2815", + "https://dummy.com/producto-9584", + "https://dummy.com/producto-3721", + "https://dummy.com/producto-8094", + "https://dummy.com/producto-1538", + "https://dummy.com/producto-6792", + "https://dummy.com/producto-8465", + "https://dummy.com/producto-4937", + "https://dummy.com/producto-2764", + "https://dummy.com/producto-9051", + "https://dummy.com/producto-3816", + "https://dummy.com/producto-7109", + "https://dummy.com/producto-5397", + "https://dummy.com/producto-6283", + "https://dummy.com/producto-8142", + "https://dummy.com/producto-4759", + "https://dummy.com/producto-3628" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-processor.json b/storescraper/bin/dummy-processor.json new file mode 100644 index 000000000..5fcf5dee5 --- /dev/null +++ b/storescraper/bin/dummy-processor.json @@ -0,0 +1,24 @@ +{ + "urls": [ + "https://dummy.com/producto-49372", + "https://dummy.com/producto-28419", + "https://dummy.com/producto-73645", + "https://dummy.com/producto-91284", + "https://dummy.com/producto-35892", + "https://dummy.com/producto-74028", + "https://dummy.com/producto-19637", + "https://dummy.com/producto-28495", + "https://dummy.com/producto-57362", + "https://dummy.com/producto-94871", + "https://dummy.com/producto-26184", + "https://dummy.com/producto-71945", + "https://dummy.com/producto-48632", + "https://dummy.com/producto-35271", + "https://dummy.com/producto-96842", + "https://dummy.com/producto-59173", + "https://dummy.com/producto-28467", + "https://dummy.com/producto-74219", + "https://dummy.com/producto-91738", + "https://dummy.com/producto-63485" + ] + } \ No newline at end of file diff --git a/storescraper/bin/dummy-ram.json b/storescraper/bin/dummy-ram.json new file mode 100644 index 000000000..686c51aab --- /dev/null +++ b/storescraper/bin/dummy-ram.json @@ -0,0 +1,104 @@ +{ + "urls": [ + "https://dummy.com/producto-58234", + "https://dummy.com/producto-71928", + "https://dummy.com/producto-49572", + "https://dummy.com/producto-83619", + "https://dummy.com/producto-27385", + "https://dummy.com/producto-16493", + "https://dummy.com/producto-72948", + "https://dummy.com/producto-58293", + "https://dummy.com/producto-41827", + "https://dummy.com/producto-73915", + "https://dummy.com/producto-64927", + "https://dummy.com/producto-81532", + "https://dummy.com/producto-49678", + "https://dummy.com/producto-23754", + "https://dummy.com/producto-58413", + 
"https://dummy.com/producto-71934", + "https://dummy.com/producto-39521", + "https://dummy.com/producto-68249", + "https://dummy.com/producto-47385", + "https://dummy.com/producto-81932", + "https://dummy.com/producto-abc123178t", + "https://dummy.com/producto-def456178t", + "https://dummy.com/producto-ghi789178t", + "https://dummy.com/producto-jkl012178t", + "https://dummy.com/producto-mno345178t", + "https://dummy.com/producto-pqr678178t", + "https://dummy.com/producto-stu901178t", + "https://dummy.com/producto-vwx234178t", + "https://dummy.com/producto-yza567178t", + "https://dummy.com/producto-bcd890178t", + "https://dummy.com/producto-efg123178t", + "https://dummy.com/producto-hij456178t", + "https://dummy.com/producto-klm789178t", + "https://dummy.com/producto-nop012178t", + "https://dummy.com/producto-qrst345178t", + "https://dummy.com/producto-uvw678178t", + "https://dummy.com/producto-xyz901178t", + "https://dummy.com/producto-abc234178t", + "https://dummy.com/producto-def567178t", + "https://dummy.com/producto-ghi890178t", + "https://dummy.com/producto-jkl123178t", + "https://dummy.com/producto-mno456178t", + "https://dummy.com/producto-pqr789178t", + "https://dummy.com/producto-stu012178t", + "https://dummy.com/producto-vwx345178t", + "https://dummy.com/producto-yza678178t", + "https://dummy.com/producto-bcd901178t", + "https://dummy.com/producto-efg234178t", + "https://dummy.com/producto-hij567178t", + "https://dummy.com/producto-klm890178t", + "https://dummy.com/producto-nop123178t", + "https://dummy.com/producto-qrst456178t", + "https://dummy.com/producto-uvw789178t", + "https://dummy.com/producto-xyz012178t", + "https://dummy.com/producto-abc345178t", + "https://dummy.com/producto-def678178t", + "https://dummy.com/producto-ghi901178t", + "https://dummy.com/producto-jkl234178t", + "https://dummy.com/producto-mno567178t", + "https://dummy.com/producto-pqr890178t", + "https://dummy.com/producto-stu123178t", + "https://dummy.com/producto-vwx456178t", + "https://dummy.com/producto-yza789178t", + "https://dummy.com/producto-bcd012178t", + "https://dummy.com/producto-efg345178t", + "https://dummy.com/producto-hij678178t", + "https://dummy.com/producto-klm901178t", + "https://dummy.com/producto-nop234178t", + "https://dummy.com/producto-qrst567178t", + "https://dummy.com/producto-uvw890178t", + "https://dummy.com/producto-xyz123178t", + "https://dummy.com/producto-abc456178t", + "https://dummy.com/producto-def789178t", + "https://dummy.com/producto-ghi012178t", + "https://dummy.com/producto-jkl345178t", + "https://dummy.com/producto-mno678178t", + "https://dummy.com/producto-pqr901178t", + "https://dummy.com/producto-stu234178t", + "https://dummy.com/producto-vwx567178t", + "https://dummy.com/producto-yza890178t", + "https://dummy.com/producto-bcd123178t", + "https://dummy.com/producto-efg456178t", + "https://dummy.com/producto-hij789178t", + "https://dummy.com/producto-klm012178t", + "https://dummy.com/producto-nop345178t", + "https://dummy.com/producto-qrst678178t", + "https://dummy.com/producto-uvw901178t", + "https://dummy.com/producto-xyz234178t", + "https://dummy.com/producto-abc567178t", + "https://dummy.com/producto-def890178t", + "https://dummy.com/producto-ghi123178t", + "https://dummy.com/producto-jkl456178t", + "https://dummy.com/producto-mno789178t", + "https://dummy.com/producto-pqr012178t", + "https://dummy.com/producto-stu345178t", + "https://dummy.com/producto-vwx678178t", + "https://dummy.com/producto-yza901178t", + "https://dummy.com/producto-bcd234178t", + 
"https://dummy.com/producto-efg567178t", + "https://dummy.com/producto-hij890178t" + ] +} \ No newline at end of file diff --git a/storescraper/bin/dummy-server.py b/storescraper/bin/dummy-server.py new file mode 100644 index 000000000..26041dd9b --- /dev/null +++ b/storescraper/bin/dummy-server.py @@ -0,0 +1,10 @@ +import http.server +import socketserver + +PORT = 9871 + +Handler = http.server.SimpleHTTPRequestHandler + +with socketserver.TCPServer(("", PORT), Handler) as httpd: + print(f"Serving at port {PORT}") + httpd.serve_forever() diff --git a/storescraper/bin/dummy-stove.json b/storescraper/bin/dummy-stove.json new file mode 100644 index 000000000..7ad99a135 --- /dev/null +++ b/storescraper/bin/dummy-stove.json @@ -0,0 +1,25 @@ +{ + "urls": [ + "https://dummy.com/producto-abcdeq", + "https://dummy.com/producto-fghijk", + "https://dummy.com/producto-lmnopq", + "https://dummy.com/producto-rstuvw", + "https://dummy.com/producto-xyzabc", + "https://dummy.com/producto-defghi", + "https://dummy.com/producto-jklmno", + "https://dummy.com/producto-pqrstx", + "https://dummy.com/producto-uvwxyz", + "https://dummy.com/producto-abcdef", + "https://dummy.com/producto-ghijkl", + "https://dummy.com/producto-mnopqr", + "https://dummy.com/producto-stuvwx", + "https://dummy.com/producto-yzabcd", + "https://dummy.com/producto-efghij", + "https://dummy.com/producto-klmnop", + "https://dummy.com/producto-qrstuv", + "https://dummy.com/producto-wxyzab", + "https://dummy.com/producto-cdefgh", + "https://dummy.com/producto-ijklmn" + ] + } + \ No newline at end of file diff --git a/storescraper/store.py b/storescraper/store.py index c2f79afe0..9496c5a80 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -337,8 +337,7 @@ def discover_entries_for_categories( count += 1 url_category_weights[url][category] = 1 entry_positions[url] = [] - print(len(entry_positions), count) - exit() + discovered_entries = {} for url, positions in entry_positions.items(): category, max_weight = max( diff --git a/storescraper/stores/__init__.py b/storescraper/stores/__init__.py index b363348b9..862860c04 100644 --- a/storescraper/stores/__init__.py +++ b/storescraper/stores/__init__.py @@ -66,6 +66,7 @@ from .divino import Divino from .dl_phone import DLPhone from .dreamtec import Dreamtec +from .dummy import Dummy from .dust2 import Dust2 from .e_forest import EForest from .e_vision import EVision diff --git a/storescraper/stores/dummy.py b/storescraper/stores/dummy.py new file mode 100644 index 000000000..53810950a --- /dev/null +++ b/storescraper/stores/dummy.py @@ -0,0 +1,84 @@ +import json +import requests +from decimal import Decimal +from storescraper.categories import ( + PRINTER, + OVEN, + MICROPHONE, + MONITOR, + MOTHERBOARD, + MOUSE, + KEYBOARD, + PROCESSOR, + CELL, + STOVE, + GROCERIES, + RAM, +) +from storescraper.product import Product +from storescraper.store_with_url_extensions import StoreWithUrlExtensions +from storescraper.utils import session_with_proxy +import time + + +class Dummy(StoreWithUrlExtensions): + url_extensions = [ + ["dummy-printer", PRINTER], + ["dummy-oven", OVEN], + ["dummy-microphone", MICROPHONE], + ["dummy-monitor", MONITOR], + ["dummy-motherboard", MOTHERBOARD], + ["dummy-mouse", MOUSE], + ["dummy-keyboard", KEYBOARD], + ["dummy-processor", PROCESSOR], + ["dummy-cell", CELL], + ["dummy-stove", STOVE], + ["dummy-groceries", GROCERIES], + ["dummy-ram", RAM], + ] + + @classmethod + def discover_urls_for_url_extension(cls, url_extension, extra_args): + print(url_extension) + 
time.sleep(1) + data = json.loads( + requests.get(f"http://localhost:9871/{url_extension}.json").text + ) + + product_urls = [] + print(len(set(data["urls"]))) + if len(set(data["urls"])) != len(data["urls"]): + exit() + + for url in data["urls"]: + product_urls.append(url) + + return product_urls + + @classmethod + def products_for_url(cls, url, category=None, extra_args=None): + print(url) + time.sleep(1) + + data = json.loads( + requests.get(f"http://localhost:9871/dummy-{category.lower()}.json").text + ) + + for link in data["urls"]: + if url == link: + return [ + Product( + f"Dummy {category} {url}", + cls.__name__, + category, + url, + url, + "123", + -1, + Decimal("100000"), + Decimal("90000"), + "CLP", + ) + ] + + return [] From 8a738bc317bd7342897667995a45e79f28e7689c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Thu, 8 Aug 2024 15:56:29 -0400 Subject: [PATCH 05/17] fix ES query --- storescraper/store.py | 55 ++++++++++++++++++++++-------------- storescraper/stores/dummy.py | 5 ++-- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 9496c5a80..6611ee4d7 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -142,7 +142,7 @@ def discover_entries_for_categories_non_blocker( categories=None, extra_args=None, discover_urls_concurrency=None, - scrape_products=True, + scrape_products=False, ): sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency @@ -193,7 +193,7 @@ def discover_entries_for_categories_non_blocker( chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() - cls.fetch_es_url_logs(process_id) + cls.fetch_es_url_logs(process_id, count_by_category=True) @shared_task() def nothing(): @@ -211,26 +211,31 @@ def fetch_redis_data(cls, process_id): time.sleep(5) @classmethod - def fetch_es_url_logs(cls, process_id, include_category_counts=False): + def fetch_es_url_logs(cls, process_id, count_by_category=False): products_count = 0 urls_count = 0 + previous_products_count = -1 + previous_urls_count = -1 categories = {} - current_time = datetime.now(timezone.utc) - timestamp = None + timestamp = datetime.now(timezone.utc).isoformat() + query = { + "size": 1000, + "query": { + "bool": { + "must": [ + {"term": {"@fields.process_id.keyword": None}}, + {"range": {"@timestamp": {"gt": None}}}, + ] + } + }, + "sort": [{"@timestamp": {"order": "asc"}}], + } while True: - query = { - "size": 1000, - "query": { - "bool": { - "must": [ - {"match": {"@fields.process_id": process_id}}, - {"range": {"@timestamp": {"gt": timestamp}}}, - ] - } - }, - "sort": [{"@timestamp": {"order": "asc"}}], - } + query["query"]["bool"]["must"][0]["term"][ + "@fields.process_id.keyword" + ] = process_id + query["query"]["bool"]["must"][1]["range"]["@timestamp"]["gt"] = timestamp response = ES.search(index="logs-test", body=query) @@ -240,7 +245,7 @@ def fetch_es_url_logs(cls, process_id, include_category_counts=False): elif "url" in hit["_source"]["@fields"]: urls_count += 1 - if include_category_counts: + if count_by_category: category = hit["_source"]["@fields"]["category"] categories[category] = categories.get(category, 0) + 1 @@ -249,7 +254,7 @@ def fetch_es_url_logs(cls, process_id, include_category_counts=False): f"\n\n{cls.__name__} process {process_id} finished with {urls_count} URLs" ) - if include_category_counts: + if count_by_category: print("-" * 80) for category in categories: @@ -261,9 +266,17 @@ def 
fetch_es_url_logs(cls, process_id, include_category_counts=False): timestamp = hit["_source"]["@timestamp"] - print(f"Discovered URLs: {urls_count} | Scraped products: {products_count}") + if ( + urls_count != previous_urls_count + or products_count != previous_products_count + ): + print( + f"Discovered URLs: {urls_count} | Scraped products: {products_count}" + ) + previous_urls_count = urls_count + previous_products_count = products_count - time.sleep(10) + time.sleep(3) @classmethod def discover_entries_for_categories( diff --git a/storescraper/stores/dummy.py b/storescraper/stores/dummy.py index 53810950a..ea4f03dc9 100644 --- a/storescraper/stores/dummy.py +++ b/storescraper/stores/dummy.py @@ -41,12 +41,12 @@ class Dummy(StoreWithUrlExtensions): def discover_urls_for_url_extension(cls, url_extension, extra_args): print(url_extension) time.sleep(1) + data = json.loads( requests.get(f"http://localhost:9871/{url_extension}.json").text ) - product_urls = [] - print(len(set(data["urls"]))) + if len(set(data["urls"])) != len(data["urls"]): exit() @@ -58,7 +58,6 @@ def discover_urls_for_url_extension(cls, url_extension, extra_args): @classmethod def products_for_url(cls, url, category=None, extra_args=None): print(url) - time.sleep(1) data = json.loads( requests.get(f"http://localhost:9871/dummy-{category.lower()}.json").text From 0b3541eb536a35507221e018089c89740dca7920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Thu, 8 Aug 2024 17:21:13 -0400 Subject: [PATCH 06/17] add scrape_products arg and product count summary --- ...over_entries_for_categories_non_blocker.py | 12 +- storescraper/store.py | 115 +++++++++++------- storescraper/stores/dummy.py | 5 +- 3 files changed, 85 insertions(+), 47 deletions(-) diff --git a/storescraper/bin/discover_entries_for_categories_non_blocker.py b/storescraper/bin/discover_entries_for_categories_non_blocker.py index 924aae6a0..42f0d22c6 100644 --- a/storescraper/bin/discover_entries_for_categories_non_blocker.py +++ b/storescraper/bin/discover_entries_for_categories_non_blocker.py @@ -19,6 +19,14 @@ def main(): parser.add_argument( "--categories", type=str, nargs="*", help="Specific categories to be parsed" ) + parser.add_argument( + "--scrape_products", + type=bool, + nargs="?", + default=False, + const=True, + help="Include product scraping", + ) parser.add_argument( "--extra_args", type=json.loads, @@ -32,7 +40,9 @@ def main(): store = get_store_class_by_name(args.store) store.discover_entries_for_categories_non_blocker( - categories=args.categories, extra_args=args.extra_args + categories=args.categories, + scrape_products=args.scrape_products, + extra_args=args.extra_args, ) diff --git a/storescraper/store.py b/storescraper/store.py index 6611ee4d7..c4be7cbd9 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -45,6 +45,7 @@ class Store: preferred_discover_urls_concurrency = 3 preferred_products_for_url_concurrency = 10 prefer_async = True + scrape_products = True ########################################################################## # API methods @@ -142,12 +143,15 @@ def discover_entries_for_categories_non_blocker( categories=None, extra_args=None, discover_urls_concurrency=None, - scrape_products=False, + scrape_products=None, ): sanitized_parameters = cls.sanitize_parameters( - categories=categories, discover_urls_concurrency=discover_urls_concurrency + categories=categories, + discover_urls_concurrency=discover_urls_concurrency, + scrape_products=scrape_products, ) + scrape_products = 
sanitized_parameters["scrape_products"] categories = sanitized_parameters["categories"] discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] extra_args = cls._extra_args_with_preflight(extra_args) @@ -186,38 +190,29 @@ def discover_entries_for_categories_non_blocker( cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") ) - # for i in range(len(task_group) - 1): - # task_group[i].link(task_group[i + 1]) + for i in range(len(task_group) - 1): + task_group[i].link(task_group[i + 1]) - # task_group[0].apply_async() + task_group[0].apply_async() - chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() + # chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() - cls.fetch_es_url_logs(process_id, count_by_category=True) + cls.fetch_es_url_logs(process_id, scrape_products=scrape_products) @shared_task() def nothing(): pass - def fetch_redis_data(cls, process_id): - while True: - url_count = redis_client.llen(f"{process_id}_urls") - print(f"Discovered URLs: {url_count}") - - if redis_client.exists(f"{process_id}_urls_finished"): - print(f"Process {process_id} finished: {url_count} URLs discovered") - return - - time.sleep(5) - @classmethod - def fetch_es_url_logs(cls, process_id, count_by_category=False): - products_count = 0 - urls_count = 0 - previous_products_count = -1 - previous_urls_count = -1 + def fetch_es_url_logs( + cls, process_id, scrape_products=False, count_by_category=False + ): + with_errors = availables = unavailables = products_count = urls_count = 0 + previous_products_count = previous_urls_count = -1 categories = {} timestamp = datetime.now(timezone.utc).isoformat() + finished_flag = False + query = { "size": 1000, "query": { @@ -240,41 +235,69 @@ def fetch_es_url_logs(cls, process_id, count_by_category=False): response = ES.search(index="logs-test", body=query) for hit in response["hits"]["hits"]: - if "product" in hit["_source"]["@fields"]: + source_fields = hit["_source"]["@fields"] + + if scrape_products and "product" in source_fields: products_count += 1 - elif "url" in hit["_source"]["@fields"]: + + if not source_fields["product"]: + with_errors += 1 + else: + for product in source_fields["product"]: + if product["stock"] == 0: + unavailables += 1 + else: + availables += 1 + + elif "url" in source_fields: urls_count += 1 if count_by_category: - category = hit["_source"]["@fields"]["category"] + category = source_fields["category"] categories[category] = categories.get(category, 0) + 1 if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - print( - f"\n\n{cls.__name__} process {process_id} finished with {urls_count} URLs" - ) + finished_flag = True - if count_by_category: - print("-" * 80) + timestamp = hit["_source"]["@timestamp"] - for category in categories: - print(f"{category}: {categories[category]}") + if ( + finished_flag + and previous_urls_count == urls_count + and (not scrape_products or previous_products_count == products_count) + ): + print( + f"\n{cls.__name__} process {process_id} finished with {urls_count} URLs" + ) - print("\n") + if count_by_category: + print("-" * 80) + for category, count in categories.items(): + print(f"{category}: {count}") + print("\n") - return + if scrape_products: + print(f"\nAvailable: {availables}") + print(f"Unavailable: {unavailables}") + print(f"With error: {with_errors}") + print(f"Total: {products_count}") - timestamp = hit["_source"]["@timestamp"] + return - if ( - urls_count != previous_urls_count - or products_count 
!= previous_products_count + if urls_count != previous_urls_count or ( + scrape_products and products_count != previous_products_count ): print( - f"Discovered URLs: {urls_count} | Scraped products: {products_count}" + f"Discovered URLs: {urls_count}" + + ( + f" | Scraped products: {products_count}" + if scrape_products + else "" + ) ) - previous_urls_count = urls_count - previous_products_count = products_count + + previous_urls_count = urls_count + previous_products_count = products_count time.sleep(3) @@ -474,7 +497,6 @@ def products_for_urls( @shared_task() def finish_process(store, process_id): time.sleep(10) - redis_client.rpush(f"{process_id}_urls_finished", "finished") logstash_logger.info( f"{store}: process finished", extra={"process_id": process_id}, @@ -520,12 +542,10 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) def products_for_url_task(store_class_name, url, category=None, extra_args=None): store = get_store_class_by_name(store_class_name) - """ logger.info("Obtaining products for URL") logger.info("Store: " + store.__name__) logger.info("Category: {}".format(category)) logger.info("URL: " + url) - """ try: raw_products = store.products_for_url(url, category, extra_args) @@ -678,6 +698,7 @@ def sanitize_parameters( discover_urls_concurrency=None, products_for_url_concurrency=None, use_async=None, + scrape_products=None, ): if categories is None: categories = cls.categories() @@ -695,11 +716,15 @@ def sanitize_parameters( if use_async is None: use_async = cls.prefer_async + if scrape_products is None: + scrape_products = cls.scrape_products + return { "categories": categories, "discover_urls_concurrency": discover_urls_concurrency, "products_for_url_concurrency": products_for_url_concurrency, "use_async": use_async, + "scrape_products": scrape_products, } ###################################################################### diff --git a/storescraper/stores/dummy.py b/storescraper/stores/dummy.py index ea4f03dc9..10c461f7b 100644 --- a/storescraper/stores/dummy.py +++ b/storescraper/stores/dummy.py @@ -59,6 +59,9 @@ def discover_urls_for_url_extension(cls, url_extension, extra_args): def products_for_url(cls, url, category=None, extra_args=None): print(url) + if category == MOUSE: + return [] + data = json.loads( requests.get(f"http://localhost:9871/dummy-{category.lower()}.json").text ) @@ -73,7 +76,7 @@ def products_for_url(cls, url, category=None, extra_args=None): url, url, "123", - -1, + 0 if category == MOTHERBOARD else -1, Decimal("100000"), Decimal("90000"), "CLP", From bb8102bf6f7d0f33a792a36c8e320a24f10b89d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Fri, 9 Aug 2024 17:45:01 -0400 Subject: [PATCH 07/17] update --- storescraper/store.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index c4be7cbd9..8c537e836 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -187,15 +187,16 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) task_group.append( - cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper2") ) - for i in range(len(task_group) - 1): + # link failing with high workers concurrency + """for i in range(len(task_group) - 1): task_group[i].link(task_group[i + 
1]) - task_group[0].apply_async() + task_group[0].apply_async()""" - # chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() + chain(cls.nothing.si().set(queue="storescraper2"), *task_group).apply_async() cls.fetch_es_url_logs(process_id, scrape_products=scrape_products) @@ -496,7 +497,7 @@ def products_for_urls( @staticmethod @shared_task() def finish_process(store, process_id): - time.sleep(10) + redis_client.delete(f"{process_id}_scraped_urls") logstash_logger.info( f"{store}: process finished", extra={"process_id": process_id}, @@ -525,16 +526,18 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No raise StoreScrapError(error_message) for url in discovered_entries.keys(): - logger.info(url) - logstash_logger.info( - f"{store.__name__}: Discovered URL", - extra={ - "process_id": process_id or None, - "store": store.__name__, - "category": category, - "url": url, - }, - ) + if not redis_client.sismember(f"{process_id}_scraped_urls", url): + redis_client.sadd(f"{process_id}_scraped_urls", url) + logger.info(url) + logstash_logger.info( + f"{store.__name__}: Discovered URL", + extra={ + "process_id": process_id or None, + "store": store.__name__, + "category": category, + "url": url, + }, + ) return discovered_entries @@ -614,7 +617,7 @@ def products_for_urls_task_non_blocker( store_class_name=store_class_name, category=category, extra_args=extra_args, - ).set(queue="storescraper"), + ).set(queue="storescraper2"), ) chunk_tasks.append(task) From 3460851fbf0f689f144573f537295b3495c35623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Mon, 12 Aug 2024 16:28:16 -0400 Subject: [PATCH 08/17] add positions and weight to logs --- storescraper/store.py | 122 ++++++++++++++++++++++++++++++++---------- 1 file changed, 94 insertions(+), 28 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 8c537e836..3095f5a64 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -1,3 +1,4 @@ +import json import logging import logstash import time @@ -168,7 +169,7 @@ def discover_entries_for_categories_non_blocker( chunk_tasks = [] for category in category_chunk: - task = cls.discover_entries_for_category_task.si( + task = cls.discover_entries_for_category_non_blocker_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") @@ -187,17 +188,16 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) task_group.append( - cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper2") + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") ) # link failing with high workers concurrency - """for i in range(len(task_group) - 1): + for i in range(len(task_group) - 1): task_group[i].link(task_group[i + 1]) - task_group[0].apply_async()""" - - chain(cls.nothing.si().set(queue="storescraper2"), *task_group).apply_async() + task_group[0].apply_async() + # chain(cls.nothing.si().set(queue="storescraper"), *task_group)() cls.fetch_es_url_logs(process_id, scrape_products=scrape_products) @shared_task() @@ -325,7 +325,7 @@ def discover_entries_for_categories( entry_positions = defaultdict(lambda: list()) url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) extra_args = cls._extra_args_with_preflight(extra_args) - count = 0 + if use_async: category_chunks = chunks(categories, discover_urls_concurrency) @@ -371,9 +371,20 @@ def discover_entries_for_categories( for url, positions in cls.discover_entries_for_category( 
category, extra_args ).items(): - count += 1 - url_category_weights[url][category] = 1 - entry_positions[url] = [] + logger.info("Discovered URL: {} ({})".format(url, category)) + + if positions: + for pos in positions: + entry_positions[url].append( + (pos["section_name"], pos["value"]) + ) + url_category_weights[url][category] += pos[ + "category_weight" + ] + else: + # Legacy for implementations without position data + url_category_weights[url][category] = 1 + entry_positions[url] = [] discovered_entries = {} for url, positions in entry_positions.items(): @@ -389,12 +400,12 @@ def discover_entries_for_categories( # map generic sections positioning without considering their # products if they don't appear in a specifically mapped # relevant section - - discovered_entries[url] = { - "positions": positions, - "category": category, - "category_weight": 1, - } + if max_weight: + discovered_entries[url] = { + "positions": positions, + "category": category, + "category_weight": max_weight, + } return discovered_entries @@ -510,6 +521,31 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No logger.info("Discovering URLs") logger.info("Store: " + store.__name__) logger.info("Category: " + category) + try: + discovered_entries = store.discover_entries_for_category( + category, extra_args + ) + except Exception: + error_message = "Error discovering URLs from {}: {} - {}".format( + store_class_name, category, traceback.format_exc() + ) + logger.error(error_message) + raise StoreScrapError(error_message) + + for url in discovered_entries.keys(): + logger.info(url) + + return discovered_entries + + @staticmethod + @shared_task() + def discover_entries_for_category_non_blocker_task( + store_class_name, category, extra_args=None + ): + store = get_store_class_by_name(store_class_name) + logger.info("Discovering URLs") + logger.info("Store: " + store.__name__) + logger.info("Category: " + category) process_id = extra_args.get("process_id", None) logger.info(f"Process ID: {process_id}") @@ -525,19 +561,49 @@ def discover_entries_for_category_task(store_class_name, category, extra_args=No logger.error(error_message) raise StoreScrapError(error_message) - for url in discovered_entries.keys(): + entry_positions = defaultdict(lambda: list()) + url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) + + for url, positions in discovered_entries.items(): + if positions: + for pos in positions: + entry_positions[url].append((pos["section_name"], pos["value"])) + url_category_weights[url][category] += pos["category_weight"] + else: + url_category_weights[url][category] = 1 + entry_positions[url] = [] + + discovered_entries = {} + + for url, positions in entry_positions.items(): + category, max_weight = max( + url_category_weights[url].items(), + key=lambda x: x[1], + ) + + if max_weight: + discovered_entries[url] = { + "positions": positions, + "category": category, + "category_weight": max_weight, + } + print(discovered_entries) + + for url, data in discovered_entries.items(): if not redis_client.sismember(f"{process_id}_scraped_urls", url): redis_client.sadd(f"{process_id}_scraped_urls", url) - logger.info(url) - logstash_logger.info( - f"{store.__name__}: Discovered URL", - extra={ - "process_id": process_id or None, - "store": store.__name__, - "category": category, - "url": url, - }, - ) + logger.info(url) + logstash_logger.info( + f"{store.__name__}: Discovered URL", + extra={ + "process_id": process_id or None, + "store": store.__name__, + "category": category, + 
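+                        # These "extra" keys are indexed by Logstash under "@fields.*",
+                        # which is what the Elasticsearch queries in this module filter on.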
"url": url, + "positions": json.dumps(data["positions"]), + "category_weight": data["category_weight"], + }, + ) return discovered_entries @@ -617,7 +683,7 @@ def products_for_urls_task_non_blocker( store_class_name=store_class_name, category=category, extra_args=extra_args, - ).set(queue="storescraper2"), + ).set(queue="storescraper"), ) chunk_tasks.append(task) From 6995c66fe7701e2cd3311dbc7319dcf6ba0bb943 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Tue, 13 Aug 2024 09:43:00 -0400 Subject: [PATCH 09/17] add products_non_blocker.py --- .gitignore | 3 +- ...over_entries_for_categories_non_blocker.py | 9 -- storescraper/bin/products_non_blocker.py | 45 ++++++ storescraper/store.py | 149 ++++++++---------- 4 files changed, 110 insertions(+), 96 deletions(-) create mode 100644 storescraper/bin/products_non_blocker.py diff --git a/.gitignore b/.gitignore index 273444399..9224be523 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ env **/__pycache__ .idea -*.crt \ No newline at end of file +*.crt +.DS_Store \ No newline at end of file diff --git a/storescraper/bin/discover_entries_for_categories_non_blocker.py b/storescraper/bin/discover_entries_for_categories_non_blocker.py index 42f0d22c6..9f24bef64 100644 --- a/storescraper/bin/discover_entries_for_categories_non_blocker.py +++ b/storescraper/bin/discover_entries_for_categories_non_blocker.py @@ -19,14 +19,6 @@ def main(): parser.add_argument( "--categories", type=str, nargs="*", help="Specific categories to be parsed" ) - parser.add_argument( - "--scrape_products", - type=bool, - nargs="?", - default=False, - const=True, - help="Include product scraping", - ) parser.add_argument( "--extra_args", type=json.loads, @@ -41,7 +33,6 @@ def main(): store.discover_entries_for_categories_non_blocker( categories=args.categories, - scrape_products=args.scrape_products, extra_args=args.extra_args, ) diff --git a/storescraper/bin/products_non_blocker.py b/storescraper/bin/products_non_blocker.py new file mode 100644 index 000000000..df4c6a496 --- /dev/null +++ b/storescraper/bin/products_non_blocker.py @@ -0,0 +1,45 @@ +import argparse +import json +import logging +import sys + +sys.path.append("../..") + +from storescraper.utils import get_store_class_by_name # noqa + + +def main(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + filename="products_non_blocker.log", + filemode="w", + ) + + parser = argparse.ArgumentParser( + description="Retrieves the products of the given store." 
+ ) + + parser.add_argument("store", type=str, help="The name of the store to be parsed") + + parser.add_argument( + "--categories", type=str, nargs="*", help="Specific categories to be parsed" + ) + + parser.add_argument( + "--extra_args", + type=json.loads, + nargs="?", + default={}, + help="Optional arguments to pass to the parser " + "(usually username/password) for private sites)", + ) + + args = parser.parse_args() + store = get_store_class_by_name(args.store) + + store.products_non_blocker(categories=args.categories, extra_args=args.extra_args) + + +if __name__ == "__main__": + main() diff --git a/storescraper/store.py b/storescraper/store.py index 3095f5a64..ce6928467 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -46,7 +46,6 @@ class Store: preferred_discover_urls_concurrency = 3 preferred_products_for_url_concurrency = 10 prefer_async = True - scrape_products = True ########################################################################## # API methods @@ -94,6 +93,39 @@ def products( use_async=use_async, ) + @classmethod + def products_non_blocker( + cls, + categories=None, + extra_args=None, + discover_urls_concurrency=None, + products_for_url_concurrency=None, + ): + sanitized_parameters = cls.sanitize_parameters( + categories=categories, + discover_urls_concurrency=discover_urls_concurrency, + products_for_url_concurrency=products_for_url_concurrency, + ) + + categories = sanitized_parameters["categories"] + discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] + products_for_url_concurrency = sanitized_parameters[ + "products_for_url_concurrency" + ] + + extra_args = cls._extra_args_with_preflight(extra_args) + process_id = str(uuid.uuid4()) + extra_args["process_id"] = process_id + + print(f"Starting process: {process_id}") + + cls.discover_entries_for_categories_non_blocker( + categories=categories, + extra_args=extra_args, + discover_urls_concurrency=discover_urls_concurrency, + callback=cls.products_for_urls_non_blocker_task, + ) + @classmethod def products_for_keyword( cls, @@ -144,24 +176,24 @@ def discover_entries_for_categories_non_blocker( categories=None, extra_args=None, discover_urls_concurrency=None, - scrape_products=None, + callback=None, ): sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency, - scrape_products=scrape_products, ) - scrape_products = sanitized_parameters["scrape_products"] categories = sanitized_parameters["categories"] discover_urls_concurrency = sanitized_parameters["discover_urls_concurrency"] - extra_args = cls._extra_args_with_preflight(extra_args) - process_id = str(uuid.uuid4()) - print(f"Process ID: {process_id}") + if not "process_id" in extra_args: + process_id = str(uuid.uuid4()) + extra_args["process_id"] = process_id + print(f"Process ID: {process_id}") + + process_id = extra_args["process_id"] print(f"Discovering URLs for: {cls.__name__}") - extra_args["process_id"] = process_id category_chunks = chunks(categories, discover_urls_concurrency) task_group = [] @@ -173,10 +205,10 @@ def discover_entries_for_categories_non_blocker( cls.__name__, category, extra_args ).set(queue="storescraper") - if scrape_products: + if callback: task = chain( task, - cls.products_for_urls_task_non_blocker.s( + callback.s( store_class_name=cls.__name__, category=category, extra_args=extra_args, @@ -187,18 +219,8 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) - task_group.append( - 
cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") - ) - - # link failing with high workers concurrency - for i in range(len(task_group) - 1): - task_group[i].link(task_group[i + 1]) - - task_group[0].apply_async() - - # chain(cls.nothing.si().set(queue="storescraper"), *task_group)() - cls.fetch_es_url_logs(process_id, scrape_products=scrape_products) + chain(cls.nothing.si().set(queue="storescraper"), *task_group)() + cls.fetch_es_url_logs(process_id) @shared_task() def nothing(): @@ -206,12 +228,10 @@ def nothing(): @classmethod def fetch_es_url_logs( - cls, process_id, scrape_products=False, count_by_category=False + cls, process_id, scrape_products=True, count_by_category=False ): with_errors = availables = unavailables = products_count = urls_count = 0 - previous_products_count = previous_urls_count = -1 categories = {} - timestamp = datetime.now(timezone.utc).isoformat() finished_flag = False query = { @@ -220,7 +240,6 @@ def fetch_es_url_logs( "bool": { "must": [ {"term": {"@fields.process_id.keyword": None}}, - {"range": {"@timestamp": {"gt": None}}}, ] } }, @@ -231,7 +250,6 @@ def fetch_es_url_logs( query["query"]["bool"]["must"][0]["term"][ "@fields.process_id.keyword" ] = process_id - query["query"]["bool"]["must"][1]["range"]["@timestamp"]["gt"] = timestamp response = ES.search(index="logs-test", body=query) @@ -260,29 +278,6 @@ def fetch_es_url_logs( if hit["_source"]["@message"] == f"{cls.__name__}: process finished": finished_flag = True - timestamp = hit["_source"]["@timestamp"] - - if ( - finished_flag - and previous_urls_count == urls_count - and (not scrape_products or previous_products_count == products_count) - ): - print( - f"\n{cls.__name__} process {process_id} finished with {urls_count} URLs" - ) - - if count_by_category: - print("-" * 80) - for category, count in categories.items(): - print(f"{category}: {count}") - print("\n") - - if scrape_products: - print(f"\nAvailable: {availables}") - print(f"Unavailable: {unavailables}") - print(f"With error: {with_errors}") - print(f"Total: {products_count}") - return if urls_count != previous_urls_count or ( @@ -505,15 +500,6 @@ def products_for_urls( # Celery tasks wrappers ########################################################################## - @staticmethod - @shared_task() - def finish_process(store, process_id): - redis_client.delete(f"{process_id}_scraped_urls") - logstash_logger.info( - f"{store}: process finished", - extra={"process_id": process_id}, - ) - @staticmethod @shared_task() def discover_entries_for_category_task(store_class_name, category, extra_args=None): @@ -543,9 +529,10 @@ def discover_entries_for_category_non_blocker_task( store_class_name, category, extra_args=None ): store = get_store_class_by_name(store_class_name) + store_name = store.__name__ logger.info("Discovering URLs") - logger.info("Store: " + store.__name__) - logger.info("Category: " + category) + logger.info(f"Store: {store_name}") + logger.info(f"Category: {category}") process_id = extra_args.get("process_id", None) logger.info(f"Process ID: {process_id}") @@ -587,23 +574,18 @@ def discover_entries_for_category_non_blocker_task( "category": category, "category_weight": max_weight, } - print(discovered_entries) - - for url, data in discovered_entries.items(): - if not redis_client.sismember(f"{process_id}_scraped_urls", url): - redis_client.sadd(f"{process_id}_scraped_urls", url) - logger.info(url) - logstash_logger.info( - f"{store.__name__}: Discovered URL", - extra={ - "process_id": process_id or None, 
- "store": store.__name__, - "category": category, - "url": url, - "positions": json.dumps(data["positions"]), - "category_weight": data["category_weight"], - }, - ) + logger.info(url) + logstash_logger.info( + f"{store_name}: Discovered URL", + extra={ + "process_id": process_id or None, + "store": store_name, + "category": category, + "url": url, + "positions": json.dumps("positions"), + "category_weight": max_weight, + }, + ) return discovered_entries @@ -627,8 +609,8 @@ def products_for_url_task(store_class_name, url, category=None, extra_args=None) serialized_products = [p.serialize() for p in raw_products] - # for idx, product in enumerate(serialized_products): - # logger.info("{} - {}".format(idx, product)) + for idx, product in enumerate(serialized_products): + logger.info("{} - {}".format(idx, product)) return serialized_products @@ -660,7 +642,7 @@ def products_for_urls_task( @staticmethod @shared_task(autoretry_for=(StoreScrapError,), max_retries=5, default_retry_delay=5) - def products_for_urls_task_non_blocker( + def products_for_urls_non_blocker_task( discovered_entries, store_class_name, category, @@ -767,7 +749,6 @@ def sanitize_parameters( discover_urls_concurrency=None, products_for_url_concurrency=None, use_async=None, - scrape_products=None, ): if categories is None: categories = cls.categories() @@ -785,15 +766,11 @@ def sanitize_parameters( if use_async is None: use_async = cls.prefer_async - if scrape_products is None: - scrape_products = cls.scrape_products - return { "categories": categories, "discover_urls_concurrency": discover_urls_concurrency, "products_for_url_concurrency": products_for_url_concurrency, "use_async": use_async, - "scrape_products": scrape_products, } ###################################################################### From 94c2824452b3b32f90794a8bdbfbc85cd91c6af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Wed, 14 Aug 2024 10:12:45 -0400 Subject: [PATCH 10/17] change ES query strategy --- storescraper/store.py | 57 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index ce6928467..49c9af888 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -219,6 +219,10 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) + task_group.append( + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper2") + ) + chain(cls.nothing.si().set(queue="storescraper"), *task_group)() cls.fetch_es_url_logs(process_id) @@ -226,37 +230,45 @@ def discover_entries_for_categories_non_blocker( def nothing(): pass + @shared_task + def finish_process(store, process_id): + logstash_logger.info( + f"{store}: process finished", + extra={"process_id": process_id}, + ) + @classmethod def fetch_es_url_logs( - cls, process_id, scrape_products=True, count_by_category=False + cls, + process_id, ): - with_errors = availables = unavailables = products_count = urls_count = 0 - categories = {} finished_flag = False - + current_timestamp = datetime.utcnow().isoformat() + "Z" query = { - "size": 1000, + "size": 10000, "query": { "bool": { "must": [ - {"term": {"@fields.process_id.keyword": None}}, + {"term": {"@fields.process_id.keyword": process_id}}, + {"range": {"@timestamp": {"gte": current_timestamp}}}, ] } }, "sort": [{"@timestamp": {"order": "asc"}}], } + previous_urls_count = previous_products_count = 0 while True: - query["query"]["bool"]["must"][0]["term"][ - 
"@fields.process_id.keyword" - ] = process_id - response = ES.search(index="logs-test", body=query) + with_errors = availables = unavailables = products_count = urls_count = 0 for hit in response["hits"]["hits"]: source_fields = hit["_source"]["@fields"] - if scrape_products and "product" in source_fields: + if hit["_source"]["@message"] == f"{cls.__name__}: process finished": + finished_flag = True + + if "product" in source_fields: products_count += 1 if not source_fields["product"]: @@ -271,30 +283,23 @@ def fetch_es_url_logs( elif "url" in source_fields: urls_count += 1 - if count_by_category: - category = source_fields["category"] - categories[category] = categories.get(category, 0) + 1 - if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - finished_flag = True - - return + return - if urls_count != previous_urls_count or ( - scrape_products and products_count != previous_products_count + if ( + urls_count != previous_urls_count + or products_count != previous_products_count ): print( - f"Discovered URLs: {urls_count}" - + ( - f" | Scraped products: {products_count}" - if scrape_products - else "" - ) + f"Discovered URLs: {urls_count} | Scraped products: {products_count} | Available: {availables} | Unavailable: {unavailables} | Errors: {with_errors}" ) previous_urls_count = urls_count previous_products_count = products_count + if finished_flag: + break + time.sleep(3) @classmethod From a32d60cea0bc3affd788e9938c4d7f46410fe920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Wed, 14 Aug 2024 18:01:08 -0400 Subject: [PATCH 11/17] change ES search logic --- storescraper/store.py | 71 ++++++++++++++++++++++--------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 49c9af888..c944fbb60 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -14,7 +14,7 @@ from datetime import datetime, timezone from .product import Product -from .utils import get_store_class_by_name, chunks +from .utils import get_store_class_by_name, chunks, get_timestamp_with_nanoseconds import redis logger = get_task_logger(__name__) @@ -192,7 +192,6 @@ def discover_entries_for_categories_non_blocker( print(f"Process ID: {process_id}") process_id = extra_args["process_id"] - print(f"Discovering URLs for: {cls.__name__}") category_chunks = chunks(categories, discover_urls_concurrency) task_group = [] @@ -219,10 +218,6 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) - task_group.append( - cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper2") - ) - chain(cls.nothing.si().set(queue="storescraper"), *task_group)() cls.fetch_es_url_logs(process_id) @@ -234,7 +229,11 @@ def nothing(): def finish_process(store, process_id): logstash_logger.info( f"{store}: process finished", - extra={"process_id": process_id}, + extra={ + "log_id": str(uuid.uuid4()), + "process_id": process_id, + "created_at": get_timestamp_with_nanoseconds(), + }, ) @classmethod @@ -242,39 +241,40 @@ def fetch_es_url_logs( cls, process_id, ): - finished_flag = False - current_timestamp = datetime.utcnow().isoformat() + "Z" + scraped = set() query = { "size": 10000, "query": { "bool": { "must": [ {"term": {"@fields.process_id.keyword": process_id}}, - {"range": {"@timestamp": {"gte": current_timestamp}}}, ] } }, - "sort": [{"@timestamp": {"order": "asc"}}], + "sort": [{"@fields.created_at": "desc"}], } - previous_urls_count = previous_products_count = 0 + 
urls_count = products_count = previous_urls_count = previous_products_count = ( + with_errors + ) = availables = unavailables = 0 while True: response = ES.search(index="logs-test", body=query) - with_errors = availables = unavailables = products_count = urls_count = 0 + hits = response["hits"]["hits"] - for hit in response["hits"]["hits"]: - source_fields = hit["_source"]["@fields"] + for hit in hits: - if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - finished_flag = True + if hit["_id"] in scraped: + continue - if "product" in source_fields: - products_count += 1 + scraped.add(hit["_id"]) + source_fields = hit["_source"]["@fields"] + if "product" in source_fields: if not source_fields["product"]: with_errors += 1 else: for product in source_fields["product"]: + products_count += 1 if product["stock"] == 0: unavailables += 1 else: @@ -283,9 +283,6 @@ def fetch_es_url_logs( elif "url" in source_fields: urls_count += 1 - if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - return - if ( urls_count != previous_urls_count or products_count != previous_products_count @@ -297,10 +294,7 @@ def fetch_es_url_logs( previous_urls_count = urls_count previous_products_count = products_count - if finished_flag: - break - - time.sleep(3) + time.sleep(1) @classmethod def discover_entries_for_categories( @@ -557,13 +551,16 @@ def discover_entries_for_category_non_blocker_task( url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) for url, positions in discovered_entries.items(): - if positions: - for pos in positions: - entry_positions[url].append((pos["section_name"], pos["value"])) - url_category_weights[url][category] += pos["category_weight"] - else: - url_category_weights[url][category] = 1 - entry_positions[url] = [] + # prevent duplicated URLs, allow to validate numbers for debugging + if not redis_client.sismember(f"{process_id}_urls", url): + redis_client.sadd(f"{process_id}_urls", url) + if positions: + for pos in positions: + entry_positions[url].append((pos["section_name"], pos["value"])) + url_category_weights[url][category] += pos["category_weight"] + else: + url_category_weights[url][category] = 1 + entry_positions[url] = [] discovered_entries = {} @@ -581,14 +578,18 @@ def discover_entries_for_category_non_blocker_task( } logger.info(url) logstash_logger.info( - f"{store_name}: Discovered URL", + { + "message": f"{store_name}: Discovered URL", + }, extra={ + "log_id": str(uuid.uuid4()), "process_id": process_id or None, "store": store_name, "category": category, "url": url, "positions": json.dumps("positions"), "category_weight": max_weight, + "created_at": get_timestamp_with_nanoseconds(), }, ) @@ -683,10 +684,12 @@ def log_product(product, store_class_name, category, extra_args): logstash_logger.info( f"{store_class_name}: Discovered product", extra={ + "log_id": str(uuid.uuid4()), "process_id": extra_args["process_id"], "store": store_class_name, "category": category, "product": product, + "created_at": get_timestamp_with_nanoseconds(), }, ) From 317a694566c67f3c54c830225f19fafb94a857ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Mon, 19 Aug 2024 10:10:39 -0400 Subject: [PATCH 12/17] Change logs count strategy --- storescraper/store.py | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index c944fbb60..0c3781dd0 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -192,14 +192,15 @@ 
def discover_entries_for_categories_non_blocker( print(f"Process ID: {process_id}") process_id = extra_args["process_id"] - - category_chunks = chunks(categories, discover_urls_concurrency) + category_chunks = chunks(categories, 1) task_group = [] for category_chunk in category_chunks: + print(category_chunk) chunk_tasks = [] for category in category_chunk: + print(category) task = cls.discover_entries_for_category_non_blocker_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") @@ -218,7 +219,11 @@ def discover_entries_for_categories_non_blocker( task_group.append(group(*chunk_tasks)) - chain(cls.nothing.si().set(queue="storescraper"), *task_group)() + task_group.append( + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") + ) + print(task_group) + chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() cls.fetch_es_url_logs(process_id) @shared_task() @@ -241,6 +246,9 @@ def fetch_es_url_logs( cls, process_id, ): + finished_flag = False + loop_wait = 10 + not_updated = 0 scraped = set() query = { "size": 10000, @@ -267,6 +275,10 @@ def fetch_es_url_logs( continue scraped.add(hit["_id"]) + + if hit["_source"]["@message"] == f"{cls.__name__}: process finished": + finished_flag = True + source_fields = hit["_source"]["@fields"] if "product" in source_fields: @@ -287,14 +299,29 @@ def fetch_es_url_logs( urls_count != previous_urls_count or products_count != previous_products_count ): + not_updated = 0 print( - f"Discovered URLs: {urls_count} | Scraped products: {products_count} | Available: {availables} | Unavailable: {unavailables} | Errors: {with_errors}" + f"Available: {availables} | Unavailable: {unavailables} | With error: {with_errors} | Total: {urls_count}" ) + else: + # print(".", end="", flush=True) + not_updated += 1 + elapsed_time = loop_wait * not_updated + + if (elapsed_time > 600 and urls_count) or ( + elapsed_time > 30 and finished_flag + ): + print(f"\nProcess ID: {process_id} finished:") + print(f"Available: {availables}") + print(f"Unavailable: {unavailables}") + print(f"With error: {with_errors}") + print(f"Total: {urls_count}") + break previous_urls_count = urls_count previous_products_count = products_count - time.sleep(1) + time.sleep(loop_wait) @classmethod def discover_entries_for_categories( @@ -321,7 +348,7 @@ def discover_entries_for_categories( extra_args = cls._extra_args_with_preflight(extra_args) if use_async: - category_chunks = chunks(categories, discover_urls_concurrency) + category_chunks = chunks(categories, 1) for category_chunk in category_chunks: chunk_tasks = [] From 9d98433a4cb9358c26a4e7cb8b6272d4a221424e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Mon, 19 Aug 2024 17:51:35 -0400 Subject: [PATCH 13/17] fix duplicated log deletion --- storescraper/store.py | 85 +++++++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 0c3781dd0..fb115f9e4 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -6,15 +6,13 @@ import uuid from collections import defaultdict, OrderedDict -from celery import chain, group, shared_task +from celery import chain, group, shared_task, chord from celery.result import allow_join_result from celery.utils.log import get_task_logger from elasticsearch import Elasticsearch -from datetime import datetime, timezone - from .product import Product -from .utils import get_store_class_by_name, chunks, get_timestamp_with_nanoseconds 
+from .utils import get_store_class_by_name, chunks import redis logger = get_task_logger(__name__) @@ -192,15 +190,14 @@ def discover_entries_for_categories_non_blocker( print(f"Process ID: {process_id}") process_id = extra_args["process_id"] - category_chunks = chunks(categories, 1) + + category_chunks = chunks(categories, discover_urls_concurrency) task_group = [] for category_chunk in category_chunks: - print(category_chunk) chunk_tasks = [] for category in category_chunk: - print(category) task = cls.discover_entries_for_category_non_blocker_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") @@ -216,15 +213,23 @@ def discover_entries_for_categories_non_blocker( ) chunk_tasks.append(task) + group_tasks = group(*chunk_tasks) + task_group.append(group_tasks) - task_group.append(group(*chunk_tasks)) + group_chunks = chunks(task_group, 3) + group_chunks_len = 0 - task_group.append( - cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") - ) - print(task_group) - chain(cls.nothing.si().set(queue="storescraper"), *task_group).apply_async() - cls.fetch_es_url_logs(process_id) + for group_chunk in group_chunks: + group_chunks_len += 1 + chain( + cls.nothing.si().set(queue="storescraper"), + *group_chunk, + cls.finish_process.si(cls.__name__, process_id).set( + queue="storescraper" + ), + )() + + cls.fetch_es_url_logs(process_id, group_chunks_len) @shared_task() def nothing(): @@ -235,9 +240,7 @@ def finish_process(store, process_id): logstash_logger.info( f"{store}: process finished", extra={ - "log_id": str(uuid.uuid4()), "process_id": process_id, - "created_at": get_timestamp_with_nanoseconds(), }, ) @@ -245,7 +248,9 @@ def finish_process(store, process_id): def fetch_es_url_logs( cls, process_id, + group_chunks_len, ): + finished_groups = 0 finished_flag = False loop_wait = 10 not_updated = 0 @@ -259,7 +264,7 @@ def fetch_es_url_logs( ] } }, - "sort": [{"@fields.created_at": "desc"}], + "sort": [{"@timestamp": "desc"}], } urls_count = products_count = previous_urls_count = previous_products_count = ( with_errors @@ -270,14 +275,20 @@ def fetch_es_url_logs( hits = response["hits"]["hits"] for hit in hits: + source_fields = hit["_source"]["@fields"] - if hit["_id"] in scraped: + if hit["_id"] in scraped or ( + "url" in source_fields and source_fields["url"] in scraped + ): continue scraped.add(hit["_id"]) if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - finished_flag = True + finished_groups += 1 + print(f"Group {finished_groups}/{group_chunks_len} finished") + if finished_groups == group_chunks_len: + finished_flag = True source_fields = hit["_source"]["@fields"] @@ -293,6 +304,7 @@ def fetch_es_url_logs( availables += 1 elif "url" in source_fields: + scraped.add(source_fields["url"]) urls_count += 1 if ( @@ -348,7 +360,7 @@ def discover_entries_for_categories( extra_args = cls._extra_args_with_preflight(extra_args) if use_async: - category_chunks = chunks(categories, 1) + category_chunks = chunks(categories, discover_urls_concurrency) for category_chunk in category_chunks: chunk_tasks = [] @@ -555,12 +567,12 @@ def discover_entries_for_category_non_blocker_task( store_class_name, category, extra_args=None ): store = get_store_class_by_name(store_class_name) - store_name = store.__name__ logger.info("Discovering URLs") - logger.info(f"Store: {store_name}") + logger.info(f"Store: {store_class_name}") logger.info(f"Category: {category}") - process_id = extra_args.get("process_id", None) + extra_args = 
store._extra_args_with_preflight(extra_args) + logger.info(f"Process ID: {process_id}") try: @@ -578,16 +590,13 @@ def discover_entries_for_category_non_blocker_task( url_category_weights = defaultdict(lambda: defaultdict(lambda: 0)) for url, positions in discovered_entries.items(): - # prevent duplicated URLs, allow to validate numbers for debugging - if not redis_client.sismember(f"{process_id}_urls", url): - redis_client.sadd(f"{process_id}_urls", url) - if positions: - for pos in positions: - entry_positions[url].append((pos["section_name"], pos["value"])) - url_category_weights[url][category] += pos["category_weight"] - else: - url_category_weights[url][category] = 1 - entry_positions[url] = [] + if positions: + for pos in positions: + entry_positions[url].append((pos["section_name"], pos["value"])) + url_category_weights[url][category] += pos["category_weight"] + else: + url_category_weights[url][category] = 1 + entry_positions[url] = [] discovered_entries = {} @@ -606,17 +615,15 @@ def discover_entries_for_category_non_blocker_task( logger.info(url) logstash_logger.info( { - "message": f"{store_name}: Discovered URL", + "message": f"{store_class_name}: Discovered URL", }, extra={ - "log_id": str(uuid.uuid4()), "process_id": process_id or None, - "store": store_name, + "store": store_class_name, "category": category, "url": url, - "positions": json.dumps("positions"), + "positions": json.dumps(positions), "category_weight": max_weight, - "created_at": get_timestamp_with_nanoseconds(), }, ) @@ -711,12 +718,10 @@ def log_product(product, store_class_name, category, extra_args): logstash_logger.info( f"{store_class_name}: Discovered product", extra={ - "log_id": str(uuid.uuid4()), "process_id": extra_args["process_id"], "store": store_class_name, "category": category, "product": product, - "created_at": get_timestamp_with_nanoseconds(), }, ) From a5b13a35e766fe14af17f8d46f77742d28685dc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Tue, 20 Aug 2024 16:20:33 -0400 Subject: [PATCH 14/17] Add chord and with_async param --- storescraper/bin/celeryconfig/defaults.py | 2 - ...over_entries_for_categories_non_blocker.py | 9 +++ storescraper/bin/products_non_blocker.py | 15 +++- storescraper/store.py | 71 +++++++++---------- 4 files changed, 56 insertions(+), 41 deletions(-) diff --git a/storescraper/bin/celeryconfig/defaults.py b/storescraper/bin/celeryconfig/defaults.py index f53398178..130d0d9f5 100644 --- a/storescraper/bin/celeryconfig/defaults.py +++ b/storescraper/bin/celeryconfig/defaults.py @@ -6,5 +6,3 @@ result_backend = "redis://localhost:6379/0" imports = "storescraper.store" - -worker_hijack_root_logger = False diff --git a/storescraper/bin/discover_entries_for_categories_non_blocker.py b/storescraper/bin/discover_entries_for_categories_non_blocker.py index 9f24bef64..8a16bc143 100644 --- a/storescraper/bin/discover_entries_for_categories_non_blocker.py +++ b/storescraper/bin/discover_entries_for_categories_non_blocker.py @@ -19,6 +19,14 @@ def main(): parser.add_argument( "--categories", type=str, nargs="*", help="Specific categories to be parsed" ) + parser.add_argument( + "--with_async", + type=bool, + nargs="?", + default=False, + const=True, + help="Use asynchronous tasks (celery)", + ) parser.add_argument( "--extra_args", type=json.loads, @@ -33,6 +41,7 @@ def main(): store.discover_entries_for_categories_non_blocker( categories=args.categories, + use_async=args.with_async, extra_args=args.extra_args, ) diff --git 
a/storescraper/bin/products_non_blocker.py b/storescraper/bin/products_non_blocker.py index df4c6a496..1630e2766 100644 --- a/storescraper/bin/products_non_blocker.py +++ b/storescraper/bin/products_non_blocker.py @@ -21,7 +21,14 @@ def main(): ) parser.add_argument("store", type=str, help="The name of the store to be parsed") - + parser.add_argument( + "--with_async", + type=bool, + nargs="?", + default=False, + const=True, + help="Use asynchronous tasks (celery)", + ) parser.add_argument( "--categories", type=str, nargs="*", help="Specific categories to be parsed" ) @@ -38,7 +45,11 @@ def main(): args = parser.parse_args() store = get_store_class_by_name(args.store) - store.products_non_blocker(categories=args.categories, extra_args=args.extra_args) + store.products_non_blocker( + categories=args.categories, + use_async=args.with_async, + extra_args=args.extra_args, + ) if __name__ == "__main__": diff --git a/storescraper/store.py b/storescraper/store.py index fb115f9e4..f0d043dd3 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -5,11 +5,11 @@ import traceback import uuid from collections import defaultdict, OrderedDict - -from celery import chain, group, shared_task, chord +from celery import chain, group, shared_task, chord, Celery from celery.result import allow_join_result from celery.utils.log import get_task_logger from elasticsearch import Elasticsearch +import sys from .product import Product from .utils import get_store_class_by_name, chunks @@ -95,6 +95,7 @@ def products( def products_non_blocker( cls, categories=None, + use_async=None, extra_args=None, discover_urls_concurrency=None, products_for_url_concurrency=None, @@ -120,6 +121,7 @@ def products_non_blocker( cls.discover_entries_for_categories_non_blocker( categories=categories, extra_args=extra_args, + use_async=use_async, discover_urls_concurrency=discover_urls_concurrency, callback=cls.products_for_urls_non_blocker_task, ) @@ -172,10 +174,15 @@ def products_for_keyword( def discover_entries_for_categories_non_blocker( cls, categories=None, + use_async=False, extra_args=None, discover_urls_concurrency=None, callback=None, ): + if not use_async: + app = Celery("celery") + app.conf.task_always_eager = True + sanitized_parameters = cls.sanitize_parameters( categories=categories, discover_urls_concurrency=discover_urls_concurrency, @@ -193,7 +200,7 @@ def discover_entries_for_categories_non_blocker( category_chunks = chunks(categories, discover_urls_concurrency) task_group = [] - + total_size = 0 for category_chunk in category_chunks: chunk_tasks = [] @@ -201,7 +208,7 @@ def discover_entries_for_categories_non_blocker( task = cls.discover_entries_for_category_non_blocker_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") - + total_size += sys.getsizeof(task) if callback: task = chain( task, @@ -216,20 +223,10 @@ def discover_entries_for_categories_non_blocker( group_tasks = group(*chunk_tasks) task_group.append(group_tasks) - group_chunks = chunks(task_group, 3) - group_chunks_len = 0 - - for group_chunk in group_chunks: - group_chunks_len += 1 - chain( - cls.nothing.si().set(queue="storescraper"), - *group_chunk, - cls.finish_process.si(cls.__name__, process_id).set( - queue="storescraper" - ), - )() - - cls.fetch_es_url_logs(process_id, group_chunks_len) + chord(task_group)( + cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") + ) + cls.fetch_es_url_logs(process_id) @shared_task() def nothing(): @@ -248,13 +245,13 @@ def finish_process(store, process_id): def 
fetch_es_url_logs( cls, process_id, - group_chunks_len, ): - finished_groups = 0 finished_flag = False - loop_wait = 10 + loop_wait = 5 not_updated = 0 - scraped = set() + scraped_logs = set() + scraped_urls = set() + scraped_products = set() query = { "size": 10000, "query": { @@ -277,18 +274,13 @@ def fetch_es_url_logs( for hit in hits: source_fields = hit["_source"]["@fields"] - if hit["_id"] in scraped or ( - "url" in source_fields and source_fields["url"] in scraped - ): + if hit["_id"] in scraped_logs: continue - scraped.add(hit["_id"]) + scraped_logs.add(hit["_id"]) if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - finished_groups += 1 - print(f"Group {finished_groups}/{group_chunks_len} finished") - if finished_groups == group_chunks_len: - finished_flag = True + finished_flag = True source_fields = hit["_source"]["@fields"] @@ -297,6 +289,10 @@ def fetch_es_url_logs( with_errors += 1 else: for product in source_fields["product"]: + if product["url"] in scraped_products: + continue + + scraped_products.add(product["url"]) products_count += 1 if product["stock"] == 0: unavailables += 1 @@ -304,7 +300,7 @@ def fetch_es_url_logs( availables += 1 elif "url" in source_fields: - scraped.add(source_fields["url"]) + scraped_urls.add(source_fields["url"]) urls_count += 1 if ( @@ -313,7 +309,7 @@ def fetch_es_url_logs( ): not_updated = 0 print( - f"Available: {availables} | Unavailable: {unavailables} | With error: {with_errors} | Total: {urls_count}" + f"Available: {availables} | Unavailable: {unavailables} | With error: {with_errors} | Total: {urls_count}/{len(scraped_urls)}" ) else: # print(".", end="", flush=True) @@ -321,13 +317,13 @@ def fetch_es_url_logs( elapsed_time = loop_wait * not_updated if (elapsed_time > 600 and urls_count) or ( - elapsed_time > 30 and finished_flag + elapsed_time > 10 and finished_flag ): print(f"\nProcess ID: {process_id} finished:") print(f"Available: {availables}") print(f"Unavailable: {unavailables}") print(f"With error: {with_errors}") - print(f"Total: {urls_count}") + print(f"Total: {urls_count}/{len(scraped_urls)}") break previous_urls_count = urls_count @@ -572,7 +568,6 @@ def discover_entries_for_category_non_blocker_task( logger.info(f"Category: {category}") process_id = extra_args.get("process_id", None) extra_args = store._extra_args_with_preflight(extra_args) - logger.info(f"Process ID: {process_id}") try: @@ -690,8 +685,10 @@ def products_for_urls_non_blocker_task( extra_args=None, ): store = get_store_class_by_name(store_class_name) - # extra_args = cls._extra_args_with_preflight(extra_args) - discovery_entries_chunks = chunks(list(discovered_entries.items()), 6) + extra_args = store._extra_args_with_preflight(extra_args) + discovery_entries_chunks = chunks( + list(discovered_entries.items()), products_for_url_concurrency + ) for discovery_entries_chunk in discovery_entries_chunks: chunk_tasks = [] From 9db0669547a289709b75488dc7a9ccb8170c4fd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Tue, 20 Aug 2024 17:13:41 -0400 Subject: [PATCH 15/17] remove unused code --- storescraper/store.py | 238 ++++++++++++++++++++---------------------- 1 file changed, 113 insertions(+), 125 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index f0d043dd3..1a2702c52 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -1,21 +1,23 @@ import json import logging -import logstash import time import traceback import uuid from collections import defaultdict, OrderedDict -from 
celery import chain, group, shared_task, chord, Celery + +from celery import Celery, chain, chord, group, shared_task from celery.result import allow_join_result from celery.utils.log import get_task_logger from elasticsearch import Elasticsearch -import sys +import logstash from .product import Product -from .utils import get_store_class_by_name, chunks -import redis +from .utils import chunks, get_store_class_by_name logger = get_task_logger(__name__) +logstash_logger = logging.getLogger("python-logstash-logger") +logstash_logger.setLevel(logging.INFO) +logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) ES = Elasticsearch( "https://localhost:9200", @@ -24,16 +26,6 @@ ) logging.getLogger("elastic_transport").setLevel(logging.WARNING) -logstash_logger = logging.getLogger("python-logstash-logger") -logstash_logger.setLevel(logging.INFO) -logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) - -redis_client = redis.StrictRedis( - host="localhost", - port=6379, - db=0, -) - class StoreScrapError(Exception): def __init__(self, message): @@ -180,7 +172,7 @@ def discover_entries_for_categories_non_blocker( callback=None, ): if not use_async: - app = Celery("celery") + app = Celery() app.conf.task_always_eager = True sanitized_parameters = cls.sanitize_parameters( @@ -197,10 +189,9 @@ def discover_entries_for_categories_non_blocker( print(f"Process ID: {process_id}") process_id = extra_args["process_id"] - category_chunks = chunks(categories, discover_urls_concurrency) task_group = [] - total_size = 0 + for category_chunk in category_chunks: chunk_tasks = [] @@ -208,7 +199,7 @@ def discover_entries_for_categories_non_blocker( task = cls.discover_entries_for_category_non_blocker_task.si( cls.__name__, category, extra_args ).set(queue="storescraper") - total_size += sys.getsizeof(task) + if callback: task = chain( task, @@ -226,110 +217,8 @@ def discover_entries_for_categories_non_blocker( chord(task_group)( cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") ) - cls.fetch_es_url_logs(process_id) - - @shared_task() - def nothing(): - pass - @shared_task - def finish_process(store, process_id): - logstash_logger.info( - f"{store}: process finished", - extra={ - "process_id": process_id, - }, - ) - - @classmethod - def fetch_es_url_logs( - cls, - process_id, - ): - finished_flag = False - loop_wait = 5 - not_updated = 0 - scraped_logs = set() - scraped_urls = set() - scraped_products = set() - query = { - "size": 10000, - "query": { - "bool": { - "must": [ - {"term": {"@fields.process_id.keyword": process_id}}, - ] - } - }, - "sort": [{"@timestamp": "desc"}], - } - urls_count = products_count = previous_urls_count = previous_products_count = ( - with_errors - ) = availables = unavailables = 0 - - while True: - response = ES.search(index="logs-test", body=query) - hits = response["hits"]["hits"] - - for hit in hits: - source_fields = hit["_source"]["@fields"] - - if hit["_id"] in scraped_logs: - continue - - scraped_logs.add(hit["_id"]) - - if hit["_source"]["@message"] == f"{cls.__name__}: process finished": - finished_flag = True - - source_fields = hit["_source"]["@fields"] - - if "product" in source_fields: - if not source_fields["product"]: - with_errors += 1 - else: - for product in source_fields["product"]: - if product["url"] in scraped_products: - continue - - scraped_products.add(product["url"]) - products_count += 1 - if product["stock"] == 0: - unavailables += 1 - else: - availables += 1 - - elif "url" in source_fields: - 
scraped_urls.add(source_fields["url"]) - urls_count += 1 - - if ( - urls_count != previous_urls_count - or products_count != previous_products_count - ): - not_updated = 0 - print( - f"Available: {availables} | Unavailable: {unavailables} | With error: {with_errors} | Total: {urls_count}/{len(scraped_urls)}" - ) - else: - # print(".", end="", flush=True) - not_updated += 1 - elapsed_time = loop_wait * not_updated - - if (elapsed_time > 600 and urls_count) or ( - elapsed_time > 10 and finished_flag - ): - print(f"\nProcess ID: {process_id} finished:") - print(f"Available: {availables}") - print(f"Unavailable: {unavailables}") - print(f"With error: {with_errors}") - print(f"Total: {urls_count}/{len(scraped_urls)}") - break - - previous_urls_count = urls_count - previous_products_count = products_count - - time.sleep(loop_wait) + cls.fetch_process_logs(process_id) @classmethod def discover_entries_for_categories( @@ -438,6 +327,105 @@ def discover_entries_for_categories( return discovered_entries + @shared_task + def finish_process(store, process_id): + logstash_logger.info( + f"{store}: process finished", + extra={ + "process_id": process_id, + }, + ) + + @classmethod + def fetch_process_logs( + cls, + process_id, + ): + finished_flag = False + seconds_between_fetches = 5 + fetches_without_updates = 0 + scraped_logs = set() + scraped_urls = set() + scraped_products = set() + es_query = { + "size": 10000, + "query": { + "bool": { + "must": [ + {"term": {"@fields.process_id.keyword": process_id}}, + ] + } + }, + "sort": [{"@timestamp": "desc"}], + } + urls_count = products_count = previous_urls_count = previous_products_count = ( + products_with_errors + ) = available_products = unavailable_products = 0 + + while True: + response = ES.search(index="logs-test", body=es_query) + hits = response["hits"]["hits"] + + for hit in hits: + if hit["_id"] in scraped_logs: + continue + + scraped_logs.add(hit["_id"]) + + if hit["_source"]["@message"] == f"{cls.__name__}: process finished": + finished_flag = True + + source_fields = hit["_source"]["@fields"] + + if "product" in source_fields: + if not source_fields["product"]: + products_with_errors += 1 + else: + for product in source_fields["product"]: + if product["url"] in scraped_products: + continue + + scraped_products.add(product["url"]) + products_count += 1 + if product["stock"] == 0: + unavailable_products += 1 + else: + available_products += 1 + + elif "url" in source_fields: + scraped_urls.add(source_fields["url"]) + urls_count += 1 + + if ( + urls_count != previous_urls_count + or products_count != previous_products_count + ): + fetches_without_updates = 0 + print( + f"Available: {available_products} | " + f"Unavailable: {unavailable_products} | " + f"With error: {products_with_errors} | " + f"Total: {urls_count}/{len(scraped_urls)}" + ) + else: + fetches_without_updates += 1 + elapsed_time = seconds_between_fetches * fetches_without_updates + + if (elapsed_time > 600 and urls_count) or ( + elapsed_time > 30 and finished_flag + ): + print(f"\nProcess ID: {process_id} finished:") + print(f"Available: {available_products}") + print(f"Unavailable: {unavailable_products}") + print(f"With error: {products_with_errors}") + print(f"Total: {urls_count}/{len(scraped_urls)}") + break + + previous_urls_count = urls_count + previous_products_count = products_count + + time.sleep(seconds_between_fetches) + @classmethod def products_for_urls( cls, @@ -701,7 +689,7 @@ def products_for_urls_non_blocker_task( store.log_product.s( 
store_class_name=store_class_name, category=category, - extra_args=extra_args, + process_id=extra_args["process_id"], ).set(queue="storescraper"), ) @@ -711,11 +699,11 @@ def products_for_urls_non_blocker_task( tasks_group.apply_async() @shared_task() - def log_product(product, store_class_name, category, extra_args): + def log_product(product, store_class_name, category, process_id): logstash_logger.info( f"{store_class_name}: Discovered product", extra={ - "process_id": extra_args["process_id"], + "process_id": process_id, "store": store_class_name, "category": category, "product": product, From 5463ce5bd6fae44be0a5550550049010c8b62aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Tue, 3 Sep 2024 16:28:55 -0400 Subject: [PATCH 16/17] add update log id --- storescraper/store.py | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 1a2702c52..1e40d4faa 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -13,6 +13,11 @@ from .product import Product from .utils import chunks, get_store_class_by_name +from solotodo.models.store import ( + process_non_blocker_positions_data, + process_non_blocker_product_data, + process_non_blocker_store_update_log_data, +) logger = get_task_logger(__name__) logstash_logger = logging.getLogger("python-logstash-logger") @@ -88,6 +93,7 @@ def products_non_blocker( cls, categories=None, use_async=None, + update_log_id=None, extra_args=None, discover_urls_concurrency=None, products_for_url_concurrency=None, @@ -108,7 +114,7 @@ def products_non_blocker( process_id = str(uuid.uuid4()) extra_args["process_id"] = process_id - print(f"Starting process: {process_id}") + print(f"Starting process: {process_id} | Update log id: {update_log_id}\n") cls.discover_entries_for_categories_non_blocker( categories=categories, @@ -116,6 +122,7 @@ def products_non_blocker( use_async=use_async, discover_urls_concurrency=discover_urls_concurrency, callback=cls.products_for_urls_non_blocker_task, + update_log_id=update_log_id, ) @classmethod @@ -170,7 +177,9 @@ def discover_entries_for_categories_non_blocker( extra_args=None, discover_urls_concurrency=None, callback=None, + update_log_id=None, ): + logger.info(f"update log id: {update_log_id}") if not use_async: app = Celery() app.conf.task_always_eager = True @@ -207,6 +216,7 @@ def discover_entries_for_categories_non_blocker( store_class_name=cls.__name__, category=category, extra_args=extra_args, + update_log_id=update_log_id, ).set(queue="storescraper"), ) @@ -218,7 +228,7 @@ def discover_entries_for_categories_non_blocker( cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") ) - cls.fetch_process_logs(process_id) + cls.fetch_process_logs(process_id, update_log_id) @classmethod def discover_entries_for_categories( @@ -340,6 +350,7 @@ def finish_process(store, process_id): def fetch_process_logs( cls, process_id, + update_log_id, ): finished_flag = False seconds_between_fetches = 5 @@ -412,13 +423,23 @@ def fetch_process_logs( elapsed_time = seconds_between_fetches * fetches_without_updates if (elapsed_time > 600 and urls_count) or ( - elapsed_time > 30 and finished_flag + elapsed_time > 15 and finished_flag ): print(f"\nProcess ID: {process_id} finished:") - print(f"Available: {available_products}") + print(f"\nAvailable: {available_products}") print(f"Unavailable: {unavailable_products}") print(f"With error: {products_with_errors}") print(f"Total: 
{urls_count}/{len(scraped_urls)}") + + process_non_blocker_store_update_log_data( + update_log_id, + { + "available": available_products, + "unavailable": unavailable_products, + "with_error": products_with_errors, + }, + ) + break previous_urls_count = urls_count @@ -595,6 +616,9 @@ def discover_entries_for_category_non_blocker_task( "category": category, "category_weight": max_weight, } + positions = json.dumps(positions) + process_non_blocker_positions_data(url, positions, category) + logger.info(url) logstash_logger.info( { @@ -605,7 +629,7 @@ def discover_entries_for_category_non_blocker_task( "store": store_class_name, "category": category, "url": url, - "positions": json.dumps(positions), + "positions": positions, "category_weight": max_weight, }, ) @@ -670,6 +694,7 @@ def products_for_urls_non_blocker_task( store_class_name, category, products_for_url_concurrency=10, + update_log_id=None, extra_args=None, ): store = get_store_class_by_name(store_class_name) @@ -690,6 +715,7 @@ def products_for_urls_non_blocker_task( store_class_name=store_class_name, category=category, process_id=extra_args["process_id"], + update_log_id=update_log_id, ).set(queue="storescraper"), ) @@ -699,7 +725,7 @@ def products_for_urls_non_blocker_task( tasks_group.apply_async() @shared_task() - def log_product(product, store_class_name, category, process_id): + def log_product(product, store_class_name, category, process_id, update_log_id): logstash_logger.info( f"{store_class_name}: Discovered product", extra={ @@ -709,6 +735,9 @@ def log_product(product, store_class_name, category, process_id): "product": product, }, ) + process_non_blocker_product_data( + product, store_class_name, [category], update_log_id + ) ########################################################################## # Implementation dependant methods From 8bba428f7afcae38aa9875c5fc1e3305b586e5f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20D=C3=ADaz=20Toro?= Date: Thu, 5 Sep 2024 17:47:54 -0400 Subject: [PATCH 17/17] Add isolated mode --- storescraper/store.py | 190 ++++++++++++++++++++++++++++-------------- 1 file changed, 129 insertions(+), 61 deletions(-) diff --git a/storescraper/store.py b/storescraper/store.py index 1e40d4faa..456587e60 100644 --- a/storescraper/store.py +++ b/storescraper/store.py @@ -8,28 +8,30 @@ from celery import Celery, chain, chord, group, shared_task from celery.result import allow_join_result from celery.utils.log import get_task_logger -from elasticsearch import Elasticsearch -import logstash from .product import Product from .utils import chunks, get_store_class_by_name -from solotodo.models.store import ( - process_non_blocker_positions_data, - process_non_blocker_product_data, - process_non_blocker_store_update_log_data, -) logger = get_task_logger(__name__) -logstash_logger = logging.getLogger("python-logstash-logger") -logstash_logger.setLevel(logging.INFO) -logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) -ES = Elasticsearch( - "https://localhost:9200", - ca_certs="http_ca.crt", - http_auth=("elastic", "Mo+2hdOYBpIJi4NYG*KL"), -) -logging.getLogger("elastic_transport").setLevel(logging.WARNING) +ISOLATED_MODE = True + +if not ISOLATED_MODE: + import logstash + + from django.conf import settings + from solotodo.models.store import ( + process_non_blocker_positions_data, + process_non_blocker_product_data, + process_non_blocker_store_update_log_data, + ) + + ES = settings.ES + logstash_logger = logging.getLogger("python-logstash-logger") + 
logstash_logger.setLevel(logging.INFO) + logstash_logger.addHandler(logstash.TCPLogstashHandler("localhost", 5959)) + + logging.getLogger("elastic_transport").setLevel(logging.WARNING) class StoreScrapError(Exception): @@ -180,6 +182,7 @@ def discover_entries_for_categories_non_blocker( update_log_id=None, ): logger.info(f"update log id: {update_log_id}") + if not use_async: app = Celery() app.conf.task_always_eager = True @@ -228,7 +231,10 @@ def discover_entries_for_categories_non_blocker( cls.finish_process.si(cls.__name__, process_id).set(queue="storescraper") ) - cls.fetch_process_logs(process_id, update_log_id) + if ISOLATED_MODE: + cls.fetch_process_file(process_id) + else: + cls.fetch_process_logs(process_id, update_log_id) @classmethod def discover_entries_for_categories( @@ -338,13 +344,46 @@ def discover_entries_for_categories( return discovered_entries @shared_task - def finish_process(store, process_id): - logstash_logger.info( - f"{store}: process finished", - extra={ - "process_id": process_id, - }, - ) + def finish_process(store_name, process_id): + print("Process finished") + + store = get_store_class_by_name(store_name) + + if ISOLATED_MODE: + store.update_process_file( + f"process_{process_id}.json", [{"finished": True}] + ) + else: + logstash_logger.info( + f"{store}: process finished", + extra={ + "process_id": process_id, + }, + ) + + @classmethod + def fetch_process_file(cls, process_id): + import os + + while True: + file_name = f"urls_process_{process_id}.json" + + if os.path.exists(file_name): + with open(file_name, "r") as file: + content = json.load(file) + print(f"Available: {len(content)}") + + file_name = f"process_{process_id}.json" + + if os.path.exists(file_name): + with open(file_name, "r") as file: + content = json.load(file) + + for entry in content: + if "finished" in entry and entry["finished"]: + return + + time.sleep(15) @classmethod def fetch_process_logs( @@ -353,7 +392,7 @@ def fetch_process_logs( update_log_id, ): finished_flag = False - seconds_between_fetches = 5 + seconds_between_fetches = 15 fetches_without_updates = 0 scraped_logs = set() scraped_urls = set() @@ -423,7 +462,7 @@ def fetch_process_logs( elapsed_time = seconds_between_fetches * fetches_without_updates if (elapsed_time > 600 and urls_count) or ( - elapsed_time > 15 and finished_flag + elapsed_time > 30 and finished_flag ): print(f"\nProcess ID: {process_id} finished:") print(f"\nAvailable: {available_products}") @@ -431,14 +470,15 @@ def fetch_process_logs( print(f"With error: {products_with_errors}") print(f"Total: {urls_count}/{len(scraped_urls)}") - process_non_blocker_store_update_log_data( - update_log_id, - { - "available": available_products, - "unavailable": unavailable_products, - "with_error": products_with_errors, - }, - ) + if not ISOLATED_MODE: + process_non_blocker_store_update_log_data( + update_log_id, + { + "available": available_products, + "unavailable": unavailable_products, + "with_error": products_with_errors, + }, + ) break @@ -539,6 +579,21 @@ def products_for_urls( "discovery_urls_without_products": discovery_urls_without_products, } + @classmethod + def update_process_file(cls, file_name, updated_content): + import os + + if os.path.exists(file_name): + with open(file_name, "r") as file: + content = json.load(file) + else: + content = [] + + content.extend(updated_content) + + with open(file_name, "w") as file: + json.dump(content, file, indent=4) + ########################################################################## # Celery tasks wrappers 
##########################################################################
@@ -617,22 +672,28 @@ def discover_entries_for_category_non_blocker_task(
                 "category_weight": max_weight,
             }
             positions = json.dumps(positions)
-            process_non_blocker_positions_data(url, positions, category)
-
             logger.info(url)
-            logstash_logger.info(
-                {
-                    "message": f"{store_class_name}: Discovered URL",
-                },
-                extra={
-                    "process_id": process_id or None,
-                    "store": store_class_name,
-                    "category": category,
-                    "url": url,
-                    "positions": positions,
-                    "category_weight": max_weight,
-                },
-            )
+            entry_data = {
+                "process_id": process_id or None,
+                "store": store_class_name,
+                "category": category,
+                "url": url,
+                "positions": positions,
+                "category_weight": max_weight,
+            }
+
+            if ISOLATED_MODE:
+                store.update_process_file(
+                    f"urls_process_{process_id}.json", [entry_data]
+                )
+            else:
+                logstash_logger.info(
+                    {
+                        "message": f"{store_class_name}: Discovered URL",
+                    },
+                    extra=entry_data,
+                )
+                process_non_blocker_positions_data(url, positions, category)

         return discovered_entries

@@ -726,18 +787,25 @@ def products_for_urls_non_blocker_task(

     @shared_task()
     def log_product(product, store_class_name, category, process_id, update_log_id):
-        logstash_logger.info(
-            f"{store_class_name}: Discovered product",
-            extra={
-                "process_id": process_id,
-                "store": store_class_name,
-                "category": category,
-                "product": product,
-            },
-        )
-        process_non_blocker_product_data(
-            product, store_class_name, [category], update_log_id
-        )
+        store = get_store_class_by_name(store_class_name)
+        product_data = {
+            "process_id": process_id,
+            "store": store_class_name,
+            "category": category,
+            "product": product,
+        }
+
+        if ISOLATED_MODE:
+            store.update_process_file(
+                f"products_process_{process_id}.json", [product_data]
+            )
+        else:
+            logstash_logger.info(
+                f"{store_class_name}: Discovered product",
+                extra=product_data,
+            )
+
+            process_non_blocker_product_data(product, store_class_name)

     ##########################################################################
     # Implementation dependant methods
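
For reference, a minimal standalone sketch of how the JSON files written by update_process_file in isolated mode could be summarized offline. It assumes the entries in products_process_<process_id>.json keep the same shape as the logstash payloads above (a "product" list whose items carry "url" and "stock"); summarize_isolated_process and the hard-coded process id are illustrative only and are not part of the patch.

import json
import os


def summarize_isolated_process(process_id):
    # Hypothetical helper: reads the per-process product log written by
    # log_product() in isolated mode and tallies availability, mirroring
    # the counters kept by fetch_process_logs().
    file_name = f"products_process_{process_id}.json"

    if not os.path.exists(file_name):
        print(f"No product log found for process {process_id}")
        return

    with open(file_name, "r") as file:
        entries = json.load(file)

    seen_urls = set()
    available = unavailable = with_error = 0

    for entry in entries:
        products = entry.get("product") or []

        if not products:
            # An empty product payload is assumed to mean the URL failed to scrape.
            with_error += 1
            continue

        for product in products:
            if product["url"] in seen_urls:
                continue

            seen_urls.add(product["url"])

            if product.get("stock", 0) == 0:
                unavailable += 1
            else:
                available += 1

    print(f"Available: {available}")
    print(f"Unavailable: {unavailable}")
    print(f"With error: {with_error}")


if __name__ == "__main__":
    summarize_isolated_process("00000000-0000-0000-0000-000000000000")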