Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logic bug in external file clean up #956

Merged
merged 19 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions datajoint/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def _remove_external_file(self, external_path):
if self.spec['protocol'] == 's3':
self.s3.remove_object(external_path)
elif self.spec['protocol'] == 'file':
Path(external_path).unlink()
try:
Path(external_path).unlink()
except FileNotFoundError:
pass

def exists(self, external_filepath):
"""
Expand Down Expand Up @@ -337,18 +340,19 @@ def delete(self, *, delete_external_files=None, limit=None, display_progress=Tru
# delete items one by one, close to transaction-safe
error_list = []
for uuid, external_path in items:
guzman-raphael marked this conversation as resolved.
Show resolved Hide resolved
try:
count = len(self & {'hash': uuid}) # optimize
except Exception:
pass # if delete failed, do not remove the external file
else:
assert count in (0, 1)
row = (self & {'hash': uuid}).fetch()
if row.size:
try:
self._remove_external_file(external_path)
except Exception as error:
error_list.append((uuid, external_path, str(error)))
(self & {'hash': uuid}).delete_quick()
except Exception:
pass # if delete failed, do not remove the external file
else:
(self & {'hash': uuid}).delete_quick(get_count=True)
try:
self._remove_external_file(external_path)
except Exception as error:
# adding row back into table after failed delete
self.insert1(row[0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set skip_duplicates=True to address potential race conditions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay not to re-insert if the delete fails. The procedure will return the undeleted files to the user for special handling. Without implementing an actual transaction management process, we can allow some orphaned files in the external storage and provide an additional cleanup utility.

Copy link
Collaborator

@guzman-raphael guzman-raphael Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimitri-yatsenko That is a good suggestion for the skipping duplicates. Regarding the cleanup, I feel this is actually necessary. There is basically 2 primary concerns:

  • The user has only one chance to catch the errors. Meaning that running the external delete/clean up multiple times will yield no change though it has only been removed from the external tracking table. This gives a false sense that the system has been 'cleaned-up' but it has not been removed from the actual store (this is the main complaint from the user in issue ExternalTable.delete should not remove row on error #953). We can provide an additional utility to resolve this but that would be expensive to run as it would have to 'crawl' the entire store to find objects that don't exist within the external tracking table. By simply inserting it back when there is an exception, we are allowing the error to be more visible but most importantly reproducible.
  • schema.external[store].delete() is not currently documented that it returns a list of the errors. Users are most likely unaware of this feature and therefore aren't using it properly. @zitrosolrac Could you open an issue on this in our datajoint-docs and reference it in ExternalTable.delete should not remove row on error #953? (Filing for now since our new team members haven't been oriented to the docs setup yet).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.insert1(row[0])
self.insert1(row[0], skip_duplicates=True)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guzman-raphael Got it! I'll open an issue on this in the datajoint-docs and add the necessary reference.

error_list.append((uuid, external_path, str(error)))
return error_list


Expand Down
Binary file removed djtest_extern/4/c/4c16e9e15b64474e4b56ba22bb17faae
Binary file not shown.
Binary file not shown.
120 changes: 33 additions & 87 deletions tests/test_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@
from nose.tools import assert_true, assert_equal
from datajoint.external import ExternalTable
from datajoint.blob import pack, unpack


import datajoint as dj
from .schema_external import stores_config, SimpleRemote, Simple, schema
import os

import json
import os
from os import stat
import pwd
from pwd import getpwuid
current_location_s3 = dj.config['stores']['share']['location']
current_location_local = dj.config['stores']['local']['location']

Expand All @@ -26,6 +20,27 @@ def tearDown(self):
dj.config['stores']['local']['location'] = current_location_local


def test_external_put():
"""
external storage put and get and remove
"""
ext = ExternalTable(schema.connection, store='raw', database=schema.database)
initial_length = len(ext)
input_ = np.random.randn(3, 7, 8)
count = 7
extra = 3
for i in range(count):
hash1 = ext.put(pack(input_))
for i in range(extra):
hash2 = ext.put(pack(np.random.randn(4, 3, 2)))

fetched_hashes = ext.fetch('hash')
assert_true(all(hash in fetched_hashes for hash in (hash1, hash2)))
assert_equal(len(ext), initial_length + 1 + extra)

output_ = unpack(ext.get(hash1))
assert_array_equal(input_, output_)


def test_s3_leading_slash(index=100, store='share'):
"""
Expand Down Expand Up @@ -86,95 +101,26 @@ def test_s3_leading_slash(index=100, store='share'):

