From 36c50c97270f8e3c56be2bb2cd37a24cf2a6d1a0 Mon Sep 17 00:00:00 2001 From: Arash Rouhani Date: Wed, 2 Nov 2016 17:58:35 +0700 Subject: [PATCH] targets: Implement temporary_path context manager As has been detailed in spotify/luigi#1519, there are many ways to do temporary files in luigi. The use case for temporary files is always about achieving atomic writes. This PR finally describes the problem in the luigi documentation and tries to interlink essential parts. Also it implements a context manager that is meant to be able to unify many of the existing implementations, and to serve as a new, clean and recommended way to do temporary files and atomic moves using luigi. --- doc/luigi_patterns.rst | 53 +++++++++++++++++++++++++++++++++ luigi/target.py | 64 +++++++++++++++++++++++++++++++++------- test/target_test.py | 67 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 11 deletions(-) diff --git a/doc/luigi_patterns.rst b/doc/luigi_patterns.rst index 9a60f4a4ec..304406fb4e 100644 --- a/doc/luigi_patterns.rst +++ b/doc/luigi_patterns.rst @@ -214,3 +214,56 @@ reliable scheduling for you, but also emit events which you can use to set up delay monitoring. That way you can implement alerts for when jobs are stuck for prolonged periods lacking input data or otherwise requiring attention. + +.. _AtomicWrites: + +Atomic Writes Problem +~~~~~~~~~~~~~~~~~~~~~ + +A very common mistake made by luigi plumbers is to write data partially to the +final destination, that is, not atomically. The problem arises because +completion checks in luigi are exactly as naive as running +:meth:`luigi.target.Target.exists`. And in many cases it just means to check if +a folder exists on disk. During the time we have partially written data, a task +depending on that output would think its input is complete. This can have +devastating effects, as in `the thanksgiving bug +<http://tarrasch.github.io/luigi-budapest-bi-oct-2015/#/21>`__. 
+ +The concept can be illustrated by imagining that we deal with data stored on +local disk and by running commands: + +.. code-block:: console + + # This is the BAD way + $ mkdir /outputs/final_output + $ big-slow-calculation > /outputs/final_output/foo.data + +As stated earlier, the problem is that only partial data exists for a duration, +yet we consider the data to be :meth:`~luigi.task.Task.complete` because the +output folder already exists. Here is a robust version of this: + +.. code-block:: console + + # This is the good way + $ mkdir /outputs/final_output-tmp-123456 + $ big-slow-calculation > /outputs/final_output-tmp-123456/foo.data + $ mv --no-target-directory --no-clobber /outputs/final_output{-tmp-123456,} + $ [[ -d /outputs/final_output-tmp-123456 ]] && rm -r /outputs/final_output-tmp-123456 + +Indeed, the good way is not as trivial. It involves coming up with a unique +directory name and a pretty complex ``mv`` line. The reason ``mv`` needs all +those flags is that we don't want ``mv`` to move a directory into a potentially +existing directory. A directory could already exist in exceptional cases, for +example when central locking fails and the same task would somehow run twice at +the same time. Lastly, in the exceptional case where the file was never moved, +one might want to remove the temporary directory that never got used. + +Note that this was an example where the storage was on local disk. But for +every storage (hard disk file, hdfs file, database table, etc.) this procedure +will look different. But does every luigi user need to implement that complexity? +Nope, thankfully luigi developers are aware of this problem and luigi comes with many +built-in solutions. In case you're dealing with a file system +(:class:`~luigi.target.FileSystemTarget`), you should consider using +:meth:`~luigi.target.FileSystemTarget.temporary_path`. For other targets, you +should ensure that the way you're writing your final output directory is +atomic. 
diff --git a/luigi/target.py b/luigi/target.py index d2d40bd36a..9dde33192e 100644 --- a/luigi/target.py +++ b/luigi/target.py @@ -167,13 +167,7 @@ def move(self, path, dest): def rename_dont_move(self, path, dest): """ Potentially rename ``path`` to ``dest``, but don't move it into the - ``dest`` folder (if it is a folder). This kind of operation is useful - when you don't want your output path to ever contain partial or - errinously nested data. - - See `this github issue `__ and - `the thanksgiving bug `__ - where the problem is described. + ``dest`` folder (if it is a folder). This relates to :ref:`AtomicWrites`. This method has a reasonable but not bullet proof default implementation. It will just do ``move()`` if the file doesn't @@ -206,7 +200,7 @@ class FileSystemTarget(Target): A FileSystemTarget has an associated :py:class:`FileSystem` to which certain operations can be delegated. By default, :py:meth:`exists` and :py:meth:`remove` are delegated to the - :py:class:`FileSystem`, which is determined by the :py:meth:`fs` property. + :py:class:`FileSystem`, which is determined by the :py:attr:`fs` property. Methods of FileSystemTarget raise :py:class:`FileSystemException` if there is a problem completing the operation. @@ -225,7 +219,7 @@ def fs(self): """ The :py:class:`FileSystem` associated with this FileSystemTarget. """ - raise + raise NotImplementedError() @abc.abstractmethod def open(self, mode): @@ -245,7 +239,7 @@ def exists(self): """ Returns ``True`` if the path for this FileSystemTarget exists; ``False`` otherwise. - This method is implemented by using :py:meth:`fs`. + This method is implemented by using :py:attr:`fs`. """ path = self.path if '*' in path or '?' in path or '[' in path or '{' in path: @@ -257,14 +251,62 @@ def remove(self): """ Remove the resource at the path specified by this FileSystemTarget. - This method is implemented by using :py:meth:`fs`. + This method is implemented by using :py:attr:`fs`. 
""" self.fs.remove(self.path) + def temporary_path(self): + """ + A context manager that enables a reasonably short, general and + magic-less way to solve the :ref:`AtomicWrites`. When the context + manager enters it will not do anything, but when it exits it will move + the file if there was no exception thrown. + + The final move operation will be carried out by calling + :py:meth:`FileSystem.rename_dont_move` on :py:attr:`fs`. + + The typical use case looks like this: + + .. code:: python + + class MyTask(luigi.Task): + def output(self): + return MyFileSystemTarget(...) + + def run(self): + with self.output().temporary_path() as self.temp_output_path: + run_some_external_command(output_dir=self.temp_output_path) + """ + class _Manager(object): + target = self + + def __init__(self): + num = random.randrange(0, 10 ** 10)  # int bound; randrange rejects floats on py3.12+ + self._temp_path = '{}-luigi-tmp-{:010}{}'.format( + self.target.path.rstrip('/').rstrip("\\"), + num, + self.target._trailing_slash()) + + def __enter__(self): + return self._temp_path + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + # There were no exceptions + self.target.fs.rename_dont_move(self._temp_path, self.target.path) + return False # False means we don't suppress the exception + + return _Manager() + def _touchz(self): with self.open('w'): pass + def _trailing_slash(self): + # I suppose one day scheme-like paths, like + # file:///path/blah.txt?params=etc can be parsed too + return self.path[-1] if self.path[-1] in r'\/' else '' + class AtomicLocalFile(io.BufferedWriter): """Abstract class to create a Target that creates diff --git a/test/target_test.py b/test/target_test.py index 69c1da5ef6..10a098a491 100644 --- a/test/target_test.py +++ b/test/target_test.py @@ -17,6 +17,8 @@ from __future__ import print_function from helpers import unittest, skipOnTravis +from mock import Mock +import re import luigi.target import luigi.format @@ -266,3 +268,68 @@ def test_rename_dont_move_on_fs(self): 
self.assertFalse(t.exists()) self.assertRaises(luigi.target.FileAlreadyExists, lambda: fs.rename_dont_move(t.path, t.path+"-yay")) + + +class TemporaryPathTest(unittest.TestCase): + def setUp(self): + super(TemporaryPathTest, self).setUp() + self.fs = Mock() + + class MyFileSystemTarget(luigi.target.FileSystemTarget): + open = None # Must be implemented due to abc stuff + fs = self.fs + + self.target_cls = MyFileSystemTarget + + def test_temporary_path_files(self): + target_outer = self.target_cls('/tmp/notreal.xls') + target_inner = self.target_cls('/tmp/blah.txt') + + class MyException(Exception): + pass + + orig_ex = MyException() + try: + with target_outer.temporary_path() as tmp_path_outer: + assert 'notreal' in tmp_path_outer + with target_inner.temporary_path() as tmp_path_inner: + assert 'blah' in tmp_path_inner + with target_inner.temporary_path() as tmp_path_inner_2: + assert tmp_path_inner != tmp_path_inner_2 + self.fs.rename_dont_move.assert_called_once_with(tmp_path_inner_2, target_inner.path) + self.fs.rename_dont_move.assert_called_with(tmp_path_inner, target_inner.path) + self.assertEqual(self.fs.rename_dont_move.call_count, 2) + raise orig_ex + except MyException as ex: + self.assertEqual(self.fs.rename_dont_move.call_count, 2) + assert ex is orig_ex + else: + assert False + + def test_temporary_path_directory(self): + target_slash = self.target_cls('/tmp/dir/') + target_noslash = self.target_cls('/tmp/dir') + + with target_slash.temporary_path() as tmp_path: + assert re.match(r'/tmp/dir-luigi-tmp-\d{10}/', tmp_path) + self.fs.rename_dont_move.assert_called_once_with(tmp_path, target_slash.path) + + with target_noslash.temporary_path() as tmp_path: + assert re.match(r'/tmp/dir-luigi-tmp-\d{10}', tmp_path) + self.fs.rename_dont_move.assert_called_with(tmp_path, target_noslash.path) + + def test_windowsish_dir(self): + target = self.target_cls(r'''C:\my\folder''' + "\\") + pattern = r'''C:\\my\\folder-luigi-tmp-\d{10}''' + r"\\" + + with target.temporary_path() as tmp_path: + 
assert re.match(pattern, tmp_path) + self.fs.rename_dont_move.assert_called_once_with(tmp_path, target.path) + + + def test_hadoopish_dir(self): + target = self.target_cls(r'''hdfs:///user/arash/myfile.uids''') + + with target.temporary_path() as tmp_path: + assert re.match(r'''hdfs:///user/arash/myfile.uids-luigi-tmp-\d{10}''', tmp_path) + self.fs.rename_dont_move.assert_called_once_with(tmp_path, target.path)