Skip to content

Commit

Permalink
Merge pull request #82 from LeComptoirDesPharmacies/LDS-3702-Inventor…
Browse files Browse the repository at this point in the history
…ist-Import-tr-s-long

LDS-3702 : get all sale offers/products. use threads
  • Loading branch information
AntoineDuComptoirDesPharmacies authored Jun 18, 2024
2 parents 8a778ad + bc445a2 commit 3a6612a
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 47 deletions.
5 changes: 4 additions & 1 deletion business/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@
"/edit?usp=drive_link&ouid=113182943247309099469&rtpof=true&sd=true")

# get the latest tag
GITHUB_REPOSITORY_LATEST_RELEASE = "https://api.github.com/repos/LeComptoirDesPharmacies/lcdp-inventorist-app/releases/latest"
GITHUB_REPOSITORY_LATEST_RELEASE = ("https://api.github.com/repos/LeComptoirDesPharmacies/lcdp-inventorist-app"
"/releases/latest")

CHUNK_SIZE = 50
154 changes: 137 additions & 17 deletions business/services/excel.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,152 @@
import logging
import os
import tempfile
import time
import os
from concurrent.futures import as_completed, ThreadPoolExecutor

from openpyxl import load_workbook, Workbook

from api.consume.gen.sale_offer import ApiException as SaleOfferApiException
from api.consume.gen.product import ApiException as ProductApiException
from api.consume.gen.laboratory import ApiException as LaboratoryApiException
from api.consume.gen.configuration import ApiException as ConfigurationApiException
from api.consume.gen.catalog import ApiException as ProductInsightApiException
from api.consume.gen.configuration import ApiException as ConfigurationApiException
from api.consume.gen.laboratory import ApiException as LaboratoryApiException
from api.consume.gen.product import ApiException as ProductApiException
from api.consume.gen.sale_offer import ApiException as SaleOfferApiException
from business.constant import CHUNK_SIZE
from business.exceptions import CannotUpdateSaleOfferStatus
from business.mappers.excel_mapper import error_mapper
from business.mappers.api_error import sale_offer_api_exception_to_muggle, product_api_exception_to_muggle, \
api_exception_to_muggle, product_insight_api_exception_to_muggle
from business.services.product import update_or_create_product, change_product_status
from business.services.sale_offer import create_or_edit_sale_offer, delete_deprecated_sale_offers, \
change_sale_offer_status
from business.utils import rgetattr
from business.mappers.excel_mapper import error_mapper
from business.models.update_policy import UpdatePolicy
from business.services.product import update_or_create_product, change_product_status, __get_products_by_barcodes
from business.services.sale_offer import create_or_edit_sale_offer, delete_deprecated_sale_offers, __get_sale_offers, \
__get_latest_sale_offers
from business.utils import rgetattr, execution_time


def __prefetch(prefetch_type, keys_from_file, get_from_api, get_keys_from_api_object):
packets = [list(keys_from_file)[i:i + CHUNK_SIZE] for i in range(0, len(keys_from_file), CHUNK_SIZE)]

array_from_api = list(map(get_from_api, packets))

flatten_array = []
for sub_list in array_from_api:
flatten_array.extend(sub_list)

map_of_objects = {}
for obj in flatten_array:
keys = get_keys_from_api_object(obj)
for key in keys:
map_of_objects[key] = obj

logging.info(f"Map of {prefetch_type} has {len(map_of_objects)} element(s)")

return map_of_objects


@execution_time
def create_sale_offer_from_excel_lines(lines):
logging.info(f"{len(lines)} excel line(s) are candide for sale offer modification/creation")
results = list(map(__create_sale_offer_from_excel_line, lines))

products_barcodes = {x.sale_offer.product.principal_barcode for x in lines if
x.sale_offer.product.principal_barcode is not None}

# [barcode, product]
prefetched_products = __prefetch('products',
products_barcodes,
__get_products_by_barcodes,
lambda x: list(
filter(None, [x.barcodes.principal, x.barcodes.cip,
x.barcodes.cip13] + x.barcodes.eans)))

# assume that all lines have the same update_policy
update_policy = lines[0].sale_offer.update_policy

if update_policy == UpdatePolicy.PRODUCT_BARCODE.value:
logging.info("Update policy is PRODUCT_BARCODE, we will get the latest sale offers by product_id")

# [owner_id, [product_id]]
owner_id_barcodes = {}
for line in lines:
if (line.sale_offer.product.principal_barcode and
line.sale_offer.product.principal_barcode in prefetched_products):
owner_id_barcodes.setdefault(line.sale_offer.owner_id, []).append(
prefetched_products[line.sale_offer.product.principal_barcode].id)

# [(owner_id, product_id), sale_offer]
prefetched_sale_offers = {}
for (owner_id, product_ids) in owner_id_barcodes.items():
prefetched_sale_offers = __prefetch('sale_offers enabled by (owner_id, product_id)',
product_ids,
lambda x: __get_latest_sale_offers(x, owner_id, ['ENABLED']),
lambda x: [(owner_id, x.product.id)])

