Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bundle store details page #3860

Merged
merged 29 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
eeaccd7
fix error message
jzwang43 Sep 27, 2021
2f637a5
add bundle store cli commands
jzwang43 Oct 10, 2021
8488f1c
added store view
jzwang43 Oct 26, 2021
2fb660e
fix text
jzwang43 Oct 26, 2021
ce738f9
fix test
jzwang43 Oct 26, 2021
6be7cac
Update codalab/lib/bundle_cli.py
jzwang43 Dec 2, 2021
bcc516c
Remove replicate for now
jzwang43 Dec 2, 2021
4cd9231
Revert "fix error message"
jzwang43 Dec 2, 2021
dbcf8bd
Merge branch 'master' into bundle-store-cli
jzwang43 Dec 2, 2021
68ac1a3
Merge branch 'master' into bundle-store-cli
jzwang43 Dec 10, 2021
8e4eddc
Merge branch 'master' into bundle-details
jzwang43 Dec 10, 2021
33dfc3f
Merge branch 'master' of github.com:codalab/codalab-worksheets into b…
epicfaace Dec 12, 2021
9b1fff9
Fix endpoints to use bulk, add "cl store add" impl
epicfaace Dec 12, 2021
b0ca14d
refresh
epicfaace Dec 12, 2021
75f60b8
more impl for bundle store commands
epicfaace Dec 12, 2021
2c901f6
update
epicfaace Dec 12, 2021
7184cf2
fix
epicfaace Dec 12, 2021
04aaa4d
Add deletion, support deletion logic
epicfaace Dec 12, 2021
974ef31
Fix delete
epicfaace Dec 12, 2021
f864bb2
Support "cl upload --store"
epicfaace Dec 12, 2021
e7c629c
Merge branch 'master' into bundle-details
jzwang43 Dec 13, 2021
5a5f662
Merge branch 'master' into bundle-store-cli
epicfaace Dec 13, 2021
5957626
Update references
epicfaace Dec 13, 2021
d9f34cb
Merge branch 'bundle-store-cli' of github.com:codalab/codalab-workshe…
epicfaace Dec 13, 2021
8ff4d38
Merge branch 'bundle-store-cli' into bundle-details
jzwang43 Dec 13, 2021
5fee8ff
finishing up
jzwang43 Dec 13, 2021
3fddd17
Add name field
jzwang43 Dec 13, 2021
a61eb21
Merge branch 'master' into bundle-details
jzwang43 Dec 14, 2021
be6cd44
Update bundles.py
jzwang43 Dec 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""bundle store name unique

Revision ID: 685cb4bc0b79
Revises: 26a5e6b3bfa5
Create Date: 2021-12-12 17:20:58.853560

