Skip to content

Commit

Permalink
Merge branch 'master' into rc1.5.13
Browse files Browse the repository at this point in the history
  • Loading branch information
yifanmai committed Dec 19, 2022
2 parents f7b62f5 + f2d4189 commit 698baf2
Show file tree
Hide file tree
Showing 27 changed files with 578 additions and 133 deletions.
20 changes: 16 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ jobs:
- search link read kill write mimic workers edit_user sharing_workers
- resources
- memoize
- copy netcat netcurl
- copy
- netcat netcurl
- edit
- open wopen
- store_add
runtime: [docker, kubernetes]
exclude:
# netcat / netcurl not supported for kubernetes.
- test: netcat netcurl
runtime: kubernetes
steps:
- name: Clear free space
run: |
Expand Down Expand Up @@ -150,14 +155,16 @@ jobs:
TEST: ${{ matrix.test }}
VERSION: ${{ github.head_ref || 'master' }}
CODALAB_LINK_MOUNTS: /tmp
- uses: medyagh/setup-minikube@latest
- uses: actions/setup-go@v3
if: matrix.runtime == 'kubernetes'
with:
go-version: '1.18.1'
- name: Run tests using Kubernetes runtime
if: matrix.runtime == 'kubernetes'
run: |
sh ./tests/test-setup.sh
sh ./scripts/local-k8s/setup-ci.sh
#python3 test_runner.py --version ${VERSION} ${TEST}
python3 test_runner.py --version ${VERSION} ${TEST}
env:
TEST: ${{ matrix.test }}
VERSION: ${{ github.head_ref || 'master' }}
Expand All @@ -167,11 +174,16 @@ jobs:
run: |
mkdir /tmp/logs
for c in $(docker ps -a --format="{{.Names}}"); do docker logs $c > /tmp/logs/$c.log 2> /tmp/logs/$c.err.log; done
- name: Save kubernetes logs
if: always() && matrix.runtime == 'kubernetes'
run: |
kubectl config use-context kind-codalab
kubectl cluster-info dump --output-directory /tmp/logs
- name: Upload logs
if: always()
uses: actions/upload-artifact@v1
with:
name: logs-test-${{ matrix.test }}
name: logs-test-${{ matrix.runtime }}-${{ matrix.test }}
path: /tmp/logs

test_backend_on_worker_restart:
Expand Down
11 changes: 6 additions & 5 deletions codalab/bin/ws_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
logging.basicConfig(format='%(asctime)s %(message)s %(pathname)s %(lineno)d')

worker_to_ws: Dict[str, Any] = {}

Expand All @@ -20,7 +21,7 @@ async def rest_server_handler(websocket):
"""
# Got a message from the rest server.
worker_id = await websocket.recv()
logger.debug(f"Got a message from the rest server, to ping worker: {worker_id}.")
logger.warning(f"Got a message from the rest server, to ping worker: {worker_id}.")

try:
worker_ws = worker_to_ws[worker_id]
Expand All @@ -36,7 +37,7 @@ async def worker_handler(websocket, worker_id):
"""
# runs on worker connect
worker_to_ws[worker_id] = websocket
logger.debug(f"Connected to worker {worker_id}!")
logger.warning(f"Connected to worker {worker_id}!")

while True:
try:
Expand All @@ -49,15 +50,15 @@ async def worker_handler(websocket, worker_id):


ROUTES = (
(r'^/main$', rest_server_handler),
(r'^/worker/(.+)$', worker_handler),
(r'^.*/main$', rest_server_handler),
(r'^.*/worker/(.+)$', worker_handler),
)


async def ws_handler(websocket, *args):
"""Handler for websocket connections. Routes websockets to the appropriate
route handler defined in ROUTES."""
logger.warn(f"websocket handler, path: {websocket.path}.")
logger.warning(f"websocket handler, path: {websocket.path}.")
for (pattern, handler) in ROUTES:
match = re.match(pattern, websocket.path)
if match:
Expand Down
5 changes: 5 additions & 0 deletions codalab/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
logger.setLevel(logging.WARNING)
logger = logging.getLogger('apache_beam')
logger.setLevel(logging.WARNING)
logger = logging.getLogger('kubernetes')
logger.setLevel(logging.WARNING)
logger = logging.getLogger('urllib3')
logger.setLevel(logging.ERROR)


class IntegrityError(ValueError):
Expand Down Expand Up @@ -408,4 +412,5 @@ class BundleRuntime(Enum):
"""

DOCKER = "docker"
KUBERNETES = "kubernetes"
SINGULARITY = "singularity"
12 changes: 6 additions & 6 deletions codalab/lib/bundle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, bundle_model, codalab_home):
def get_bundle_location(self, uuid, bundle_store_uuid=None):
raise NotImplementedError

def cleanup(self, uuid, dry_run):
def cleanup(self, bundle_location, dry_run):
raise NotImplementedError


Expand Down Expand Up @@ -226,14 +226,14 @@ def ls_partitions(self):
)
)

def cleanup(self, uuid, dry_run):
def cleanup(self, bundle_location, dry_run):
        '''
        Remove the bundle stored at the given bundle location from on-disk storage.
        '''
absolute_path = self.get_bundle_location(uuid)
print("cleanup: data %s" % absolute_path, file=sys.stderr)
if not dry_run:
path_util.remove(absolute_path)
print("cleanup: data %s" % bundle_location, file=sys.stderr)
if dry_run:
return False
return path_util.remove(bundle_location)

def health_check(self, model, force=False, compute_data_hash=False, repair_hashes=False):
"""
Expand Down
37 changes: 22 additions & 15 deletions codalab/lib/path_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,29 @@ def remove(path):
if parse_linked_bundle_url(path).uses_beam:
from apache_beam.io.filesystems import FileSystems

