Fix Stress Tests #4406

Merged · 19 commits · Mar 2, 2023
Showing changes from 15 of the 19 commits.
4 changes: 2 additions & 2 deletions codalab/lib/upload_manager.py
@@ -244,13 +244,12 @@ def write_fileobj(
         try:
             bytes_uploaded = 0
             CHUNK_SIZE = 16 * 1024
-            ITERATIONS_PER_DISK_CHECK = 1
+            ITERATIONS_PER_DISK_CHECK = 2000
             iteration = 0
             with FileSystems.create(
                 bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
             ) as out:
                 while True:
-                    iteration += 1
                     to_send = output_fileobj.read(CHUNK_SIZE)
                     if not to_send:
                         break
@@ -276,6 +275,7 @@ def write_fileobj(
                         should_resume = progress_callback(bytes_uploaded)
                         if not should_resume:
                             raise Exception('Upload aborted by client')
+                    iteration += 1
             with FileSystems.open(
                 bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
             ) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file:
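For context on the ITERATIONS_PER_DISK_CHECK change above (and the matching change in codalab/worker/upload_util.py further down): checking the uploader's disk quota on every 16 KiB chunk costs one check per chunk, so the loop now checks only every 2000 chunks (roughly every 31 MiB). A minimal sketch of that throttling pattern, with hypothetical names (`check_disk_quota`, plain file objects) rather than CodaLab's actual internals:

```python
# Sketch of throttling an expensive per-chunk check; names below are illustrative only.
CHUNK_SIZE = 16 * 1024
ITERATIONS_PER_DISK_CHECK = 2000  # one quota check per ~31 MiB copied


def copy_with_periodic_disk_check(src, dst, check_disk_quota):
    """Copy src to dst in CHUNK_SIZE pieces, calling check_disk_quota only every N chunks."""
    iteration = 0
    bytes_copied = 0
    while True:
        chunk = src.read(CHUNK_SIZE)
        if not chunk:
            break
        # Pay for the comparatively slow quota lookup only occasionally.
        if iteration % ITERATIONS_PER_DISK_CHECK == 0:
            check_disk_quota(bytes_copied)
        dst.write(chunk)
        bytes_copied += len(chunk)
        iteration += 1
    return bytes_copied
```

The trade-off is that a user can overshoot their quota by up to one check interval, in exchange for not paying the check's cost on every 16 KiB chunk.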
49 changes: 39 additions & 10 deletions codalab/model/bundle_model.py
@@ -2714,21 +2714,50 @@ def update_user_info(self, user_info):
                 cl_user.update().where(cl_user.c.user_id == user_info['user_id']).values(user_info)
             )
 
-    def increment_user_time_used(self, user_id, amount):
+    def increment_user_disk_used(self, user_id: str, amount: int):
         """
-        User used some time.
+        Increment disk_used for user by amount.
+        When incrementing values, we have to use a special query to ensure that we avoid
+        race conditions or deadlock arising from multiple threads calling functions
+        concurrently.
+        In particular, we use with_for_update() and then commit() to ensure that
+        we lock the user table between the disk_used read and the increment write,
+        because otherwise we might have the following interleaving between threads:
+            READ disk_used
+            READ disk_used
+            UPDATE disk_used + amount
+            UPDATE disk_used + amount
+        This can lead to deadlock, or to the second thread writing an incorrect
+        disk usage to the user table.
         """

[Review comment from a project member, on the new docstring]: can we shorten this docstring? probably a bit too much detail for posterity

-        user_info = self.get_user_info(user_id)
-        user_info['time_used'] += amount
-        self.update_user_info(user_info)
+        with self.engine.begin() as connection:
+            rows = connection.execute(
+                select([cl_user.c.disk_used]).where(cl_user.c.user_id == user_id).with_for_update()
+            )
+            if not rows:
+                raise NotFoundError("User with ID %s not found" % user_id)
+            disk_used = rows.first()[0] + amount
+            connection.execute(
+                cl_user.update().where(cl_user.c.user_id == user_id).values(disk_used=disk_used)
+            )
+            connection.commit()
 
-    def increment_user_disk_used(self, user_id: str, amount: int):
+    def increment_user_time_used(self, user_id: str, amount: int):
         """
-        Increment disk_used for user by amount
+        User used some time.
+        See comment for increment_user_disk_used.
         """
-        user_info = self.get_user_info(user_id)
-        user_info['disk_used'] += amount
-        self.update_user_info(user_info)
+        with self.engine.begin() as connection:
+            rows = connection.execute(
+                select([cl_user.c.time_used]).where(cl_user.c.user_id == user_id).with_for_update()
+            )
+            if not rows:
+                raise NotFoundError("User with ID %s not found" % user_id)
+            time_used = rows.first()[0] + amount
+            connection.execute(
+                cl_user.update().where(cl_user.c.user_id == user_id).values(time_used=time_used)
+            )
+            connection.commit()
 
     def get_user_time_quota_left(self, user_id, user_info=None):
         if not user_info:
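The docstring above describes the read-modify-write race these methods guard against. As a standalone illustration (not CodaLab's actual code), here is a minimal sketch of the SELECT ... FOR UPDATE pattern using SQLAlchemy 1.4-style Core with a stand-in `users` table; the row lock taken by with_for_update() is held until the surrounding transaction commits, so concurrent increments serialize instead of both reading the same starting value. The lock is only meaningful on backends such as MySQL or PostgreSQL; SQLite ignores FOR UPDATE.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()
# Hypothetical stand-in for the cl_user table; only the columns used here.
users = Table(
    "users",
    metadata,
    Column("user_id", String(64), primary_key=True),
    Column("disk_used", Integer, nullable=False, default=0),
)


def increment_disk_used(engine, user_id: str, amount: int) -> None:
    """Atomically add `amount` to disk_used via SELECT ... FOR UPDATE."""
    with engine.begin() as conn:  # one transaction; commits on successful exit
        current = conn.execute(
            select(users.c.disk_used)
            .where(users.c.user_id == user_id)
            .with_for_update()  # row lock held until the transaction ends
        ).scalar_one()  # raises if the user row does not exist
        conn.execute(
            users.update()
            .where(users.c.user_id == user_id)
            .values(disk_used=current + amount)
        )
```

A single relative UPDATE (SET disk_used = disk_used + :amount) is another common way to avoid the race; the sketch above instead mirrors the read-then-write shape used in the diff and serializes it with the row lock.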
4 changes: 2 additions & 2 deletions codalab/model/worker_model.py
@@ -408,7 +408,7 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret
                 else:
                     success = True
             except socket.error:
-                logging.debug("socket error when calling send_json_message")
+                logging.error("socket error when calling send_json_message")
 
             if not success:
                 # Shouldn't be too expensive just to keep retrying.
@@ -430,7 +430,7 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret
 
             sock.sendall(json.dumps(message).encode())
             return True
-
+        logging.error("Socket message timeout.")
         return False
 
     def has_reply_permission(self, user_id, worker_id, socket_id):
2 changes: 1 addition & 1 deletion codalab/server/bundle_manager.py
@@ -712,7 +712,7 @@ def _try_start_bundle(self, workers, worker, bundle, bundle_resources):
             worker['socket_id'],
             worker['worker_id'],
             self._construct_run_message(worker['shared_file_system'], bundle, bundle_resources),
-            0.2,
+            1,
         ):
             logger.info(
                 'Starting run bundle {} on worker {}'.format(bundle.uuid, worker['worker_id'])
2 changes: 1 addition & 1 deletion codalab/worker/upload_util.py
@@ -65,7 +65,7 @@ def upload_with_chunked_encoding(
 
     # Use chunked transfer encoding to send the data through.
     bytes_uploaded = 0
-    ITERATIONS_PER_DISK_CHECK = 1
+    ITERATIONS_PER_DISK_CHECK = 2000
     iteration = 0
     while True:
         to_send = fileobj.read(CHUNK_SIZE)
2 changes: 1 addition & 1 deletion codalab/worker_manager/azure_batch_worker_manager.py
@@ -97,7 +97,7 @@ def get_worker_jobs(self) -> List[WorkerJob]:
     def start_worker_job(self) -> None:
         worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest')
         worker_id: str = uuid.uuid4().hex
-        logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image))
+        logger.info('Starting worker {} with image {}'.format(worker_id, worker_image))
         work_dir: str = (
             self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/"
         )
47 changes: 47 additions & 0 deletions scripts/test_util.py
@@ -1,6 +1,8 @@
 import io
+import signal
 import subprocess
 import sys
+import time
 import traceback
 
 global cl
@@ -179,3 +181,48 @@ def cleanup(cl, tag, should_wait=True):
         run_command([cl, 'wrm', uuid, '--force'])
         worksheets_removed += 1
     print('Removed {} bundles and {} worksheets.'.format(bundles_removed, worksheets_removed))
+
+
+class timer:

[Review comment from a project member, on the class name]: uppercase?

"""
Class that uses signal to interrupt functions while they're running
if they run for longer than timeout_seconds.
Can also be used to time how long functions take within its context manager.
Used for the timing tests.
"""

+    def __init__(self, timeout_seconds=1, handle_timeouts=True, uuid=None):
+        """
+        Context manager that times the execution of the code inside it and,
+        if handle_timeouts is True, interrupts that code once it runs longer than timeout_seconds.
+        Parameters:
+            timeout_seconds (float): Amount of time before execution inside the context manager is interrupted.
+            handle_timeouts (bool): If True, raise a TimeoutError after timeout_seconds; if False, only time the execution.
+            uuid (str): UUID of the bundle running within the context manager (used in the timeout message).
+        """
+        self.handle_timeouts = handle_timeouts
+        self.timeout_seconds = timeout_seconds
+        self.uuid = uuid

+    def handle_timeout(self, signum, frame):
+        timeout_message = "Timeout occurred"
+        if self.uuid:
+            timeout_message += " while waiting for %s to run" % self.uuid
+        raise TimeoutError(timeout_message)
+
+    def time_elapsed(self):
+        return time.time() - self.start_time

+    def __enter__(self):
+        self.start_time = time.time()
+        if self.handle_timeouts:
+            signal.signal(signal.SIGALRM, self.handle_timeout)
+            signal.setitimer(signal.ITIMER_REAL, self.timeout_seconds, self.timeout_seconds)
+
+    def __exit__(self, type, value, traceback):
+        self.time_elapsed = time.time() - self.start_time
+        if self.handle_timeouts:
+            # Now that the block is done, reset the itimer so the alarm cannot fire afterwards.
+            signal.setitimer(signal.ITIMER_REAL, 0, 0)
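A hedged usage sketch for the new timer context manager, assuming the uuid argument is stored as shown above; `cl` and `run_command` come from test_util.py, while `bundle_uuid` is a hypothetical value from the test. Since the class relies on SIGALRM, it only works on Unix and in the main thread:

```python
# Hypothetical usage inside a stress test; bundle_uuid would come from an earlier `cl run`.
t = timer(timeout_seconds=60, uuid=bundle_uuid)
try:
    with t:
        run_command([cl, 'wait', bundle_uuid])  # interrupted with TimeoutError after 60 seconds
except TimeoutError as e:
    print('Timed out:', e)
# After __exit__, t.time_elapsed holds the wall-clock duration of the block as a float
# (the attribute shadows the time_elapsed() method once the context exits).
print('Waited %.1f seconds' % t.time_elapsed)
```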