"""

# revision identifiers, used by Alembic.
revision = '685cb4bc0b79'
down_revision = '26a5e6b3bfa5'

from alembic import op
import sqlalchemy as sa


def upgrade():
op.create_unique_constraint('nix_1', 'bundle_store', ['name'])


def downgrade():
op.drop_constraint('nix_1', 'bundle_store', type_='unique')
83 changes: 82 additions & 1 deletion codalab/lib/bundle_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
'write',
'mount',
'netcat',
'store',
)

WORKSHEET_COMMANDS = ('new', 'add', 'wadd', 'work', 'print', 'wedit', 'wrm', 'wls')
Expand Down Expand Up @@ -1010,6 +1011,69 @@ def do_help_command(self, args):
return
print(Commands.help_text(args.verbose, args.markdown), file=self.stdout)

@Commands.command(
'store',
help=['Add a bundle store.'],
arguments=(
Commands.Argument(
'command',
help='Set to "add" to add a new bundle store, "ls" to list bundle stores, and "rm" to remove a bundle store.',
nargs='?',
),
Commands.Argument(
'bundle_store_uuid',
help='Bundle store uuid. Specified when running "cl store rm [uuid]".',
nargs='?',
),
Commands.Argument(
'-n', '--name', help='Name of the bundle store; must be globally unique.',
),
Commands.Argument(
'--storage-type',
help='Storage type of the bundle store. Acceptable values are "disk" and "azure_blob".',
),
Commands.Argument(
'--storage-format',
help='Storage format of the bundle store. Acceptable values are "uncompressed" and "compressed_v1". Optional; if unspecified, will be set to an optimal default.',
),
Commands.Argument(
'--url', help='A self-referential URL that points to the bundle store.',
),
Commands.Argument(
'--authentication', help='Key for authentication that the bundle store uses.',
),
),
)
def do_store_command(self, args):
client = self.manager.current_client()
if args.command == 'add':
bundle_store_info = {
"name": args.name,
"storage_type": args.storage_type,
"storage_format": args.storage_format,
"url": args.url,
"authentication": args.authentication,
}
new_bundle_store = client.create('bundle_stores', bundle_store_info)
print(new_bundle_store["id"], file=self.stdout)
elif args.command == 'ls':
bundle_stores = client.fetch('bundle_stores')
print("\t".join(["id", "name", "storage_type", "storage_format"]), file=self.stdout)
print(
"\n".join(
"\t".join([b["id"], b["name"], b["storage_type"], b["storage_format"]])
for b in bundle_stores
),
file=self.stdout,
)
elif args.command == 'rm':
client.delete('bundle_stores', resource_ids=[args.bundle_store_uuid])
print(args.bundle_store_uuid, file=self.stdout)
else:
raise UsageError(
f"cl store {args.command} is not supported. Only the following subcommands are supported: 'cl store add', 'cl store ls', 'cl store rm'."
)

@Commands.command('status', aliases=('st',), help='Show current client status.')
def do_status_command(self, args):
client, worksheet_uuid = self.manager.get_current_worksheet_uuid()
Expand Down Expand Up @@ -1277,6 +1341,10 @@ def do_workers_command(self, args):
action='store_true',
default=False,
),
Commands.Argument(
'--store',
help='The name of the bundle store where the bundle should be uploaded to. If unspecified, the CLI will pick the optimal available bundle store.',
),
)
+ Commands.metadata_arguments([UploadedBundle])
+ EDIT_ARGUMENTS,
Expand Down Expand Up @@ -1333,6 +1401,7 @@ def do_upload_command(self, args):
'state_on_success': State.READY,
'finalize_on_success': True,
'use_azure_blob_beta': args.use_azure_blob_beta,
'store': args.store,
},
)

Expand All @@ -1353,6 +1422,7 @@ def do_upload_command(self, args):
'state_on_success': State.READY,
'finalize_on_success': True,
'use_azure_blob_beta': args.use_azure_blob_beta,
'store': args.store,
},
)

Expand Down Expand Up @@ -1418,6 +1488,7 @@ def do_upload_command(self, args):
'state_on_success': State.READY,
'finalize_on_success': True,
'use_azure_blob_beta': args.use_azure_blob_beta,
'store': args.store,
},
progress_callback=progress.update,
)
Expand Down Expand Up @@ -1732,6 +1803,10 @@ def derive_bundle(self, bundle_type, command, targets, metadata):
help='Beta feature - Start an interactive session to construct your run command.',
action='store_true',
),
Commands.Argument(
'--store',
help='The name of the bundle store where results should be uploaded. If unspecified, the worker will pick the optimal available bundle store.',
),
)
+ Commands.metadata_arguments([RunBundle])
+ EDIT_ARGUMENTS
Expand Down Expand Up @@ -1987,7 +2062,7 @@ def do_detach_command(self, args):
help='Remove a bundle (permanent!).',
arguments=(
Commands.Argument(
'bundle_spec', help=BUNDLE_SPEC_FORMAT, nargs='+', completer=BundlesCompleter
'bundle_spec', help=BUNDLE_SPEC_FORMAT, nargs='*', completer=BundlesCompleter
),
Commands.Argument(
'--force',
Expand Down Expand Up @@ -2018,6 +2093,12 @@ def do_detach_command(self, args):
help='Operate on this worksheet (%s).' % WORKSHEET_SPEC_FORMAT,
completer=WorksheetsCompleter,
),
# TODO: this feature is not implemented yet, implement as part of https://github.com/codalab/codalab-worksheets/issues/3803.
# Commands.Argument(
# '-b',
# '--store',
# help='Keeps the bundle, but removes the bundle contents from the specified bundle store.',
# ),
),
)
def do_rm_command(self, args):
Expand Down
74 changes: 65 additions & 9 deletions codalab/lib/bundle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from codalab.lib import path_util, spec_util
from codalab.worker.bundle_state import State
from functools import reduce
from codalab.common import StorageType
from codalab.common import StorageType, StorageFormat


def require_partitions(f: Callable[['MultiDiskBundleStore', Any], Any]):
Expand Down Expand Up @@ -44,7 +44,7 @@ def __init__(self, bundle_model, codalab_home):
self._bundle_model = bundle_model
self.codalab_home = path_util.normalize(codalab_home)

def get_bundle_location(self, uuid):
def get_bundle_location(self, uuid, bundle_store_uuid=None):
raise NotImplementedError

def cleanup(self, uuid, dry_run):
Expand Down Expand Up @@ -93,7 +93,7 @@ def get_node_avail(self, node):
return free

@require_partitions
def get_bundle_location(self, uuid):
def get_bundle_location(self, uuid, bundle_store_uuid=None):
"""
get_bundle_location: look for bundle in the cache, or if not in cache, go through every partition.
If not in any partition, return disk with largest free space.
Expand Down Expand Up @@ -397,8 +397,9 @@ class MultiDiskBundleStore(_MultiDiskBundleStoreBase):
A multi-disk bundle store that also supports storing bundles in a CodaLab-managed
Blob Storage container.

