Skip to content

Commit

Permalink
xroot: implemented locking with EOS-supported special attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
glpatcern committed Oct 18, 2022
1 parent 21c3d11 commit 7ccab6e
Showing 1 changed file with 36 additions and 21 deletions.
57 changes: 36 additions & 21 deletions src/core/xrootiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
EOSVERSIONPREFIX = '.sys.v#.'
EXCL_XATTR_MSG = 'exclusive set for existing attribute'
OK_MSG = '[SUCCESS]' # this is what xroot returns on success
EOSLOCKKEY = 'sys.app.lock'

# module-wide state
config = None
Expand Down Expand Up @@ -73,7 +74,7 @@ def _geturlfor(endpoint):
return endpoint if endpoint.find('root://') == 0 else ('root://' + endpoint.replace('newproject', 'eosproject') + '.cern.ch')


def _eosargs(userid, atomicwrite=0, bookingsize=0):
def _eosargs(userid, app='wopi', bookingsize=0):
'''Assume userid is in the form uid:gid and split it into uid, gid
plus generate extra EOS-specific arguments for the xroot URL'''
try:
Expand All @@ -83,12 +84,18 @@ def _eosargs(userid, atomicwrite=0, bookingsize=0):
raise ValueError
ruid = int(userid[0])
rgid = int(userid[1])
return '?eos.ruid=%d&eos.rgid=%d' % (ruid, rgid) + '&eos.app=' + ('fuse::wopi' if not atomicwrite else 'wopi') + \
return '?eos.ruid=%d&eos.rgid=%d' % (ruid, rgid) + '&eos.app=' + app + \
(('&eos.bookingsize=' + str(bookingsize)) if bookingsize else '')
except (ValueError, IndexError):
raise ValueError('Only Unix-based userid is supported with xrootd storage: %s' % userid)


def _geneoslock(appname):
'''One-liner to generate an EOS app lock'''
return 'expires:%d,type:exclusive,owner:*:wopi_%s' % \
(int(time.time()) + config.getint("general", "wopilockexpiration"), appname.replace(' ', '_').lower() )


def _xrootcmd(endpoint, cmd, subcmd, userid, args):
'''Perform the <cmd>/<subcmd> action on the special /proc/user path on behalf of the given userid.
Note that this is entirely EOS-specific.'''
Expand Down Expand Up @@ -249,43 +256,57 @@ def statx(endpoint, fileref, userid, versioninv=1):
}


def setxattr(endpoint, filepath, _userid, key, value, _lockid):
def setxattr(endpoint, filepath, userid, key, value, lockid):
'''Set the extended attribute <key> to <value> via a special open.
The userid is overridden to make sure it also works on shared files.'''
_xrootcmd(endpoint, 'attr', 'set', '0:0', 'mgm.attr.key=user.' + key + '&mgm.attr.value=' + str(value)
if key not in (EOSLOCKKEY, common.LOCKKEY):
common.validatelock(filepath, None, getlock(endpoint, filepath, userid), lockid, 'setxattr', log)
# else skip the check as we're setting the lock itself
if 'user' not in key and 'sys' not in key:
# if nothing is given, assume it's a user attr
key = 'user.' + key
_xrootcmd(endpoint, 'attr', 'set', '0:0', 'mgm.attr.key=' + key + '&mgm.attr.value=' + str(value)
+ '&mgm.path=' + _getfilepath(filepath, encodeamp=True))


def getxattr(endpoint, filepath, _userid, key):
'''Get the extended attribute <key> via a special open.
The userid is overridden to make sure it also works on shared files.'''
if 'user' not in key and 'sys' not in key:
# if nothing is given, assume it's a user attr
key = 'user.' + key
try:
res = _xrootcmd(endpoint, 'attr', 'get', '0:0',
'mgm.attr.key=user.' + key + '&mgm.path=' + _getfilepath(filepath, encodeamp=True))
'mgm.attr.key=' + key + '&mgm.path=' + _getfilepath(filepath, encodeamp=True))
# if no error, the response comes in the format <key>="<value>"
return res.split('"')[1]
except (IndexError, IOError):
return None