dj.config['stores'][store]['location'] = oldConfig


def test_file_leading_slash():
"""
file external storage configured with leading slash
"""
test_s3_leading_slash(index=200, store='local')

def test_remove_fail():
#https://github.com/datajoint/datajoint-python/issues/953

#print(json.dumps(dj.config['stores'], indent=4))
#print(dj.config['stores']['local']['location'])

# oldConfig = dj.config['stores']
# dj.config['stores'] = stores_config

dirName = dj.config['stores']['local']['location']

#print('directory: ' + dirName)



data = dict(simple = 2, item = [1, 2, 3])
def test_remove_fail():
# https://github.com/datajoint/datajoint-python/issues/953
data = dict(simple=2, item=[1, 2, 3])
Simple.insert1(data)

#print('location')
# print('\n IN TEST: BEFORE DELETE: list of dir stores, local, location')
print('stores location -----------\n')
print(dj.config['stores']['local']['location'])
print('local location -----------\n')
print(schema.external['local'])
print('----------------------------')

path1 = dj.config['stores']['local']['location'] + '/djtest_extern/4/c/'

argDir = dj.config['stores']['local']['location'] + '/djtest_extern/4/c/'

print(f'argDir----------\n{argDir}\n')

path2 = os.listdir(argDir)

# print(path1 + path2[0])

old_name = path1 + path2[0]

new_name = "/tmp/newfile"

os.rename(old_name, new_name)

# print(f'\n IN TEST: is the new file name a file? {os.path.isfile(new_name)}')
# print(f'\n IN TEST: is the old file name a file? {os.path.isfile(old_name)}')

# print(os.listdir(dj.config['stores']['local']['location'] + '/djtest_extern/4/c/'))

# st = stat(path1 + path2[0])
# print(bool(st.st_mode & stat.S_IXUSR))

#print(getpwuid(stat(path3).st_uid).pw_name)

# print(f' IN TEST: simple table before delete {Simple()}')
currentMode = int(oct(os.stat(path1).st_mode), 8)
os.chmod(path1, 0o40555)
(Simple & 'simple=2').delete()
# print(f' IN TEST: simple table after delete {Simple()}')
# print(' IN TEST: -------------showing external store before delete with flag---------')
# print(schema.external['local'])
listOfErrors = schema.external['local'].delete(delete_external_files=True)
# print(f' IN TEST: list of errors: {listOfErrors}')
# print(' IN TEST: list of dir stores, local, location')
# print(os.listdir(dj.config['stores']['local']['location'] + '/djtest_extern/4/c'))
# print(' IN TEST: -------------showing external store after delete with flag---------')
# print(schema.external['local'])

# print(f'\n IN TEST: is this the UID or HASH? {listOfErrors[0][0]}')

# LENGTH_OF_QUERY = len(schema.external['local'] & dict(hash = listOfErrors[0][0]))

# print(f'\n IN TEST: WHAT IS THE LENGTH OF THIS? {LENGTH_OF_QUERY}')

assert len(listOfErrors) == 1, 'unexpected number of errors'
assert len(schema.external['local'] & dict(hash = listOfErrors[0][0])) == 1, 'unexpected number of rows in external table'

#---------------------CLEAN UP--------------------
os.rename(new_name, old_name) #switching from the new name back to the old name

# print(f'this is the old_name after the asserts {old_name}')

# print(f'\n IN TEST: is the new file name a file? {os.path.isfile(new_name)}')
# print(f'\n IN TEST: is the old file name a file? {os.path.isfile(old_name)}')

listOfErrors = schema.external['local'].delete(delete_external_files=True)

print(len(listOfErrors))

# dj.config['stores'] = oldConfig
assert len(schema.external['local'] & dict(hash=listOfErrors[0][0])) == 1, 'unexpec' + \
'number of rows in external table'
zitrosolrac marked this conversation as resolved.
Show resolved Hide resolved
# ---------------------CLEAN UP--------------------
os.chmod(path1, currentMode)
listOfErrors = schema.external['local'].delete(delete_external_files=True)