diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py
index f2f740312..e57092436 100644
--- a/codalab/lib/upload_manager.py
+++ b/codalab/lib/upload_manager.py
@@ -244,13 +244,12 @@ def write_fileobj(
         try:
             bytes_uploaded = 0
             CHUNK_SIZE = 16 * 1024
-            ITERATIONS_PER_DISK_CHECK = 1
+            ITERATIONS_PER_DISK_CHECK = 2000
             iteration = 0
             with FileSystems.create(
                 bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
             ) as out:
                 while True:
-                    iteration += 1
                     to_send = output_fileobj.read(CHUNK_SIZE)
                     if not to_send:
                         break
@@ -276,6 +275,7 @@ def write_fileobj(
                         should_resume = progress_callback(bytes_uploaded)
                         if not should_resume:
                             raise Exception('Upload aborted by client')
+                    iteration += 1
         with FileSystems.open(
             bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
         ) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file:
diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py
index e907ec0b8..36f1f80ac 100644
--- a/codalab/model/bundle_model.py
+++ b/codalab/model/bundle_model.py
@@ -2714,21 +2714,41 @@ def update_user_info(self, user_info):
             cl_user.update().where(cl_user.c.user_id == user_info['user_id']).values(user_info)
         )
 
-    def increment_user_time_used(self, user_id, amount):
+    def increment_user_disk_used(self, user_id: str, amount: int):
         """
-        User used some time.
+        Increment disk_used for user by amount.
+        When incrementing values, we have to use a special query to ensure that we avoid
+        race conditions or deadlocks arising from multiple threads calling these functions
+        concurrently. We do this using with_for_update() and commit().
         """
-        user_info = self.get_user_info(user_id)
-        user_info['time_used'] += amount
-        self.update_user_info(user_info)
+        with self.engine.begin() as connection:
+            row = connection.execute(
+                select([cl_user.c.disk_used]).where(cl_user.c.user_id == user_id).with_for_update()
+            ).first()
+            if row is None:
+                raise NotFoundError("User with ID %s not found" % user_id)
+            disk_used = row[0] + amount
+            connection.execute(
+                cl_user.update().where(cl_user.c.user_id == user_id).values(disk_used=disk_used)
+            )
+            connection.commit()
 
-    def increment_user_disk_used(self, user_id: str, amount: int):
+    def increment_user_time_used(self, user_id: str, amount: int):
         """
-        Increment disk_used for user by amount
+        User used some time.
+        See comment for increment_user_disk_used.
         """
-        user_info = self.get_user_info(user_id)
-        user_info['disk_used'] += amount
-        self.update_user_info(user_info)
+        with self.engine.begin() as connection:
+            row = connection.execute(
+                select([cl_user.c.time_used]).where(cl_user.c.user_id == user_id).with_for_update()
+            ).first()
+            if row is None:
+                raise NotFoundError("User with ID %s not found" % user_id)
+            time_used = row[0] + amount
+            connection.execute(
+                cl_user.update().where(cl_user.c.user_id == user_id).values(time_used=time_used)
+            )
+            connection.commit()
 
     def get_user_time_quota_left(self, user_id, user_info=None):
         if not user_info:
diff --git a/codalab/model/worker_model.py b/codalab/model/worker_model.py
index 95e3d98e1..6fe1ecbc4 100644
--- a/codalab/model/worker_model.py
+++ b/codalab/model/worker_model.py
@@ -385,8 +385,8 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret
         Note, only the worker should call this method with autoretry set to False.
         See comments below.
""" - start_time = time.time() self._ping_worker_ws(worker_id) + start_time = time.time() while time.time() - start_time < timeout_secs: with closing(socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)) as sock: sock.settimeout(timeout_secs) @@ -407,15 +407,14 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret success = sock.recv(len(WorkerModel.ACK)) == WorkerModel.ACK else: success = True - except socket.error: - logging.debug("socket error when calling send_json_message") + except socket.error as e: + logging.error(f"socket error when calling send_json_message: {e}") if not success: # Shouldn't be too expensive just to keep retrying. # TODO: maybe exponential backoff - time.sleep( - 0.3 - ) # changed from 0.003 to keep from rate-limiting due to dead workers + logging.error("Sleeping for 0.1 seconds.") + time.sleep(0.3) continue if not autoretry: @@ -430,7 +429,7 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret sock.sendall(json.dumps(message).encode()) return True - + logging.info("Socket message timeout.") return False def has_reply_permission(self, user_id, worker_id, socket_id): diff --git a/codalab/server/bundle_manager.py b/codalab/server/bundle_manager.py index 9a1cc549c..64ed682bd 100644 --- a/codalab/server/bundle_manager.py +++ b/codalab/server/bundle_manager.py @@ -712,7 +712,7 @@ def _try_start_bundle(self, workers, worker, bundle, bundle_resources): worker['socket_id'], worker['worker_id'], self._construct_run_message(worker['shared_file_system'], bundle, bundle_resources), - 0.2, + 1, ): logger.info( 'Starting run bundle {} on worker {}'.format(bundle.uuid, worker['worker_id']) diff --git a/codalab/worker/upload_util.py b/codalab/worker/upload_util.py index aa9548753..d73cbaaa2 100644 --- a/codalab/worker/upload_util.py +++ b/codalab/worker/upload_util.py @@ -65,7 +65,7 @@ def upload_with_chunked_encoding( # Use chunked transfer encoding to send the data through. bytes_uploaded = 0 - ITERATIONS_PER_DISK_CHECK = 1 + ITERATIONS_PER_DISK_CHECK = 2000 iteration = 0 while True: to_send = fileobj.read(CHUNK_SIZE) diff --git a/codalab/worker_manager/azure_batch_worker_manager.py b/codalab/worker_manager/azure_batch_worker_manager.py index fad705dad..9a61aa2d0 100644 --- a/codalab/worker_manager/azure_batch_worker_manager.py +++ b/codalab/worker_manager/azure_batch_worker_manager.py @@ -97,7 +97,7 @@ def get_worker_jobs(self) -> List[WorkerJob]: def start_worker_job(self) -> None: worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest') worker_id: str = uuid.uuid4().hex - logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image)) + logger.info('Starting worker {} with image {}'.format(worker_id, worker_image)) work_dir: str = ( self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/" ) diff --git a/scripts/test_util.py b/scripts/test_util.py index f750cb93b..dccc5c038 100644 --- a/scripts/test_util.py +++ b/scripts/test_util.py @@ -1,6 +1,8 @@ import io +import signal import subprocess import sys +import time import traceback global cl @@ -179,3 +181,48 @@ def cleanup(cl, tag, should_wait=True): run_command([cl, 'wrm', uuid, '--force']) worksheets_removed += 1 print('Removed {} bundles and {} worksheets.'.format(bundles_removed, worksheets_removed)) + + +class Timer: + """ + Class that uses signal to interrupt functions while they're running + if they run for longer than timeout_seconds. 
+    Can also be used to time how long functions take within its context manager.
+    Used for the timing tests.
+    """
+
+    def __init__(self, timeout_seconds=1, handle_timeouts=True, uuid=None):
+        """
+        A context manager that times the execution of the code within it and, if handle_timeouts
+        is True, interrupts that code once it has run for longer than timeout_seconds.
+        Parameters:
+            timeout_seconds (float): Amount of time before execution in the context manager is interrupted for timeout
+            handle_timeouts (bool): If True, raise a TimeoutError when execution exceeds timeout_seconds; if False, only time the execution
+            uuid (str): Uuid of the bundle running within the context manager
+        """
+        self.handle_timeouts = handle_timeouts
+        self.timeout_seconds = timeout_seconds
+        self.uuid = uuid
+
+    def handle_timeout(self, signum, frame):
+        timeout_message = "Timeout occurred"
+        if self.uuid:
+            timeout_message += " while waiting for %s to run" % self.uuid
+        raise TimeoutError(timeout_message)
+
+    def time_elapsed(self):
+        return time.time() - self.start_time
+
+    def __enter__(self):
+        self.start_time = time.time()
+        if self.handle_timeouts:
+            signal.signal(signal.SIGALRM, self.handle_timeout)
+            signal.setitimer(signal.ITIMER_REAL, self.timeout_seconds, self.timeout_seconds)
+
+    def __exit__(self, type, value, traceback):
+        self.time_elapsed = time.time() - self.start_time
+        if self.handle_timeouts:
+            # Now that the context has exited, reset the itimer so the alarm does not fire later.
+            signal.setitimer(signal.ITIMER_REAL, 0, 0)
diff --git a/tests/stress/stress_test.py b/tests/stress/stress_test.py
index 37f681e98..5e47ea7df 100644
--- a/tests/stress/stress_test.py
+++ b/tests/stress/stress_test.py
@@ -1,4 +1,6 @@
 import argparse
+from collections import defaultdict
+import json
 import os
 import random
 import string
@@ -9,7 +11,13 @@
 from multiprocessing import cpu_count, Pool
 from threading import Thread
 
-from scripts.test_util import cleanup, run_command
+from scripts.test_util import cleanup, run_command, Timer
+
+
+def temp_path(file_name):
+    root = '/tmp'
+    return os.path.join(root, file_name)
+
 
 """
 Script to stress test CodaLab's backend. The following is a list of what's being tested:
@@ -38,6 +46,7 @@ class TestFile:
 
     def __init__(self, file_name, size_mb=1, content=None):
         self._file_name = file_name
+        self._file_path = temp_path(file_name)
         if content is None:
             self._size_mb = size_mb
             self._make_random_file()
@@ -45,27 +54,30 @@ def __init__(self, file_name, size_mb=1, content=None):
             self._write_content(content)
 
     def _make_random_file(self):
-        with open(self._file_name, 'wb') as file:
-            file.seek(self._size_mb * 1024 * 1024)  # Seek takes in file size in terms of bytes
+        with open(self._file_path, 'wb') as file:
+            file.seek(int(self._size_mb * 1024 * 1024))  # Seek takes in file size in terms of bytes
             file.write(b'0')
-        print('Created file {} of size {} MB.'.format(self._file_name, self._size_mb))
+        print('Created file {} of size {} MB.'.format(self._file_path, self._size_mb))
 
     def _write_content(self, content):
-        with open(self._file_name, 'w') as file:
+        with open(self._file_path, 'w') as file:
             file.write(content)
 
     def name(self):
         return self._file_name
 
+    def path(self):
+        return self._file_path
+
     def delete(self):
         '''
         Removes the file.
         '''
-        if os.path.exists(self._file_name):
-            os.remove(self._file_name)
-            print('Deleted file {}.'.format(self._file_name))
+        if os.path.exists(self._file_path):
+            os.remove(self._file_path)
+            print('Deleted file {}.'.format(self._file_path))
         else:
-            print('File {} has already been deleted.'.format(self._file_name))
+            print('File {} has already been deleted.'.format(self._file_path))
 
 
 class StressTestRunner:
@@ -106,65 +118,51 @@ class StressTestRunner:
     def __init__(self, cl, args):
         self._cl = cl
         self._args = args
+        self._runs = defaultdict(list)
 
         # Connect to the instance the stress tests will run on
         print('Connecting to instance %s...' % args.instance)
         subprocess.call([self._cl, 'work', '%s::' % args.instance])
 
+    def time_function(self, fn):
+        t = Timer(handle_timeouts=False)
+        with t:
+            fn()
+        self.cleanup()
+        print(f'{fn.__name__} finished in {t.time_elapsed}')
+        self._runs[fn.__name__].append(t.time_elapsed)
+
+    def test_function(self, fn):
+        try:
+            self.time_function(fn)
+        except Exception as e:
+            print(f"Exception for function {fn.__name__}: {e}")
+            self._runs[fn.__name__].append(str(e))
+
     def run(self):
         print('Cleaning up stress test files from other runs...')
-        cleanup(self._cl, StressTestRunner._TAG, should_wait=False)
+        cleanup(self._cl, StressTestRunner._TAG, should_wait=True)
 
         print('Running stress tests...')
         self._start_heartbeat()
-        self._test_large_bundle_result()
-        print('_test_large_bundle_result finished')
-        self.cleanup()
-
-        self._test_large_bundle_upload()
-        print('_test_large_bundle_upload finished')
-        self.cleanup()
-
-        self._test_many_gpu_runs()
-        print('_test_many_gpu_runs finished')
-        self.cleanup()
-
-        self._test_multiple_cpus_runs_count()
-        print('_test_multiple_cpus_runs_count finished')
-        self.cleanup()
-
-        self._test_many_bundle_uploads()
-        print('_test_many_bundle_uploads finished')
-        self.cleanup()
-
-        self._test_many_worksheet_copies()
-        print('_test_many_worksheet_copies finished')
-        self.cleanup()
-
-        self._test_parallel_runs()
-        print('_test_parallel_runs finished')
-        self.cleanup()
-
-        self._test_many_docker_runs()
-        print('_test_many_docker_runs finished')
-        self.cleanup()
-
-        self._test_infinite_memory()
-        print('_test_infinite_memory finished')
-        self.cleanup()
-
-        self._test_infinite_gpu()
-        print('_test_infinite_gpu finished')
-        self.cleanup()
-
-        self._test_infinite_disk()
-        print('_test_infinite_disk finished')
-        self.cleanup()
-
-        self._test_many_disk_writes()
-        print('_test_many_disk_writes finished')
-        self.cleanup()
+        functions = [
+            self._test_large_bundle_upload,
+            self._test_large_bundle_result,
+            self._test_many_gpu_runs,
+            self._test_multiple_cpus_runs_count,
+            self._test_many_bundle_uploads,
+            self._test_many_worksheet_copies,
+            self._test_parallel_runs,
+            self._test_many_docker_runs,
+            self._test_infinite_memory,
+            self._test_infinite_gpu,
+            self._test_infinite_disk,
+            self._test_many_disk_writes,
+        ]
+
+        for fn in functions:
+            self.test_function(fn)
 
         print('Done.')
 
     def _start_heartbeat(self):
@@ -182,8 +180,8 @@ def _heartbeat(self):
             if t.is_alive():
                 print('Heartbeat failed. Exiting...')
                 sys.exit(1)
-            # Have heartbeat run every 30 seconds
-            time.sleep(30)
+            # Have heartbeat run every 10 minutes
+            time.sleep(600)
 
     def _test_large_bundle_result(self) -> None:
         def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile:
@@ -194,7 +192,7 @@ def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile:
 
         self._set_worksheet('large_bundle_result')
         file: TestFile = create_large_file_in_bundle(self._args.large_dependency_size_gb)
-        self._run_bundle([self._cl, 'upload', file.name()])
+        self._run_bundle([self._cl, 'upload', file.path()])
         file.delete()
 
         dependency_uuid: str = self._run_bundle(
@@ -214,7 +212,7 @@ def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile:
     def _test_large_bundle_upload(self) -> None:
         self._set_worksheet('large_bundle_upload')
         large_file: TestFile = TestFile('large_file', self._args.large_file_size_gb * 1000)
-        dependency_uuid: str = self._run_bundle([self._cl, 'upload', large_file.name()])
+        dependency_uuid: str = self._run_bundle([self._cl, 'upload', large_file.path()])
         large_file.delete()
         uuid: str = self._run_bundle(
             [
@@ -241,7 +239,7 @@ def _test_many_bundle_uploads(self):
         self._set_worksheet('many_bundle_uploads')
         file = TestFile('small_file', 1)
         for _ in range(self._args.bundle_upload_count):
-            self._run_bundle([self._cl, 'upload', file.name()])
+            self._run_bundle([self._cl, 'upload', file.path()])
         file.delete()
 
     def _test_many_worksheet_copies(self):
@@ -249,7 +247,7 @@ def _test_many_worksheet_copies(self):
         worksheet_uuid = self._set_worksheet('many_worksheet_copies')
         file = TestFile('copy_file', 1)
         for _ in range(10):
-            self._run_bundle([self._cl, 'upload', file.name()])
+            self._run_bundle([self._cl, 'upload', file.path()])
         file.delete()
 
         # Create many worksheets with current worksheet's content copied over
@@ -284,7 +282,7 @@ def _test_infinite_memory(self):
             return
         self._set_worksheet('infinite_memory')
         file = self._create_infinite_memory_script()
-        self._run_bundle([self._cl, 'upload', file.name()])
+        self._run_bundle([self._cl, 'upload', file.path()])
         self._run_bundle(
             [self._cl, 'run', ':' + file.name(), 'python ' + file.name()], expected_exit_code=1
         )
@@ -299,7 +297,7 @@ def _test_infinite_gpu(self):
             return
         self._set_worksheet('infinite_gpu')
         file = self._create_infinite_memory_script()
-        self._run_bundle([self._cl, 'upload', file.name()])
+        self._run_bundle([self._cl, 'upload', file.path()])
         for _ in range(self._args.infinite_gpu_runs_count):
             self._run_bundle(
                 [self._cl, 'run', ':' + file.name(), 'python ' + file.name(), '--request-gpus=1'],
@@ -393,6 +391,7 @@ def main():
     runner.run()
     duration_seconds = time.time() - start_time
     print("--- Completion Time: {} minutes---".format(duration_seconds / 60))
+    print(json.dumps(runner._runs))
 
 
 if __name__ == '__main__':
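
Note on the ITERATIONS_PER_DISK_CHECK change in codalab/lib/upload_manager.py and codalab/worker/upload_util.py: the hunks above only show the constant being raised from 1 to 2000 and the counter increment moving to the end of the loop; the quota check that the counter gates lies outside the diff context. The sketch below is a hypothetical reconstruction of that pattern, not the repository's actual code (check_disk_quota stands in for whatever quota helper the real loop calls). It illustrates why the constant matters: the quota lookup now runs once per 2000 chunks, roughly every 32 MB at the 16 KB chunk size, instead of on every chunk.

    # Illustrative sketch only -- not the actual CodaLab implementation.
    # check_disk_quota is a hypothetical stand-in for the real quota helper.
    CHUNK_SIZE = 16 * 1024
    ITERATIONS_PER_DISK_CHECK = 2000

    def copy_with_periodic_disk_check(src, dst, check_disk_quota):
        bytes_uploaded = 0
        iteration = 0
        while True:
            chunk = src.read(CHUNK_SIZE)
            if not chunk:
                break
            # Hit the quota check once every ITERATIONS_PER_DISK_CHECK chunks
            # (about every 32 MB) instead of on every 16 KB chunk.
            if iteration % ITERATIONS_PER_DISK_CHECK == 0:
                check_disk_quota(bytes_uploaded)
            dst.write(chunk)
            bytes_uploaded += len(chunk)
            iteration += 1
        return bytes_uploaded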