If bundles are indicated to be stored in Blob Storage, they are retrieved from Blob
Storage. Otherwise, the bundle is retrieved from the underlying disk bundle store.
If bundles are indicated to be stored in a custom BundleStore, they are retrieved from
that bundle store. Otherwise, their storage type is determined by the legacy "storage_type"
column, which indicates if they are in Blob Storage or from the underlying disk bundle store.

In Blob Storage, each bundle is stored in the format:
azfs://{container name}/bundles/{bundle uuid}/contents.tar.gz if a directory,
Expand All @@ -417,10 +418,65 @@ def __init__(self, bundle_model, codalab_home, azure_blob_account_name):

self._azure_blob_account_name = azure_blob_account_name

def get_bundle_location(self, uuid):
def get_bundle_location(self, uuid, bundle_store_uuid=None):
"""
Get the bundle location.
Arguments:
uuid (str): uuid of the bundle.
bundle_store_uuid (str): uuid of a specific BundleLocation to use when retrieving the bundle's location.
If unspecified, will pick an optimal location.
Returns: a string with the path to the bundle.
"""
bundle_locations = self._bundle_model.get_bundle_locations(uuid)
if bundle_store_uuid:
assert len(bundle_locations) >= 1
storage_type, is_dir = self._bundle_model.get_bundle_storage_info(uuid)
if storage_type == StorageType.AZURE_BLOB_STORAGE.value:
if len(bundle_locations) >= 1:
# Use the BundleLocations stored with the bundle, along with some
# precedence rules, to determine where the bundle is stored.
selected_location = None
selected_location_priority = 999
for location in bundle_locations:
# Highest precedence: bundle_store_uuid specified in this function.
PRIORITY = 1
if (
location["bundle_store_uuid"] == bundle_store_uuid
and PRIORITY < selected_location_priority
):
selected_location = location
selected_location_priority = PRIORITY
# Next precedence: prefer blob storage over disk storage.
PRIORITY = 2
if (
location["storage_type"] == StorageType.AZURE_BLOB_STORAGE.value
and PRIORITY < selected_location_priority
):
selected_location = location
selected_location_priority = PRIORITY
# Last precedence: pick whatever storage is available.
PRIORITY = 3
if PRIORITY < selected_location_priority:
selected_location = location
selected_location_priority = PRIORITY
assert selected_location is not None

# Now get the BundleLocation.
# TODO: refactor this into a class-based system so different storage types can implement this method.
if selected_location["storage_type"] == StorageType.AZURE_BLOB_STORAGE.value:
assert (
selected_location["storage_format"] == StorageFormat.COMPRESSED_V1.value
) # Only supported format on Blob Storage
file_name = "contents.tar.gz" if is_dir else "contents.gz"
url = selected_location["url"] # Format: "azfs://[container name]/bundles"
assert url.startswith("azfs://")
return f"{url}/{uuid}/{file_name}"
else:
assert (
selected_location["storage_format"] == StorageFormat.UNCOMPRESSED.value
) # Only supported format on disk
return _MultiDiskBundleStoreBase.get_bundle_location(self, uuid)
# If no BundleLocations are available, use the legacy "storage_type" column to determine where the bundle is stored.
elif storage_type == StorageType.AZURE_BLOB_STORAGE.value:
file_name = "contents.tar.gz" if is_dir else "contents.gz"
return f"azfs://{self._azure_blob_account_name}/bundles/{uuid}/{file_name}"
else:
return _MultiDiskBundleStoreBase.get_bundle_location(self, uuid)
return _MultiDiskBundleStoreBase.get_bundle_location(self, uuid)
53 changes: 42 additions & 11 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from typing import Union, Tuple, IO, cast
from typing import Any, Union, Tuple, IO, cast
from codalab.lib.beam.ratarmount import SQLiteIndexedTar

