diff --git a/.github/workflows/tests_integration.yml b/.github/workflows/tests_integration.yml index bf2e526c5..40823fc10 100644 --- a/.github/workflows/tests_integration.yml +++ b/.github/workflows/tests_integration.yml @@ -54,18 +54,15 @@ jobs: - name: dependencies run: | - if [[ ! -d "~/.cargo/bin" ]]; then - wget --no-check-certificate --secure-protocol=TLSv1_2 -qO- https://sh.rustup.rs | sh -s -- -y - fi export PATH="~/.cargo/bin:${PATH}" python -m pip install -U pip python -m pip install -U poetry - name: install run: | - pip install pytest - poetry lock --no-update - pip install -e . + poetry install --with dev + source $(poetry env info -p)/bin/activate + pip install psycopg2-binary - name: tests env: @@ -74,4 +71,4 @@ jobs: POSTGRES_DB: milabench POSTGRES_HOST: localhost POSTGRES_PORT: 5432 - run: pytest tests/integration + run: poetry run pytest tests/integration diff --git a/.github/workflows/tests_unit.yml b/.github/workflows/tests_unit.yml index 2f8ca02c4..05788e115 100644 --- a/.github/workflows/tests_unit.yml +++ b/.github/workflows/tests_unit.yml @@ -35,9 +35,22 @@ jobs: - name: dependencies run: | + pip install -U pip pip install poetry + poetry env use python3.10 + source $(poetry env info -p)/bin/activate + # + # poetry doesnot work when installing those !? + # + pip install antlr4-python3-runtime==4.9.3 + pip install -e . + pip install -e benchmate + # + # + # poetry install --with dev - name: tests run: | - poetry run pytest --ignore=tests/integration tests/ + source $(poetry env info -p)/bin/activate + pytest --ignore=tests/integration tests/ diff --git a/benchmate/benchmate/warden.py b/benchmate/benchmate/warden.py index dbf2aec3b..395648ea6 100644 --- a/benchmate/benchmate/warden.py +++ b/benchmate/benchmate/warden.py @@ -233,12 +233,12 @@ def __exit__(self, *args): def destroy(*processes, step=1, timeout=30): processes = list(processes) - def kill(proc, signal): + def kill(proc, sig): try: if getattr(proc, "did_setsid", False): - os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + os.killpg(os.getpgid(proc.pid), sig) else: - os.kill(proc.pid, signal.SIGTERM) + os.kill(proc.pid, sig) except ProcessLookupError: pass @@ -249,11 +249,9 @@ def kill(proc, signal): elapsed = 0 def wait(proc): nonlocal elapsed - while (ret := proc.poll()) is None and elapsed < timeout: time.sleep(step) elapsed += step - return ret is None k = 0 @@ -280,14 +278,24 @@ def wait(proc): @contextmanager -def process_cleaner(): +def process_cleaner(timeout=30): """Delay signal handling until all the processes have been killed""" - with Protected(): + def kill_everything(processes): + def _(): + destroy(*processes, timeout=timeout) + + return _ + + with Protected() as signalhandler: with GPUProcessWarden() as warden: # => SIGTERM all processes using GPUs processes = [] - try: # NOTE: we have not waited much between both signals + + # when a signal is received kill the known processes first + # then handle the signal + signalhandler.stop = kill_everything(processes) + try: # NOTE: we have not waited much between both signals warden.kill() # => SIGKILL all processes using GPUs yield processes # => Run milabench, spawning processes for the benches @@ -295,10 +303,9 @@ def process_cleaner(): finally: warden.terminate() # => SIGTERM all processes using GPUs - destroy(*processes) # => SIGTERM+SIGKILL milabench processes + destroy(*processes, timeout=timeout) # => SIGTERM+SIGKILL milabench processes # destroy waited 30s # warden.__exit__ # => SIGKILL all processes still using GPUs - diff --git a/milabench/commands/executors.py b/milabench/commands/executors.py index 7962dd7f5..8914d1251 100644 --- a/milabench/commands/executors.py +++ b/milabench/commands/executors.py @@ -61,7 +61,6 @@ async def execute_command( for pack in command.packs(): pack.phase = phase - timeout_tasks = [] with process_cleaner() as warden: for pack, argv, _kwargs in command.commands(): await pack.send(event="config", data=pack.config) @@ -77,7 +76,10 @@ async def execute_command( try: return await asyncio.wait_for(asyncio.gather(*coro), timeout=delay) - except TimeoutError | asyncio.TimeoutError: + except TimeoutError: + await force_terminate(pack, delay) + return [-1 for _ in coro] + except asyncio.TimeoutError: await force_terminate(pack, delay) return [-1 for _ in coro] diff --git a/milabench/metadata.py b/milabench/metadata.py index d28005542..392f01f12 100644 --- a/milabench/metadata.py +++ b/milabench/metadata.py @@ -2,7 +2,7 @@ import os import subprocess import traceback -from datetime import datetime +import datetime import cpuinfo from voir.instruments.gpu import get_gpu_info @@ -36,6 +36,7 @@ def fetch_torch_version(pack): return json.loads(result.stdout) +@error_guard({}) def machine_metadata(pack=None): """Retrieve machine metadata""" @@ -61,7 +62,7 @@ def machine_metadata(pack=None): "machine": uname.machine, }, "accelerators": gpus, - "date": datetime.utcnow().timestamp(), + "date": datetime.datetime.now(datetime.UTC).timestamp(), "milabench": retrieve_git_versions( __tag__, __commit__, diff --git a/poetry.lock b/poetry.lock index 965c963f1..66f14cc37 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2128,13 +2128,13 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [[package]] name = "voir" -version = "0.2.14" +version = "0.2.16" description = "Instrument, extend and visualize your programs" optional = false python-versions = "<4.0,>=3.7" files = [ - {file = "voir-0.2.14-py3-none-any.whl", hash = "sha256:5cc15e120a987cd96217368d0aa6881b94d067c2355e227dee103fe5dbf79173"}, - {file = "voir-0.2.14.tar.gz", hash = "sha256:ee4cb041546b8160806afea2a7a6d156b383c02e68235d8232426db73fba4d6d"}, + {file = "voir-0.2.16-py3-none-any.whl", hash = "sha256:d0beee6778e4d37f6c087362f55baa526b286399b509443b547ca3844332808c"}, + {file = "voir-0.2.16.tar.gz", hash = "sha256:73feda0b941e9247ca333611b9a8207ab6d154b2f6c5cf34f25770204aef9b1e"}, ] [package.dependencies] @@ -2189,4 +2189,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = ">=3.10,<4.0" -content-hash = "fc56300d7125b71e5aa10314e2987da44864f8585ffcf515450bc9704cae5c57" +content-hash = "e7242eff960d340d5675e36f416a14f90638f4b1671eee58bcd6c35af29fdb84" diff --git a/pyproject.toml b/pyproject.toml index d9c101a69..fa2bf0aab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ authors = [ license = "MIT" [tool.poetry.dependencies] -voir = "0.2.14" +voir = ">=0.2.14" benchmate = {path = "benchmate", develop = false} python = ">=3.10,<4.0" giving = "^0.4.0" diff --git a/tests/benchmate/test_protected.py b/tests/benchmate/test_protected.py index ce0ca377f..be5ca59fc 100644 --- a/tests/benchmate/test_protected.py +++ b/tests/benchmate/test_protected.py @@ -16,14 +16,30 @@ def _worker(delay): print('done') +def fake_poll(self): + def poll(*args, **kwargs): + try: + if self.exitcode is not None: + return self.exitcode + + if self.alive(): + pid, rc = os.waitpid(self.pid, 0) + return rc + else: + return 0 + except: + return None + + return poll def spawn(delay, warden): procs = [] for _ in range(10): proc = multiprocessing.Process(target=_worker, args=(delay,)) proc.start() + proc.poll = fake_poll(proc) procs.append(proc) - warden.add_process(proc) + warden.append(proc) return procs @@ -58,7 +74,7 @@ def test_process_cleaner_process(): proc = multiprocessing.Process(target=_protected_process, args=(60,)) proc.start() - time.sleep(1) + time.sleep(2) os.kill(proc.pid, signal.SIGINT) elapsed = time.time() - start @@ -69,16 +85,19 @@ def test_keyboard_cleaner_process(): start = time.time() with pytest.raises(KeyboardInterrupt): - with process_cleaner() as warden: + with process_cleaner(10) as warden: procs = spawn(60, warden) + assert len(warden) != 0 + print(warden[0]) + time.sleep(1) os.kill(os.getpid(), signal.SIGINT) wait(procs) elapsed = time.time() - start - assert elapsed < 30 + assert elapsed < 12 def test_keyboard_cleaner_process_ended(): @@ -106,10 +125,10 @@ def ctor(*args, **kwargs): return kwargs with pytest.raises(KeyboardInterrupt): - with process_cleaner() as warden: + with process_cleaner(timeout=10) as warden: mx = Multiplexer(timeout=0, constructor=ctor) proc = mx.start(["sleep", "60"], info={}, env={}, **{}) - warden.add_process(proc) + warden.append(proc) time.sleep(2) os.kill(os.getpid(), signal.SIGINT) @@ -119,7 +138,7 @@ def ctor(*args, **kwargs): print(entry) elapsed = time.time() - start - assert elapsed < 30 + assert elapsed < 12 def test_protected_multiplexer_ended(): @@ -128,10 +147,10 @@ def test_protected_multiplexer_ended(): start = time.time() with pytest.raises(KeyboardInterrupt): - with process_cleaner() as warden: + with process_cleaner(timeout=10) as warden: mx = Multiplexer(timeout=0, constructor=lambda **kwargs: kwargs) proc = mx.start(["sleep", "1"], info={}, env={}, **{}) - warden.add_process(proc) + warden.append(proc) time.sleep(2) os.kill(os.getpid(), signal.SIGINT) @@ -141,4 +160,4 @@ def test_protected_multiplexer_ended(): print(entry) elapsed = time.time() - start - assert elapsed < 30 \ No newline at end of file + assert elapsed < 10 \ No newline at end of file diff --git a/tests/test_executors.py b/tests/test_executors.py index 43158feb5..9fbb84294 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -140,7 +140,7 @@ def test_pack_executor(): print(r) acc += 1 - assert acc == 4, "Only 4 message received (config, meta, start, end)" + assert acc >= 4, "Only 4 message received (config, meta, start, end)" def test_voir_executor(): @@ -234,7 +234,7 @@ def test_njobs_novoir_executor(): print(r) acc += 1 - assert acc == 2 * 10 + assert acc >= 2 * 10 def mock_gpu_list(): diff --git a/tests/test_metadata.py b/tests/test_metadata.py new file mode 100644 index 000000000..5b35d6bdd --- /dev/null +++ b/tests/test_metadata.py @@ -0,0 +1,5 @@ +from milabench.metadata import machine_metadata + + +def test_machine_metadata(): + print(machine_metadata()) diff --git a/tests/test_mock_run.py b/tests/test_mock_run.py new file mode 100644 index 000000000..e69de29bb