def rmxattr(endpoint, filepath, _userid, key, _lockid):
def rmxattr(endpoint, filepath, userid, key, lockid):
'''Remove the extended attribute <key> via a special open.
The userid is overridden to make sure it also works on shared files.'''
_xrootcmd(endpoint, 'attr', 'rm', '0:0', 'mgm.attr.key=user.' + key + '&mgm.path=' + _getfilepath(filepath, encodeamp=True))
if key not in (EOSLOCKKEY, common.LOCKKEY):
common.validatelock(filepath, None, getlock(endpoint, filepath, userid), lockid, 'rmxattr', log)
if 'user' not in key and 'sys' not in key:
# if nothing is given, assume it's a user attr
key = 'user.' + key
_xrootcmd(endpoint, 'attr', 'rm', '0:0', 'mgm.attr.key=' + key + '&mgm.path=' + _getfilepath(filepath, encodeamp=True))


def setlock(endpoint, filepath, userid, appname, value, recurse=False):
'''Set a lock as an xattr with the given value metadata and appname as holder.
The special option "c" (create-if-not-exists) is used to be atomic'''
try:
log.debug('msg="Invoked setlock" filepath="%s" value="%s"' % (filepath, value))
setxattr(endpoint, filepath, userid, common.LOCKKEY,
common.genrevalock(appname, value) + '&mgm.option=c', None)
setxattr(endpoint, filepath, userid, EOSLOCKKEY, _geneoslock(appname) + '&mgm.option=c', None)
setxattr(endpoint, filepath, userid, common.LOCKKEY, common.genrevalock(appname, value), None)
except IOError as e:
# TODO need to confirm this error message once EOS-5145 is implemented
if EXCL_XATTR_MSG in str(e) or 'flock already held' in str(e):
if EXCL_XATTR_MSG in str(e):
# check for pre-existing stale locks (this is now not atomic)
if not getlock(endpoint, filepath, userid) and not recurse:
rmxattr(endpoint, filepath, userid, EOSLOCKKEY, None)
setlock(endpoint, filepath, userid, appname, value, recurse=True)
else:
# the lock is valid, raise conflict error
Expand Down Expand Up @@ -314,6 +335,7 @@ def refreshlock(endpoint, filepath, userid, appname, value, oldvalue=None):
common.validatelock(filepath, appname, getlock(endpoint, filepath, userid), oldvalue, 'refreshlock', log)
log.debug('msg="Invoked refreshlock" filepath="%s" value="%s"' % (filepath, value))
# this is non-atomic, but the lock was already held
setxattr(endpoint, filepath, userid, EOSLOCKKEY, _geneoslock(appname), None)
setxattr(endpoint, filepath, userid, common.LOCKKEY, common.genrevalock(appname, value), None)


Expand All @@ -322,6 +344,7 @@ def unlock(endpoint, filepath, userid, appname, value):
common.validatelock(filepath, appname, getlock(endpoint, filepath, userid), None, 'unlock', log)
log.debug('msg="Invoked unlock" filepath="%s" value="%s"' % (filepath, value))
rmxattr(endpoint, filepath, userid, common.LOCKKEY, None)
rmxattr(endpoint, filepath, userid, EOSLOCKKEY, None)


def readfile(endpoint, filepath, userid, _lockid):
Expand Down Expand Up @@ -396,21 +419,13 @@ def writefile(endpoint, filepath, userid, content, lockmd, islock=False):
if not rc.ok:
log.warning('msg="Error closing the file" filepath="%s" error="%s"' % (filepath, rc.message.strip('\n')))
raise IOError(rc.message.strip('\n'))
if existingLock:
try:
setlock(endpoint, filepath, userid, existingLock['app_name'], existingLock['lock_id'], False)
except IOError as e:
if str(e) == common.EXCL_ERROR:
# new EOS versions do preserve the attributes, so this would fail but it's OK
pass
else:
raise
log.info('msg="File written successfully" filepath="%s" elapsedTimems="%.1f" islock="%s"' %
(filepath, (tend-tstart)*1000, islock))


def renamefile(endpoint, origfilepath, newfilepath, userid, _lockid):
def renamefile(endpoint, origfilepath, newfilepath, userid, lockid):
'''Rename a file via a special open from origfilepath to newfilepath on behalf of the given userid.'''
common.validatelock(origfilepath, None, getlock(endpoint, origfilepath, userid), lockid, 'rename', log)
_xrootcmd(endpoint, 'file', 'rename', userid, 'mgm.path=' + _getfilepath(origfilepath, encodeamp=True)
+ '&mgm.file.source=' + _getfilepath(origfilepath, encodeamp=True)
+ '&mgm.file.target=' + _getfilepath(newfilepath, encodeamp=True))
Expand Down

0 comments on commit 7ccab6e

Please sign in to comment.