
optimize() with num_workers > 1 leads to deletion issues #245

Open
awaelchli opened this issue Jul 19, 2024 · 7 comments · May be fixed by #375
Labels: bug (Something isn't working) · ci / tests · help wanted (Extra attention is needed)

Comments

awaelchli (Member) commented Jul 19, 2024

🐛 Bug

In the LitData tests, we only ever call optimize() with num_workers=1. In PR #237 I found that if optimize() is called with more workers, we appear to hit a race condition that causes some chunks to be deleted, after which streaming fails.
#237 (comment)

This happens in this test:

def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):

(see ToDo comments).

The test fails with

__________________ test_dataset_resume_on_future_chunks[True] __________________

shuffle = True
tmpdir = local('/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f6a4124f460>

    @pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows and MacOs")
    @mock.patch.dict(os.environ, {}, clear=True)
    @pytest.mark.timeout(60)
    @pytest.mark.parametrize("shuffle", [True, False])
    def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):
        """This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have
        the same size."""
        s3_cache_dir = str(tmpdir / "s3cache")
        optimize_data_cache_dir = str(tmpdir / "optimize_data_cache")
        optimize_cache_dir = str(tmpdir / "optimize_cache")
        data_dir = str(tmpdir / "optimized")
        monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", optimize_data_cache_dir)
        monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", optimize_cache_dir)
    
>       optimize(
            fn=_simple_preprocess,
            inputs=list(range(8)),
            output_dir=data_dir,
            chunk_size=190,
            num_workers=4,
            num_uploaders=1,
copying /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin to /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimized/chunk-3-1.bin
putting /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin on the remove queue
Worker 1 is done.
Worker 2 is done.
Worker 3 is done.
Worker 0 is done.
Workers are finished.
----------------------------- Captured stderr call -----------------------------


Progress:   0%|          | 0/8 [00:00<?, ?it/s]Process Process-85:1:
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 259, in _upload_fn
    shutil.copy(local_filepath, output_filepath)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 427, in copy
    copyfile(src, dst, follow_symlinks=follow_symlinks)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 264, in copyfile
    with open(src, 'rb') as fsrc:
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-0-0.bin'

Progress: 100%|██████████| 8/8 [00:00<00:00, 122.77it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_resume_on_future_chunks[True] - RuntimeError: All the chunks should have been deleted. Found ['chunk-0-1.bin']
====== 1 failed, 191 passed, 8 skipped, 11 warnings in 247.94s (0:04:07) =======

when setting optimize(num_workers=4). This needs to be investigated. However, so far it has not been possible to reproduce locally (it is only observed in CI).
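
For reference, a minimal sketch of the call shape that hits this in CI. The dummy_preprocess function below is a hypothetical placeholder; the real test uses _simple_preprocess, whose body is not shown here.

import litdata as ld

# Hypothetical placeholder for the preprocessing function used in the test.
def dummy_preprocess(index):
    return index

if __name__ == "__main__":
    # Same shape of call as in the failing test: several workers, a single uploader.
    ld.optimize(
        fn=dummy_preprocess,
        inputs=list(range(8)),
        output_dir="/tmp/litdata_optimized",  # any local scratch directory
        chunk_size=190,
        num_workers=4,
        num_uploaders=1,
    )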

awaelchli added the bug, help wanted, and ci / tests labels on Jul 19, 2024
awaelchli (Member Author) commented:

Some more evidence from another (more rarely flaky) test that uses num_workers=2:

https://github.com/Lightning-AI/litdata/actions/runs/10013150667/job/27680138130

_______ test_dataset_for_text_tokens_distributed_num_workers_end_to_end ________

tmpdir = local('/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x121e83bb0>

    def test_dataset_for_text_tokens_distributed_num_workers_end_to_end(tmpdir, monkeypatch):
        monkeypatch.setattr(functions, "_get_input_dir", lambda x: str(tmpdir))
    
        seed_everything(42)
    
        with open(tmpdir / "a.txt", "w") as f:
            f.write("hello")
    
        inputs = [(v, str(tmpdir / "a.txt")) for v in range(0, 200, 20)]
    
        cache_dir = os.path.join(tmpdir, "cache")
        output_dir = os.path.join(tmpdir, "target_dir")
        os.makedirs(output_dir, exist_ok=True)
        monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir)
        monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir)
    
>       functions.optimize(
            optimize_fn, inputs, output_dir=str(tmpdir), num_workers=2, chunk_size=2, reorder_files=False, num_downloaders=1
        )

tests/streaming/test_dataset.py:596: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
src/litdata/processing/functions.py:432: in optimize
    data_processor.run(
src/litdata/processing/data_processor.py:1055: in run
    self._exit_on_error(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <litdata.processing.data_processor.DataProcessor object at 0x121edb6a0>
error = 'Traceback (most recent call last):\n  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor....te/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache\'\n'

    def _exit_on_error(self, error: str) -> None:
        for w in self.workers:
            # w.join(0)
            w.terminate()  # already error has occurred. So, no benefit of processing further.
>       raise RuntimeError(f"We found the following error {error}.")
E       RuntimeError: We found the following error Traceback (most recent call last):
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
E           self._setup()
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
E           self._create_cache()
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
E           os.makedirs(self.cache_data_dir, exist_ok=True)
E         File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
E           mkdir(name, mode)
E       FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
E       .

src/litdata/processing/data_processor.py:1119: RuntimeError
----------------------------- Captured stdout call -----------------------------
Create an account on https://lightning.ai/ to optimize your data faster using multiple nodes and large machines.
Storing the files under /private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1
Setup started with fast_dev_run=False.
Setup finished in 0.001 seconds. Found 10 items to process.
Starting 2 workers with 10 items. The progress bar is only updated when a worker finishes.
Workers are ready ! Starting data processing...
Worker 1 is done.
----------------------------- Captured stderr call -----------------------------


Progress:   0%|          | 0/10 [00:00<?, ?it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_for_text_tokens_distributed_num_workers_end_to_end - RuntimeError: We found the following error Traceback (most recent call last):
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
    self._setup()
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
    self._create_cache()
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
    os.makedirs(self.cache_data_dir, exist_ok=True)
  File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
    mkdir(name, mode)
FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
.

hiyyg (Contributor) commented Jul 23, 2024

Hi, I also ran into a similar issue with num_workers > 1. How can we resolve it?

hiyyg (Contributor) commented Jul 23, 2024

I also found that the issue happens with num_workers = 1. For now I can only run with num_workers = 0.

BDannowitzHudl commented Jul 24, 2024

I'm also experiencing this issue

[Errno 2] No such file or directory: '/tmp/[...]/chunk-0-0.bin'

This error goes away when I set num_workers = 1.

hiyyg (Contributor) commented Jul 24, 2024

It seems I can bypass this error with

os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = f"/tmp/{__name__}"
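
A sketch of that workaround: point the optimizer cache folders at unique, per-run directories before calling optimize(), so runs do not collide on the shared default cache. The identity function and paths below are placeholders, not from the issue.

import os
import tempfile

import litdata as ld

def identity(index):  # placeholder preprocessing function
    return index

if __name__ == "__main__":
    # Redirect the optimizer caches to fresh, unique directories for this run.
    os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = tempfile.mkdtemp(prefix="litdata_cache_")
    os.environ["DATA_OPTIMIZER_DATA_CACHE_FOLDER"] = tempfile.mkdtemp(prefix="litdata_data_cache_")

    ld.optimize(
        fn=identity,
        inputs=list(range(100)),
        output_dir=os.path.join(tempfile.gettempdir(), "litdata_optimized"),
        chunk_bytes="64MB",
        num_workers=4,
    )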

deependujha (Collaborator) commented:

For the second issue related to test_dataset_for_text_tokens_distributed_num_workers_end_to_end:

I think this is just a weird bug.

We are using os.makedirs(SOME_PATH, exist_ok=True), so even if the directory exists, it shouldn't raise an error. But sometimes it raises one anyway.

I don't think it has anything to do with num_workers.

[Screenshot from 2024-07-25 22-00-25]


I've faced this issue a couple of times, and from what I remember it only used to fail on macOS. So I added a couple of if os.path.exists(): checks before the os.makedirs calls; those calls appear in a number of files.
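
A sketch of the guard described above. Note that an os.path.exists() check alone is still racy across worker processes, so the FileExistsError is tolerated as well.

import os

def make_dirs_safely(path: str) -> None:
    # Skip the call when the directory is already there, and swallow the
    # FileExistsError that can still occur if another process creates it in between.
    if not os.path.exists(path):
        try:
            os.makedirs(path, exist_ok=True)
        except FileExistsError:
            pass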

lxr2 commented Jul 29, 2024

Same issue on my Ubuntu 16 server with num_workers=16. It doesn't always happen, and one workaround is simply to rerun the code.


Pseudocode:

from PIL import Image
import os
import litdata as ld

def process_patch(input_data):
    # input_data is a tuple of (image patch, mask patch, color->label mapping) built by the caller below
    img_patch, mask_patch, color2label = input_data
    
    img_patch = img_patch.convert("RGB")
    mask_patch = mask_patch.convert("RGB")
    w, h = mask_patch.size
    pixel = mask_patch.getpixel((w//2, h//2))
    label_text = color2label.get(pixel, "BG")

    if label_text == "BG": return None

    label = list(color2label.keys()).index(pixel)
    
    return (img_patch, label)

# slide_ids, color2label, patch_size, stride_size, patch_dir, and
# split_image_into_patches are defined elsewhere (not shown here).
for slide_id in slide_ids:
    img_path = slide_id + "_HE.jpg"
    mask_path = slide_id + "_mask.jpg"

    img = Image.open(img_path)
    mask = Image.open(mask_path)
    img_patches = split_image_into_patches(img, patch_size, stride_size)
    mask_patches = split_image_into_patches(mask, patch_size, stride_size)

    input_data = [(img, mask, color2label) for img, mask in zip(img_patches, mask_patches)]

    ld.optimize(
        fn=process_patch,
        inputs=input_data,
        output_dir=os.path.join(patch_dir, slide_id),
        num_workers=min(os.cpu_count(), 16),
        mode='overwrite',
        compression="zstd",
        chunk_bytes="64MB"
    )