if not FileSystems.exists(path):
FileSystems.delete([path])
return
check_isvalid(path, 'remove')
set_write_permissions(path) # Allow permissions
if os.path.islink(path):
os.unlink(path)
elif os.path.isdir(path):
try:
shutil.rmtree(path)
except shutil.Error:
pass
if FileSystems.exists(path):
# Get the full folder location (without contents.gz) deleted
file_location = '/'.join(path.split('/')[0:-1]) + "/"
FileSystems.delete([file_location])
return True
else:
os.remove(path)
if os.path.exists(path):
print('Failed to remove %s' % path)
check_isvalid(path, 'remove')
set_write_permissions(path) # Allow permissions
if os.path.islink(path):
os.unlink(path)
return False
elif os.path.isdir(path):
try:
shutil.rmtree(path)
return True
except shutil.Error:
pass
else:
os.remove(path)
return True
if os.path.exists(path):
print('Failed to remove %s' % path)
return False


def soft_link(source, path):
Expand Down
1 change: 1 addition & 0 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ def upload_to_bundle_store(
should_unpack=unpack_before_upload,
progress_callback=progress.update,
)
self._client.update_bundle_state(bundle['id'], params={'success': True})
except Exception as err:
self._client.update_bundle_state(
bundle['id'],
Expand Down
8 changes: 8 additions & 0 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,14 @@ def increment_user_time_used(self, user_id, amount):
user_info['time_used'] += amount
self.update_user_info(user_info)

def increment_user_disk_used(self, user_id: str, amount: int):
"""
Increment disk_used for user by amount
"""
user_info = self.get_user_info(user_id)
user_info['disk_used'] += amount
self.update_user_info(user_info)

def get_user_time_quota_left(self, user_id, user_info=None):
if not user_info:
user_info = self.get_user_info(user_id)
Expand Down
42 changes: 24 additions & 18 deletions codalab/rest/bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from codalab.server.authenticated_plugin import AuthenticatedProtectedPlugin, ProtectedPlugin
from codalab.worker.bundle_state import State
from codalab.worker.download_util import BundleTarget
from apache_beam.io.filesystems import FileSystems

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -594,10 +593,10 @@ def _update_bundle_state(bundle_uuid: str):
logging.info(f"_update_bundle_location, bundle_location is {bundle_location}")

if success:
local.model.update_disk_metadata(bundle, bundle_location, enforce_disk_quota=True)
local.model.update_bundle(
bundle, {'state': state_on_success},
)
local.model.update_disk_metadata(bundle, bundle_location, enforce_disk_quota=True)
else: # If the upload failed, cleanup the uploaded file and update bundle state
local.model.update_bundle(
bundle, {'state': state_on_failure, 'metadata': {'failure_message': error_msg},},
Expand Down Expand Up @@ -1269,6 +1268,7 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run):
% (' '.join(uuids), '\n '.join(bundle.simple_str() for bundle in relevant))
)
relevant_uuids = uuids

check_bundles_have_all_permission(local.model, request.user, relevant_uuids)

# Make sure we don't delete bundles which are active.
Expand Down Expand Up @@ -1308,28 +1308,21 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run):
% (uuid, '\n '.join(worksheet.simple_str() for worksheet in worksheets))
)

# cache these so we have them even after the metadata for the bundle has been deleted
bundle_data_sizes = local.model.get_bundle_metadata(relevant_uuids, 'data_size')
bundle_locations = {
uuid: local.bundle_store.get_bundle_location(uuid) for uuid in relevant_uuids
}

# Delete the actual bundle
if not dry_run:
if data_only:
# Just remove references to the data hashes
local.model.remove_data_hash_references(relevant_uuids)
else:
# If the bundle is stored on cloud, first delete data on cloud.
for uuid in relevant_uuids:
bundle_location = local.bundle_store.get_bundle_location(uuid)

file_location = '/'.join(bundle_location.split('/')[0:-1]) + "/"
if bundle_location.startswith(
StorageURLScheme.AZURE_BLOB_STORAGE.value
) or bundle_location.startswith(StorageURLScheme.GCS_STORAGE.value):
FileSystems.delete([file_location])

# Actually delete the bundle
local.model.delete_bundles(relevant_uuids)

# Update user statistics
local.model.update_user_disk_used(request.user.user_id)

# Delete the data.
bundle_link_urls = local.model.get_bundle_metadata(relevant_uuids, "link_url")
for uuid in relevant_uuids:
Expand All @@ -1338,9 +1331,22 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run):
# Don't physically delete linked bundles.
pass
else:
bundle_location = local.bundle_store.get_bundle_location(uuid)
if os.path.lexists(bundle_location):
local.bundle_store.cleanup(uuid, dry_run)
bundle_location = bundle_locations[uuid]

# Remove bundle
removed = False
if (
os.path.lexists(bundle_location)
or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value)
or bundle_location.startswith(StorageURLScheme.GCS_STORAGE.value)
):
removed = local.bundle_store.cleanup(bundle_location, dry_run)

# Update user disk used.
if removed and uuid in bundle_data_sizes:
local.model.increment_user_disk_used(
request.user.user_id, -int(bundle_data_sizes[uuid])
)

return relevant_uuids

Expand Down
Loading

0 comments on commit 698baf2

Please sign in to comment.