
Commit 47a8b47
Added changes to make the parallel disk updates work with incremental disk updates
AndrewJGaut committed Feb 8, 2023
1 parent 769870e commit 47a8b47
Showing 7 changed files with 40 additions and 8 deletions.
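In short, the commit threads a bundle_uuid argument through the whole upload path so that the periodic disk-usage increments sent during a chunked upload can also keep the bundle's data_size metadata current on the server. A rough sketch of the new flow, with function names taken from this commit (the chain is a summary, not runnable code):

    # How bundle_uuid now flows through the upload path:
    #
    #   upload_to_bundle_store(bundle)                    # codalab/lib/upload_manager.py
    #     -> write_fileobj(..., bundle_uuid=bundle['id'])
    #       -> client.update('user/increment_disk_used',
    #                        {'disk_used_increment': len(chunk),
    #                         'bundle_uuid': bundle_uuid})  # sent every N chunks
    #         -> increment_user_disk_used()                # codalab/rest/users.py
    #            # ...which now also updates the bundle's data_size metadata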
3 changes: 2 additions & 1 deletion codalab/client/json_api_client.py
@@ -634,7 +634,7 @@ def fetch_contents_blob(self, target, range_=None, head=None, tail=None, truncat

@wrap_exception('Unable to upload contents of bundle {1}')
def upload_contents_blob(
- self, bundle_id, fileobj=None, pass_self=False, params=None, progress_callback=None
+ self, bundle_id, fileobj=None, pass_self=False, bundle_uuid=None, params=None, progress_callback=None
):
"""
Uploads the contents of the given fileobj as the contents of specified
@@ -660,6 +660,7 @@ def upload_contents_blob(
query_params=params,
fileobj=fileobj,
pass_self=pass_self,
+ bundle_uuid=bundle_uuid,
progress_callback=progress_callback,
)

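A hedged usage sketch of the updated client method; the file object and surrounding setup are illustrative, not part of this commit:

    # Illustrative only: pass the bundle's UUID so the server can attribute
    # incremental disk usage to this bundle while the upload is in flight.
    with open('contents.tar.gz', 'rb') as fileobj:
        client.upload_contents_blob(
            bundle_id,
            fileobj=fileobj,
            bundle_uuid=bundle_id,  # new parameter added by this commit
            params={},
            progress_callback=None,
        )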
12 changes: 11 additions & 1 deletion codalab/lib/upload_manager.py
@@ -75,6 +75,7 @@ def write_fileobj(
unpack_archive: bool,
bundle_conn_str=None,
index_conn_str=None,
+ bundle_uuid=None,
progress_callback=None,
):
"""Writes fileobj indicated, unpacks if specified, and uploads it to the path at bundle_path.
@@ -195,6 +196,7 @@ def write_fileobj(
unpack_archive: bool,
bundle_conn_str=None,
index_conn_str=None,
+ bundle_uuid=None,
progress_callback=None,
):
if unpack_archive:
@@ -227,6 +229,7 @@ def write_fileobj(
unpack_archive: bool,
bundle_conn_str=None,
index_conn_str=None,
+ bundle_uuid=None,
progress_callback=None,
):
if unpack_archive:
@@ -256,7 +259,7 @@ def write_fileobj(
# Update disk and check if client has gone over disk usage.
if self._client and iteration % ITERATIONS_PER_DISK_CHECK == 0:
self._client.update(
- 'user/increment_disk_used', {'disk_used_increment': len(to_send)}
+ 'user/increment_disk_used', {'disk_used_increment': len(to_send), 'bundle_uuid': bundle_uuid}
)
user_info = self._client.fetch('user')
if user_info['disk_used'] >= user_info['disk_quota']:
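This loop reports usage once every ITERATIONS_PER_DISK_CHECK chunks and aborts once the user hits quota. A simplified, self-contained sketch of the pattern; the client object, constant value, and transmit callback are stand-ins for the real ones:

    ITERATIONS_PER_DISK_CHECK = 10  # assumed value; the real constant is defined elsewhere

    def send_in_chunks(fileobj, client, bundle_uuid, transmit, chunk_size=64 * 1024):
        """Send fileobj in chunks; every N chunks, report the latest chunk's
        size to the server (as the diff above does) and stop if over quota."""
        iteration = 0
        while True:
            to_send = fileobj.read(chunk_size)
            if not to_send:
                break
            transmit(to_send)  # stand-in for the chunked-encoding network write
            iteration += 1
            if client and iteration % ITERATIONS_PER_DISK_CHECK == 0:
                client.update(
                    'user/increment_disk_used',
                    {'disk_used_increment': len(to_send), 'bundle_uuid': bundle_uuid},
                )
                user_info = client.fetch('user')
                if user_info['disk_used'] >= user_info['disk_quota']:
                    raise RuntimeError('Disk quota exceeded while uploading')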
@@ -486,6 +489,7 @@ def upload_to_bundle_store(
source_ext=source_ext,
should_unpack=unpack_before_upload,
json_api_client=self._client,
+ bundle_uuid=bundle['id'],
progress_callback=progress.update,
)
self._client.update_bundle_state(bundle['id'], params={'success': True})
@@ -512,6 +516,7 @@ def upload_to_bundle_store(
},
progress_callback=progress.update,
pass_self=True,
+ bundle_uuid=bundle['id']
)

def upload_Azure_blob_storage(
@@ -524,6 +529,7 @@
source_ext,
should_unpack,
json_api_client,
+ bundle_uuid,
progress_callback=None,
):
"""
@@ -552,6 +558,7 @@
should_unpack,
bundle_conn_str,
index_conn_str,
+ bundle_uuid,
progress_callback,
)

@@ -565,6 +572,7 @@ def upload_GCS_blob_storage(
source_ext,
should_unpack,
json_api_client,
+ bundle_uuid,
progress_callback=None,
):
from codalab.lib import zip_util
@@ -582,6 +590,7 @@
fileobj=output_fileobj,
query_params={},
progress_callback=progress_callback,
+ bundle_uuid=bundle_uuid,
json_api_client=json_api_client,
)
# upload the index file
@@ -602,5 +611,6 @@
query_params={},
fileobj=open(tmp_index_file.name, "rb"),
progress_callback=None,
+ bundle_uuid=bundle_uuid,
json_api_client=self._client,
)
13 changes: 11 additions & 2 deletions codalab/model/bundle_model.py
@@ -1143,17 +1143,26 @@ def update_disk_metadata(self, bundle, bundle_location, enforce_disk_quota=False
# TODO(Ashwin): make this non-fs specific
data_hash = '0x%s' % (path_util.hash_directory(bundle_location, dirs_and_files))
data_size = path_util.get_size(bundle_location, dirs_and_files)
+ current_data_size = 0
+ try:
+     if 'data_size' in bundle.metadata.__dict__:
+         current_data_size = bundle.metadata.data_size
+     else:
+         current_data_size = int(self.get_bundle_metadata([bundle.uuid], 'data_size')[bundle.uuid])
+ except Exception:
+     current_data_size = 0
+ disk_increment = data_size - current_data_size
if enforce_disk_quota:
disk_left = self.get_user_disk_quota_left(bundle.owner_id)
- if data_size > disk_left:
+ if disk_increment > disk_left:
raise UsageError(
"Can't save bundle, bundle size %s greater than user's disk quota left: %s"
% (data_size, disk_left)
)

bundle_update = {'data_hash': data_hash, 'metadata': {'data_size': data_size}}
self.update_bundle(bundle, bundle_update)
- self.increment_user_disk_used(bundle.owner_id, data_size)
+ self.increment_user_disk_used(bundle.owner_id, disk_increment)

def bundle_checkin(self, bundle, worker_run, user_id, worker_id):
"""
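The effect of the change above: when the bundle is finalized, the user is charged only for whatever part of the final data_size was not already attributed during the upload by the incremental endpoint. A worked example with assumed numbers:

    # Assumed numbers, for illustration only.
    current_data_size = 900  # bytes already recorded for this bundle mid-upload
    data_size = 1000         # final size measured on disk
    disk_increment = data_size - current_data_size  # 100
    # disk_used grows by 100 rather than 1000, so the increments sent during
    # the chunked upload are not double-counted at finalization time.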
10 changes: 10 additions & 0 deletions codalab/rest/users.py
@@ -260,12 +260,22 @@ def increment_user_disk_used():
# TODO(agaut): Potentially convert the below to use a Schema (like those in schemas.py)
# (Although, that does have downsides in this case.)
disk_used_increment = request.json['data'][0]['attributes']['disk_used_increment']
+ bundle_uuid = request.json['data'][0]['attributes']['bundle_uuid']

# only allow positive disk increments so that users can't abuse this endpoint.
if disk_used_increment <= 0:
abort(http.client.BAD_REQUEST, "Only positive disk increments are allowed.")

local.model.increment_user_disk_used(request.user.user_id, disk_used_increment)
+ data_size = 0
+ try:
+     data_size = int(local.model.get_bundle_metadata([bundle_uuid], 'data_size')[bundle_uuid])
+ except Exception:
+     data_size = 0
+ new_data_size = data_size + disk_used_increment
+ bundle = local.model.get_bundle(bundle_uuid)
+ local.model.update_bundle(bundle, {'metadata': {'data_size': new_data_size}})
return (
AuthenticatedUserSchema(many=True).dump([local.model.get_user(request.user.user_id)]).data
)
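For reference, a sketch of the request body this endpoint now expects, matching the request.json['data'][0]['attributes'] lookups above; the values are examples:

    # Example request body for user/increment_disk_used:
    body = {
        'data': [
            {
                'attributes': {
                    'disk_used_increment': 4096,  # bytes; must be positive
                    'bundle_uuid': '0x123abc',    # example UUID
                }
            }
        ]
    }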
3 changes: 2 additions & 1 deletion codalab/worker/rest_client.py
@@ -114,7 +114,7 @@ def _make_request(
raise RestClientException('Invalid JSON: ' + response_data, False)

def _upload_with_chunked_encoding(
- self, method, url, query_params, fileobj, pass_self=False, progress_callback=None
+ self, method, url, query_params, fileobj, pass_self=False, bundle_uuid=None, progress_callback=None
):
"""
Uploads the fileobj to url using method with query_params,
@@ -148,4 +148,5 @@
url=url,
progress_callback=progress_callback,
json_api_client=json_api_client,
+ bundle_uuid=bundle_uuid
)
3 changes: 2 additions & 1 deletion codalab/worker/upload_util.py
@@ -15,6 +15,7 @@ def upload_with_chunked_encoding(
need_response=False,
url="",
progress_callback=None,
+ bundle_uuid=None,
json_api_client=None,
):
"""
@@ -76,7 +77,7 @@
# Update disk and check if client has gone over disk usage.
if json_api_client and iteration % ITERATIONS_PER_DISK_CHECK == 0:
json_api_client.update(
- 'user/increment_disk_used', {'disk_used_increment': len(to_send)}
+ 'user/increment_disk_used', {'disk_used_increment': len(to_send), 'bundle_uuid': bundle_uuid}
)
user_info = json_api_client.fetch('user')
if user_info['disk_used'] >= user_info['disk_quota']:
4 changes: 2 additions & 2 deletions tests/cli/test_cli.py
@@ -1373,16 +1373,16 @@ def test_disk(ctx):
disk_used = _run_command([cl, 'uinfo', 'codalab', '-f', 'disk_used'])
uuid = _run_command([cl, 'upload', test_path('dir1')])
wait_until_state(uuid, State.READY)

# Directories are stored in their gzipped format when uploaded to Azure/GCS
# but are stored in their full file size format on disk.
if os.environ.get("CODALAB_ALWAYS_USE_AZURE_BLOB_BETA") == '1':
tarred_dir = pack_files_for_upload([test_path('dir1')], True, False)['fileobj']
file_size = len(tarred_dir.read())
else:
file_size = path_util.get_size(test_path('dir1'))
+ data_size = _run_command([cl, 'info', uuid, '-f', 'data_size'])
check_equals(
- str(int(disk_used) + file_size), _run_command([cl, 'uinfo', 'codalab', '-f', 'disk_used'])
+ str(int(disk_used) + int(data_size)), _run_command([cl, 'uinfo', 'codalab', '-f', 'disk_used'])
)
_run_command([cl, 'rm', uuid])
check_equals(disk_used, _run_command([cl, 'uinfo', 'codalab', '-f', 'disk_used']))
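The assertion now compares against the bundle's recorded data_size rather than a locally computed file size, since the stored size (gzipped on Azure/GCS, raw on disk) is what the server actually charges. Illustrative arithmetic with assumed numbers:

    # Assumed values: if disk_used was 5000 before the upload and the bundle's
    # recorded data_size is 1234, uinfo should report 5000 + 1234 = 6234.
    disk_used, data_size = 5000, 1234
    expected = str(disk_used + data_size)  # '6234'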