from codalab.common import UsageError, StorageType, urlopen_with_retry, parse_linked_bundle_url
Expand All @@ -20,9 +20,12 @@ class Uploader:
"""Uploader base class. Subclasses should extend this class and implement the
non-implemented methods that perform the uploads to a bundle store."""

def __init__(self, bundle_model, bundle_store):
def __init__(self, bundle_model, bundle_store, destination_bundle_store=None):
self._bundle_model = bundle_model
self._bundle_store = bundle_store
self.destination_bundle_store = destination_bundle_store

destination_bundle_store = None

@property
def storage_type(self):
Expand Down Expand Up @@ -96,10 +99,24 @@ def _update_and_get_bundle_location(self, bundle: Bundle, is_directory: bool) ->
Returns:
str: Bundle location.
"""
self._bundle_model.update_bundle(
bundle, {'storage_type': self.storage_type, 'is_dir': is_directory,},
)
return self._bundle_store.get_bundle_location(bundle.uuid)
if self.destination_bundle_store is not None:
# In this case, we are using the new BundleStore / BundleLocation model to track the bundle location.
# Create the appropriate bundle location.
self._bundle_model.add_bundle_location(
bundle.uuid, self.destination_bundle_store["uuid"]
)
self._bundle_model.update_bundle(
bundle, {'is_dir': is_directory},
)
return self._bundle_store.get_bundle_location(
bundle.uuid, bundle_store_uuid=self.destination_bundle_store["uuid"]
)
else:
# Else, continue to set the legacy "storage_type" column.
self._bundle_model.update_bundle(
bundle, {'storage_type': self.storage_type, 'is_dir': is_directory,},
)
return self._bundle_store.get_bundle_location(bundle.uuid)

def _interpret_source(self, source: Source):
"""Interprets the given source.
Expand Down Expand Up @@ -196,7 +213,13 @@ def __init__(self, bundle_model, bundle_store):
self._bundle_store = bundle_store

def upload_to_bundle_store(
self, bundle: Bundle, source: Source, git: bool, unpack: bool, use_azure_blob_beta: bool,
self,
bundle: Bundle,
source: Source,
git: bool,
unpack: bool,
use_azure_blob_beta: bool,
destination_bundle_store=None,
):
"""
Uploads contents for the given bundle to the bundle store.
Expand All @@ -207,15 +230,23 @@ def upload_to_bundle_store(
|git|: for URLs, whether |source| is a git repo to clone.
|unpack|: whether to unpack |source| if it's an archive.
|use_azure_blob_beta|: whether to use Azure Blob Storage.
|destination_bundle_store|: BundleStore to upload to. If specified, uploads to the given BundleStore.

Exceptions:
- If |git|, then the bundle contains the result of running 'git clone |source|'
- If |unpack| is True or a source is an archive (zip, tar.gz, etc.), then unpack the source.
"""
UploaderCls = BlobStorageUploader if use_azure_blob_beta else DiskStorageUploader
return UploaderCls(self._bundle_model, self._bundle_store).upload_to_bundle_store(
bundle, source, git, unpack
)
UploaderCls: Any = DiskStorageUploader
if destination_bundle_store:
# Set the uploader class based on which bundle store is specified.
if destination_bundle_store["storage_type"] == StorageType.AZURE_BLOB_STORAGE.value:
UploaderCls = BlobStorageUploader
elif use_azure_blob_beta:
# Legacy "-a" flag without specifying a bundle store.
UploaderCls = BlobStorageUploader
return UploaderCls(
self._bundle_model, self._bundle_store, destination_bundle_store
).upload_to_bundle_store(bundle, source, git, unpack)

def has_contents(self, bundle):
# TODO: make this non-fs-specific.
Expand Down
Loading