Skip to content

Commit

Permalink
LocalTarget: Allow moves across devices (#1877)
Browse files Browse the repository at this point in the history
With `os.rename` you get a `OSError: [Errno 18] Invalid cross-device link`
if you attempt to move a file across filesystems.

If the destination is on a different filesystem,
we approximate atomicity by first copying the source
to a temporary file on the target filesystem, followed
by a rename. Finally the source file is removed.
  • Loading branch information
miku authored and Tarrasch committed Oct 25, 2016
1 parent 35b340d commit e9ce79e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
17 changes: 16 additions & 1 deletion luigi/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tempfile
import io
import warnings
import errno

from luigi.format import FileWrapper, get_default_format
from luigi.target import FileAlreadyExists, MissingParentDirectory, NotADirectory, FileSystem, FileSystemTarget, AtomicLocalFile
Expand Down Expand Up @@ -92,12 +93,26 @@ def remove(self, path, recursive=True):
os.remove(path)

def move(self, old_path, new_path, raise_if_exists=False):
"""
Move file atomically. If source and destination are located
on different filesystems, atomicity is approximated
but cannot be guaranteed.
"""
if raise_if_exists and os.path.exists(new_path):
raise RuntimeError('Destination exists: %s' % new_path)
d = os.path.dirname(new_path)
if d and not os.path.exists(d):
self.mkdir(d)
os.rename(old_path, new_path)
try:
os.rename(old_path, new_path)
except OSError as err:
if err.errno == errno.EXDEV:
new_path_tmp = '%s-%09d' % (new_path, random.randint(0, 999999999))
shutil.copy(old_path, new_path_tmp)
os.rename(new_path_tmp, new_path)
os.remove(old_path)
else:
raise err


class LocalTarget(FileSystemTarget):
Expand Down
28 changes: 27 additions & 1 deletion test/file_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import itertools
import io
from errno import EEXIST
from errno import EEXIST, EXDEV


class LocalTargetTest(unittest.TestCase, FileSystemTargetTestMixin):
Expand Down Expand Up @@ -130,6 +130,32 @@ def test_move(self):
self.assertFalse(os.path.exists(self.path))
self.assertTrue(os.path.exists(self.copy))

def test_move_across_filesystems(self):
t = LocalTarget(self.path)
with t.open('w') as f:
f.write('test_data')

def rename_across_filesystems(src, dst):
err = OSError()
err.errno = EXDEV
raise err

real_rename = os.rename

def mockrename(src, dst):
if '-across-fs' in src:
real_rename(src, dst)
else:
rename_across_filesystems(src, dst)

copy = '%s-across-fs' % self.copy
with mock.patch('os.rename', mockrename):
t.move(copy)

self.assertFalse(os.path.exists(self.path))
self.assertTrue(os.path.exists(copy))
self.assertEqual('test_data', LocalTarget(copy).open('r').read())

def test_format_chain(self):
UTF8WIN = luigi.format.TextFormat(encoding='utf8', newline='\r\n')
t = LocalTarget(self.path, UTF8WIN >> luigi.format.Gzip)
Expand Down

0 comments on commit e9ce79e

Please sign in to comment.