Skip to content

Commit

Permalink
Fix Stress Tests (#4406)
Browse files Browse the repository at this point in the history
* bump version to 1.6.1

* fix mysql db deadlock

* do upload first so we can check for deadlock

* minor update

* Modified the code to use better iterations per disk check to be more effiicnet

* add changes to increment time used as well

* revert chnages from rc1.6.1 since I acttually branched off from that rather than matster

* minor change for debugging stress tests

* minor changes

* update stress tests to get more fine-grained information about what might be goign wrong

* update worker manager to log image name

* small changes to try and debug stress test issue

* get remaining tests to pass

* Add in better error logging for send_json_message

* minor update to help with finalization issue

* Address comments

---------

Co-authored-by: Jiani Wang <[email protected]>
  • Loading branch information
AndrewJGaut and wwwjn authored Mar 2, 2023
1 parent 5955017 commit 7833e8e
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 87 deletions.
4 changes: 2 additions & 2 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,12 @@ def write_fileobj(
try:
bytes_uploaded = 0
CHUNK_SIZE = 16 * 1024
ITERATIONS_PER_DISK_CHECK = 1
ITERATIONS_PER_DISK_CHECK = 2000
iteration = 0
with FileSystems.create(
bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
) as out:
while True:
iteration += 1
to_send = output_fileobj.read(CHUNK_SIZE)
if not to_send:
break
Expand All @@ -276,6 +275,7 @@ def write_fileobj(
should_resume = progress_callback(bytes_uploaded)
if not should_resume:
raise Exception('Upload aborted by client')
iteration += 1
with FileSystems.open(
bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file:
Expand Down
40 changes: 30 additions & 10 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2714,21 +2714,41 @@ def update_user_info(self, user_info):
cl_user.update().where(cl_user.c.user_id == user_info['user_id']).values(user_info)
)

def increment_user_time_used(self, user_id, amount):
def increment_user_disk_used(self, user_id: str, amount: int):
"""
User used some time.
Increment disk_used for user by amount.
When incrementing values, we have to use a special query to ensure that we avoid
race conditions or deadlock arising from multiple threads calling functions
concurrently. We do this using with_for_update() and commit().
"""
user_info = self.get_user_info(user_id)
user_info['time_used'] += amount
self.update_user_info(user_info)
with self.engine.begin() as connection:
rows = connection.execute(
select([cl_user.c.disk_used]).where(cl_user.c.user_id == user_id).with_for_update()
)
if not rows:
raise NotFoundError("User with ID %s not found" % user_id)
disk_used = rows.first()[0] + amount
connection.execute(
cl_user.update().where(cl_user.c.user_id == user_id).values(disk_used=disk_used)
)
connection.commit()

def increment_user_disk_used(self, user_id: str, amount: int):
def increment_user_time_used(self, user_id: str, amount: int):
"""
Increment disk_used for user by amount
User used some time.
See comment for increment_user_disk_used.
"""
user_info = self.get_user_info(user_id)
user_info['disk_used'] += amount
self.update_user_info(user_info)
with self.engine.begin() as connection:
rows = connection.execute(
select([cl_user.c.time_used]).where(cl_user.c.user_id == user_id).with_for_update()
)
if not rows:
raise NotFoundError("User with ID %s not found" % user_id)
time_used = rows.first()[0] + amount
connection.execute(
cl_user.update().where(cl_user.c.user_id == user_id).values(time_used=time_used)
)
connection.commit()

def get_user_time_quota_left(self, user_id, user_info=None):
if not user_info:
Expand Down
13 changes: 6 additions & 7 deletions codalab/model/worker_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret
Note, only the worker should call this method with autoretry set to
False. See comments below.
"""
start_time = time.time()
self._ping_worker_ws(worker_id)
start_time = time.time()
while time.time() - start_time < timeout_secs:
with closing(socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)) as sock:
sock.settimeout(timeout_secs)
Expand All @@ -407,15 +407,14 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret
success = sock.recv(len(WorkerModel.ACK)) == WorkerModel.ACK
else:
success = True
except socket.error:
logging.debug("socket error when calling send_json_message")
except socket.error as e:
logging.error(f"socket error when calling send_json_message: {e}")

if not success:
# Shouldn't be too expensive just to keep retrying.
# TODO: maybe exponential backoff
time.sleep(
0.3
) # changed from 0.003 to keep from rate-limiting due to dead workers
logging.error("Sleeping for 0.1 seconds.")
time.sleep(0.3)
continue

if not autoretry:
Expand All @@ -430,7 +429,7 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret

sock.sendall(json.dumps(message).encode())
return True

logging.info("Socket message timeout.")
return False

def has_reply_permission(self, user_id, worker_id, socket_id):
Expand Down
2 changes: 1 addition & 1 deletion codalab/server/bundle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ def _try_start_bundle(self, workers, worker, bundle, bundle_resources):
worker['socket_id'],
worker['worker_id'],
self._construct_run_message(worker['shared_file_system'], bundle, bundle_resources),
0.2,
1,
):
logger.info(
'Starting run bundle {} on worker {}'.format(bundle.uuid, worker['worker_id'])
Expand Down
2 changes: 1 addition & 1 deletion codalab/worker/upload_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def upload_with_chunked_encoding(

# Use chunked transfer encoding to send the data through.
bytes_uploaded = 0
ITERATIONS_PER_DISK_CHECK = 1
ITERATIONS_PER_DISK_CHECK = 2000
iteration = 0
while True:
to_send = fileobj.read(CHUNK_SIZE)
Expand Down
2 changes: 1 addition & 1 deletion codalab/worker_manager/azure_batch_worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_worker_jobs(self) -> List[WorkerJob]:
def start_worker_job(self) -> None:
worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest')
worker_id: str = uuid.uuid4().hex
logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image))
logger.info('Starting worker {} with image {}'.format(worker_id, worker_image))
work_dir: str = (
self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/"
)
Expand Down
47 changes: 47 additions & 0 deletions scripts/test_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import io
import signal
import subprocess
import sys
import time
import traceback

global cl
Expand Down Expand Up @@ -179,3 +181,48 @@ def cleanup(cl, tag, should_wait=True):
run_command([cl, 'wrm', uuid, '--force'])
worksheets_removed += 1
print('Removed {} bundles and {} worksheets.'.format(bundles_removed, worksheets_removed))


class Timer:
"""
Class that uses signal to interrupt functions while they're running
if they run for longer than timeout_seconds.
Can also be used to time how long functions take within its context manager.
Used for the timing tests.
"""

def __init__(self, timeout_seconds=1, handle_timeouts=True, uuid=None):
"""
A class that can be used as a context manager to ensure that code within that context manager times out
after timeout_seconds time and which times the execution of code within the context manager.
Parameters:
timeout_seconds (float): Amount of time before execution in context manager is interrupted for timeout
handle_timeouts (bool): If True, do not timeout, only return the time taken for execution in context manager.
uuid (str): Uuid of bundles running within context manager.
"""
self.handle_timeouts = handle_timeouts
self.timeout_seconds = timeout_seconds
self.uuid = None

def handle_timeout(self, signum, frame):
timeout_message = "Timeout ocurred"
if self.uuid:
timeout_message += " while waiting for %s to run" % self.uuid
raise TimeoutError(timeout_message)

def time_elapsed(self):
return time.time() - self.start_time

def __enter__(self):
self.start_time = time.time()
if self.handle_timeouts:
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.setitimer(signal.ITIMER_REAL, self.timeout_seconds, self.timeout_seconds)

# now, reset itimer.
signal.setitimer(signal.ITIMER_REAL, 0, 0)

def __exit__(self, type, value, traceback):
self.time_elapsed = time.time() - self.start_time
if self.handle_timeouts:
signal.alarm(0)
Loading

0 comments on commit 7833e8e

Please sign in to comment.