product_ids_not_found = [product_id for (owner_id, product_ids) in owner_id_barcodes.items() for product_id in
product_ids if (owner_id, product_id) not in prefetched_sale_offers]

if len(product_ids_not_found):
prefetched_sale_offers_others_status = __prefetch('sale_offers others status by (owner_id, product_id)',
product_ids_not_found,
lambda x: __get_latest_sale_offers(x, owner_id,
['WAITING_FOR_PRODUCT',
'ASKING_FOR_INVOICE',
'HOLIDAY',
'DISABLED']),
lambda x: [(owner_id, x.product.id)])
prefetched_sale_offers.update(prefetched_sale_offers_others_status)
else:
logging.info("Update policy is PRODUCT_REFERENCE, we will get the sale offers by reference")

# [reference, sale_offer]
sale_offers_reference = {x.sale_offer.reference for x in lines if
x.sale_offer.reference is not None}

prefetched_sale_offers = __prefetch('sale_offers by reference',
sale_offers_reference,
__get_sale_offers,
lambda x: [x.reference])

results = []
with ThreadPoolExecutor() as executor:
futures = {executor.submit(__create_sale_offer_from_excel_line,
prefetched_sale_offers,
prefetched_products,
line): line for line in lines}
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logging.error(f"Error processing line {futures[future]}: {e}")

return results


@execution_time
def create_or_update_product_from_excel_lines(lines):
logging.info(f"{len(lines)} excel line(s) are candide for product modification/creation")
results = list(map(__create_or_update_product_from_excel_line, lines))

products_barcodes = {x.sale_offer.product.principal_barcode for x in lines if
x.sale_offer.product.principal_barcode is not None}

# [barcode, product]
prefetched_products = __prefetch('products',
products_barcodes,
__get_products_by_barcodes,
lambda x: list(
filter(None, [x.barcodes.principal, x.barcodes.cip,
x.barcodes.cip13] + x.barcodes.eans)))

results = []
with ThreadPoolExecutor() as executor:
futures = {executor.submit(__create_or_update_product_from_excel_line, prefetched_products, line): line for
line in lines}
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logging.error(f"Error processing line {futures[future]}: {e}")

return results


Expand Down Expand Up @@ -108,14 +226,15 @@ def excel_to_dict(obj_class, excel_path, excel_mapper, sheet_name, header_row,
return results


def __create_sale_offer_from_excel_line(excel_line):
def __create_sale_offer_from_excel_line(prefetched_sale_offers, prefetched_products, excel_line):
sale_offer = None
error = None
try:
product = update_or_create_product(excel_line.sale_offer.product,
product = update_or_create_product(prefetched_products, excel_line.sale_offer.product,
excel_line.can_create_product_from_scratch())
change_product_status(product=product, new_status=excel_line.sale_offer.product.status)
sale_offer = create_or_edit_sale_offer(excel_line.sale_offer, product, excel_line.can_create_sale_offer())
sale_offer = create_or_edit_sale_offer(prefetched_sale_offers, excel_line.sale_offer, product,
excel_line.can_create_sale_offer())
except SaleOfferApiException as sale_offer_api_err:
logging.error('An API error occur in sale offer api', sale_offer_api_err)
error = sale_offer_api_exception_to_muggle(sale_offer_api_err)
Expand All @@ -142,11 +261,12 @@ def __create_sale_offer_from_excel_line(excel_line):
}


def __create_or_update_product_from_excel_line(excel_line):
def __create_or_update_product_from_excel_line(prefetched_products, excel_line):
product = None
error = None
try:
product = update_or_create_product(excel_line.sale_offer.product, excel_line.can_create_product_from_scratch())
product = update_or_create_product(prefetched_products, excel_line.sale_offer.product,
excel_line.can_create_product_from_scratch())
change_product_status(product=product, new_status=excel_line.sale_offer.product.status)
except ProductApiException as product_api_err:
logging.error('An API error occur in product api', product_api_err)
Expand Down
33 changes: 30 additions & 3 deletions business/services/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ def service_unavailable_status_code(e):
max_tries=3,
max_time=20,
giveup=service_unavailable_status_code)
def update_or_create_product(product, can_create_product_from_scratch):
def update_or_create_product(prefetched_products, product, can_create_product_from_scratch):
if product and not product.is_empty():
product_type = __find_product_type_by_name(product.product_type.name)
vat = get_vat_by_value(product.vat.value)
laboratory = find_or_create_laboratory(product.laboratory.name)

