
Commit 799a477
Parallelize verification step 3 with threads
Related: #4
taoky committed Aug 27, 2024
1 parent d64ea61 commit 799a477
Showing 1 changed file (shadowmire.py) with 38 additions and 15 deletions.
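In outline, the change turns the body of the old per-package loop into a local is_consistent() helper and fans it out over a thread pool whose size comes from the new SHADOWMIRE_IOWORKERS variable. Threads (rather than more processes) are a reasonable fit here because the check is dominated by stat() calls and small local reads, which release the GIL. The condensed sketch below shows the same pattern in isolation; find_packages_to_update and the stubbed check are illustrative only, and the error handling is simplified compared to the actual commit shown in the diff that follows.

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm

IOWORKERS = int(os.environ.get("SHADOWMIRE_IOWORKERS", "5"))


def find_packages_to_update(package_names: list[str]) -> list[str]:
    def is_consistent(name: str) -> bool:
        # Stub for the real check, which stats index files and package blobs on disk.
        return True

    to_update = []
    with ThreadPoolExecutor(max_workers=IOWORKERS) as executor:
        futures = {executor.submit(is_consistent, name): name for name in package_names}
        try:
            for future in tqdm(as_completed(futures), total=len(futures)):
                if not future.result():
                    to_update.append(futures[future])
        except BaseException:
            # On any failure, cancel work that has not started yet, then re-raise.
            for f in futures:
                f.cancel()
            raise
    return to_update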
@@ -36,6 +36,8 @@
 
 # Note that it's suggested to use only 3 workers for PyPI.
 WORKERS = int(os.environ.get("SHADOWMIRE_WORKERS", "3"))
+# Use threads to parallelize verification local IO
+IOWORKERS = int(os.environ.get("SHADOWMIRE_IOWORKERS", "5"))
 # A safety net -- to avoid upstream issues causing too many packages removed when determining sync plan.
 MAX_DELETION = int(os.environ.get("SHADOWMIRE_MAX_DELETION", "50000"))
 
@@ -516,13 +518,11 @@ def check_and_update(
         json_files: set[str],
         compare_size: bool,
     ) -> bool:
-        to_update = []
-        for package_name in tqdm(package_names, desc="Checking consistency"):
+        def is_consistent(package_name: str) -> bool:
             if package_name not in json_files:
                 # save a newfstatat() when name already in json_files
                 logger.info("add %s as it does not have json API file", package_name)
-                to_update.append(package_name)
-                continue
+                return False
             package_simple_path = self.simple_dir / package_name
             html_simple = package_simple_path / "index.html"
             htmlv1_simple = package_simple_path / "index.v1_html"
@@ -539,29 +539,25 @@ def check_and_update(
                     "add %s as it does not have index.v1_html or index.v1_json",
                     package_name,
                 )
-                to_update.append(package_name)
-                continue
+                return False
             if (
                 hrefs_html is None
                 or hrefsize_json is None
                 or hrefs_html != [i[0] for i in hrefsize_json]
             ):
                 # something unexpected happens...
                 logger.info("add %s as its indexes are not consistent", package_name)
-                to_update.append(package_name)
-                continue
+                return False
 
             # OK, check if all hrefs have corresponding files
             if self.sync_packages:
-                should_update = False
                 for href, size in hrefsize_json:
                     dest = Path(normpath(package_simple_path / href))
                     try:
                         dest_stat = dest.stat()
                     except FileNotFoundError:
                         logger.info("add %s as it's missing packages", package_name)
-                        should_update = True
-                        break
+                        return False
                     if compare_size and size != -1:
                         dest_size = dest_stat.st_size
                         if dest_size != size:
@@ -571,10 +567,37 @@
                                 dest_size,
                                 size,
                             )
-                            should_update = True
-                            break
-            if should_update:
-                to_update.append(package_name)
+                            return False
+            return True
+
+        to_update = []
+        with ThreadPoolExecutor(max_workers=IOWORKERS) as executor:
+            futures = {
+                executor.submit(is_consistent, package_name): package_name
+                for package_name in package_names
+            }
+            try:
+                for future in tqdm(
+                    as_completed(futures),
+                    total=len(package_names),
+                    desc="Checking consistency",
+                ):
+                    package_name = futures[future]
+                    try:
+                        consistent = future.result()
+                        if not consistent:
+                            to_update.append(package_name)
+                    except Exception:
+                        logger.warning(
+                            "%s generated an exception", package_name, exc_info=True
+                        )
+                        raise
+            except:
+                logger.info("Got an exception, exiting...")
+                for future in futures:
+                    future.cancel()
+                sys.exit(1)
+
         logger.info("%s packages to update in check_and_update()", len(to_update))
         return self.parallel_update(to_update, prerelease_excludes)

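Usage note: like the existing SHADOWMIRE_WORKERS variable, SHADOWMIRE_IOWORKERS is read from the environment once at import time, so the thread count can be tuned per run without touching the code. A minimal wrapper sketch follows; the value 16 and the verify subcommand name are assumptions for illustration, not part of this commit.

import os
import subprocess

# Hypothetical wrapper: run shadowmire's verification with more local-IO threads.
# SHADOWMIRE_IOWORKERS only affects the threaded consistency checks added above;
# SHADOWMIRE_WORKERS still controls the PyPI-facing workers.
env = dict(os.environ, SHADOWMIRE_IOWORKERS="16")
subprocess.run(["python3", "shadowmire.py", "verify"], env=env, check=True)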