logging.info(f'Barcode {product.principal_barcode} : Try to find or create product with barcode')
result_product = __get_product_by_barcode(product.principal_barcode) or __create_product_with_barcode(
result_product = __get_product_by_barcode(prefetched_products,
product.principal_barcode) or __create_product_with_barcode(
product.principal_barcode)

if result_product:
Expand Down Expand Up @@ -86,10 +87,36 @@ def update_or_create_product(product, can_create_product_from_scratch):
raise CannotCreateProduct()


def __get_product_by_barcode(barcode):
def __get_products_by_barcodes(barcodes):
if not len(barcodes):
return None

api = get_search_product_api()

try:
products = api.get_products(
_request_auths=[api.api_client.create_auth_settings("apiKeyAuth", get_api_key())],
barcodes_anyeq=barcodes,
st_eq=['VALIDATED', 'WAITING_FOR_VALIDATION'],
p=0,
pp=len(barcodes)
)
except Exception as exc:
logging.error(f'Error while searching products by barcodes {barcodes}', exc)
return []

return products.records if products else []


def __get_product_by_barcode(prefetched_products, barcode):
if not barcode:
return None

if barcode in prefetched_products:
return prefetched_products[barcode]

logging.info(f'Product {barcode} not found in cache, search in API')

api = get_search_product_api()

products = api.get_products(
Expand Down
68 changes: 59 additions & 9 deletions business/services/sale_offer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ def __create_sale_offer_from_scratch(sale_offer, product):
return __create_sale_offer(sale_offer, product.id)


def __find_existing_sale_offer(sale_offer, product):
def __find_existing_sale_offer(prefetched_sale_offers, sale_offer, product):
logging.info(f'product {sale_offer.product.principal_barcode} : Try to find existing sale offer')
existing_sale_offer = None
if sale_offer.update_policy == UpdatePolicy.PRODUCT_BARCODE.value:
existing_sale_offer = __find_sale_offer_for_version(
sale_offer,
prefetched_sale_offers,
sale_offer.owner_id,
product.id
)
elif sale_offer.update_policy == UpdatePolicy.SALE_OFFER_REFERENCE.value and sale_offer.reference:
existing_sale_offer = __get_sale_offer(
prefetched_sale_offers,
sale_offer.reference
)
return existing_sale_offer
Expand All @@ -46,11 +48,11 @@ def __clone_existing_sale_offer(existing_sale_offer, sale_offer):
return __clone_sale_offer(existing_sale_offer, sale_offer)


def create_or_edit_sale_offer(sale_offer, product, can_create_sale_offer):
def create_or_edit_sale_offer(prefetched_sale_offers, sale_offer, product, can_create_sale_offer):
new_sale_offer = None

if sale_offer:
existing_sale_offer = __find_existing_sale_offer(sale_offer, product)
existing_sale_offer = __find_existing_sale_offer(prefetched_sale_offers, sale_offer, product)

if existing_sale_offer:
if not existing_sale_offer.status.value == 'DISABLED':
Expand All @@ -68,10 +70,15 @@ def create_or_edit_sale_offer(sale_offer, product, can_create_sale_offer):
raise CannotCreateSaleOffer()


def __find_sale_offer_for_version(sale_offer, product_id):
return __find_sale_offer_for_status(product_id, sale_offer.owner_id, ['ENABLED']) or \
__find_sale_offer_for_status(product_id, sale_offer.owner_id, ['WAITING_FOR_PRODUCT',
'ASKING_FOR_INVOICE', 'HOLIDAY', 'DISABLED'])
def __find_sale_offer_for_version(prefetched_sale_offers, owner_id, product_id):
if (owner_id, product_id) in prefetched_sale_offers:
return prefetched_sale_offers[(owner_id, product_id)]

logging.info(f'Sale offer by (owner_id, product_id) {(owner_id, product_id)} not found in cache, search in API')

return __find_sale_offer_for_status(product_id, owner_id, ['ENABLED']) or \
__find_sale_offer_for_status(product_id, owner_id, ['WAITING_FOR_PRODUCT', 'ASKING_FOR_INVOICE',
'HOLIDAY', 'DISABLED'])


def __find_sale_offer_for_status(product_id, owner_id, status):
Expand All @@ -88,7 +95,50 @@ def __find_sale_offer_for_status(product_id, owner_id, status):
return next(iter(sale_offers.records), None)


def __get_sale_offer(reference):
def __get_latest_sale_offers(product_ids, owner_id, status):
api = get_search_sale_offer_api()

try:
sale_offers = api.get_sale_offers(
_request_auths=[api.api_client.create_auth_settings("apiKeyAuth", get_api_key())],
p_eq=product_ids,
o_eq=[owner_id],
st_eq=status,
order_by=['CREATED_AT:desc'],
distinct_by='PRODUCT:LATEST_CREATED',
p=0,
pp=len(product_ids),
)
except Exception as exc:
logging.error(f'Error while searching sale offers by product ids {product_ids}', exc)
return []

return sale_offers.records if sale_offers else []


def __get_sale_offers(references):
api = get_search_sale_offer_api()

try:
sale_offers = api.get_sale_offers(
_request_auths=[api.api_client.create_auth_settings("apiKeyAuth", get_api_key())],
ref_eq=references,
p=0,
pp=len(references)
)
except Exception as exc:
logging.error(f'Error while searching sale offers by references {references}', exc)
return []

return sale_offers.records if sale_offers else []


def __get_sale_offer(prefetched_sale_offers, reference):
if reference in prefetched_sale_offers:
return prefetched_sale_offers[reference]

logging.info(f'Sale offer {reference} not found in cache, search in API')

api = get_search_sale_offer_api()
sale_offer = api.get_sale_offer(
_request_auths=[api.api_client.create_auth_settings("apiKeyAuth", get_api_key())],
Expand Down
Loading

0 comments on commit 3a6612a

Please sign in to comment.