Skip to content

Commit

Permalink
targets: Implement temporary_path decorator
Browse files Browse the repository at this point in the history
As has been detailed in #1519, there are many ways to do
temporary files in luigi. The use case for temporary files are always
about achieving atomic writes.

This PR finally describes the problem in the luigi documentation and
tries to interlink essential parts. Also it implements a decorator
that is meant to be able to unify many of the existing implementations,
and as a new, clean and recommended way to do temporary files and atomic
moves using luigi.
  • Loading branch information
Tarrasch committed Nov 9, 2016
1 parent 7559240 commit c2b38e6
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 11 deletions.
53 changes: 53 additions & 0 deletions doc/luigi_patterns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,56 @@ reliable scheduling for you, but also emit events which you can use to
set up delay monitoring. That way you can implement alerts for when
jobs are stuck for prolonged periods lacking input data or otherwise
requiring attention.

.. _AtomicWrites:

Atomic Writes Problem
~~~~~~~~~~~~~~~~~~~~~

A very common mistake done by luigi plumbers is to write data partially to the
final destination, that is, not atomically. The problem arises because
completion checks in luigi are exactly as naive as running
:meth:`luigi.target.Target.exists`. And in many cases it just means to check if
a folder exist on disk. During the time we have partially written data, a task
depending on that output would think its input is complete. This can have
devestating effects, as in `the thanksgiving bug
<http://tarrasch.github.io/luigi-budapest-bi-oct-2015/#/21>`__.

The concept can be illustrated by imagining that we deal with data stored on
local disk and by running commands:

.. code-block:: console
# This the BAD way
$ mkdir /outputs/final_output
$ big-slow-calculation > /outputs/final_output/foo.data
As stated earlier, the problem is that only partial data exists for a duration,
yet we consider the data to be :meth:`~luigi.task.Task.complete` because the
output folder already exists. Here is a robust version of this:

.. code-block:: console
# This is the good way
$ mkdir /outputs/final_output-tmp-123456
$ big-slow-calculation > /outputs/final_output-tmp-123456/foo.data
$ mv --no-target-directory --no-clobber /outputs/final_output{-tmp-123456,}
$ [[ -d /outputs/final_output-tmp-123456 ]] && rm -r /outputs/final_output-tmp-123456
Indeed, the good way is not as trivial. It involves coming up with a unique
directory name and a pretty complex ``mv`` line, the reason ``mv`` need all
those is because we don't want ``mv`` to move a directory into a potentially
existing directory. A directory could already exist in exceptional cases, for
example when central locking fails and the same task would somehow run twice at
the same time. Lastly, in the exceptional case where the file was never moved,
one might want to remove the temporary directory that never got used.

Note that this was an example where the storage was on local disk. But for
every storage (hard disk file, hdfs file, database table, etc.) this procedure
will look different. But do every luigi user need to implement that complexity?
Nope, thankfully luigi developers are aware of these and luigi comes with many
built-in solutions. In the case of you're dealing with a file system
(:class:`~luigi.target.FileSystemTarget`), you should consider using
:meth:`~luigi.target.FileSystemTarget.temporary_path`. For other targets, you
should ensure that the way you're writing your final output directory is
atomic.
64 changes: 53 additions & 11 deletions luigi/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,7 @@ def move(self, path, dest):
def rename_dont_move(self, path, dest):
"""
Potentially rename ``path`` to ``dest``, but don't move it into the
``dest`` folder (if it is a folder). This kind of operation is useful
when you don't want your output path to ever contain partial or
errinously nested data.
See `this github issue <https://github.com/spotify/luigi/pull/557>`__ and
`the thanksgiving bug <http://tarrasch.github.io/luigi-budapest-bi-oct-2015/#/21>`__
where the problem is described.
``dest`` folder (if it is a folder). This relates to :ref:`AtomicWrites`.
This method has a reasonable but not bullet proof default
implementation. It will just do ``move()`` if the file doesn't
Expand Down Expand Up @@ -206,7 +200,7 @@ class FileSystemTarget(Target):
A FileSystemTarget has an associated :py:class:`FileSystem` to which certain operations can be
delegated. By default, :py:meth:`exists` and :py:meth:`remove` are delegated to the
:py:class:`FileSystem`, which is determined by the :py:meth:`fs` property.
:py:class:`FileSystem`, which is determined by the :py:attr:`fs` property.
Methods of FileSystemTarget raise :py:class:`FileSystemException` if there is a problem
completing the operation.
Expand All @@ -225,7 +219,7 @@ def fs(self):
"""
The :py:class:`FileSystem` associated with this FileSystemTarget.
"""
raise
raise NotImplementedError()

@abc.abstractmethod
def open(self, mode):
Expand All @@ -245,7 +239,7 @@ def exists(self):
"""
Returns ``True`` if the path for this FileSystemTarget exists; ``False`` otherwise.
This method is implemented by using :py:meth:`fs`.
This method is implemented by using :py:attr:`fs`.
"""
path = self.path
if '*' in path or '?' in path or '[' in path or '{' in path:
Expand All @@ -257,14 +251,62 @@ def remove(self):
"""
Remove the resource at the path specified by this FileSystemTarget.
This method is implemented by using :py:meth:`fs`.
This method is implemented by using :py:attr:`fs`.
"""
self.fs.remove(self.path)

def temporary_path(self):
"""
A context manager that enables a reasonably short, general and
magic-less way to solve the :ref:`AtomicWrites`. When the context
manager enters it will not do anything, but when it exits it will move
the file if there was no exception thrown.
The final move operation will be carried out by calling
:py:meth:`FileSystem.rename_dont_move` on :py:attr:`fs`.
The typical use case looks like this:
.. code:: python
class MyTask(luigi.Task):
def output(self):
return MyFileSystemTarget(...)
def run(self):
with self.output().temporary_path() as self.temp_output_path:
run_some_external_command(output_dir=self.temp_output_path)
"""
class _Manager(object):
target = self

def __init__(self):
num = random.randrange(0, 1e10)
self._temp_path = '{}-luigi-tmp-{:010}{}'.format(
self.target.path.rstrip('/').rstrip("\\"),
num,
self.target._trailing_slash())

def __enter__(self):
return self._temp_path

def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
# There were no exceptions
self.target.fs.rename_dont_move(self._temp_path, self.target.path)
return False # False means we don't suppress the exception

return _Manager()

def _touchz(self):
with self.open('w'):
pass

def _trailing_slash(self):
# I suppose one day schema-like paths, like
# file:///path/blah.txt?params=etc can be parsed too
return self.path[-1] if self.path[-1] in r'\/' else ''


class AtomicLocalFile(io.BufferedWriter):
"""Abstract class to create a Target that creates
Expand Down
66 changes: 66 additions & 0 deletions test/target_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from __future__ import print_function

from helpers import unittest, skipOnTravis
from mock import Mock
import re

import luigi.target
import luigi.format
Expand Down Expand Up @@ -266,3 +268,67 @@ def test_rename_dont_move_on_fs(self):
self.assertFalse(t.exists())
self.assertRaises(luigi.target.FileAlreadyExists,
lambda: fs.rename_dont_move(t.path, t.path+"-yay"))


class TemporaryPathTest(unittest.TestCase):
def setUp(self):
super(TemporaryPathTest, self).setUp()
self.fs = Mock()

class MyFileSystemTarget(luigi.target.FileSystemTarget):
open = None # Must be implemented due to abc stuff
fs = self.fs

self.target_cls = MyFileSystemTarget

def test_temporary_path_files(self):
target_outer = self.target_cls('/tmp/notreal.xls')
target_inner = self.target_cls('/tmp/blah.txt')

class MyException(Exception):
pass

orig_ex = MyException()
try:
with target_outer.temporary_path() as tmp_path_outer:
assert 'notreal' in tmp_path_outer
with target_inner.temporary_path() as tmp_path_inner:
assert 'blah' in tmp_path_inner
with target_inner.temporary_path() as tmp_path_inner_2:
assert tmp_path_inner != tmp_path_inner_2
self.fs.rename_dont_move.assert_called_once_with(tmp_path_inner_2, target_inner.path)
self.fs.rename_dont_move.assert_called_with(tmp_path_inner, target_inner.path)
self.fs.rename_dont_move.call_count == 2
raise orig_ex
except MyException as ex:
self.fs.rename_dont_move.call_count == 2
assert ex is orig_ex
else:
assert False

def test_temporary_path_directory(self):
target_slash = self.target_cls('/tmp/dir/')
target_noslash = self.target_cls('/tmp/dir')

with target_slash.temporary_path() as tmp_path:
assert re.match(r'/tmp/dir-luigi-tmp-\d{10}/', tmp_path)
self.fs.rename_dont_move.assert_called_once_with(tmp_path, target_slash.path)

with target_noslash.temporary_path() as tmp_path:
assert re.match(r'/tmp/dir-luigi-tmp-\d{10}', tmp_path)
self.fs.rename_dont_move.assert_called_with(tmp_path, target_noslash.path)

def test_windowsish_dir(self):
target = self.target_cls(r'''C:\my\folder''' + "\\")
pattern = r'''C:\\my\\folder-luigi-tmp-\d{10}''' + r"\\"

with target.temporary_path() as tmp_path:
assert re.match(pattern, tmp_path)
self.fs.rename_dont_move.assert_called_once_with(tmp_path, target.path)

def test_hadoopish_dir(self):
target = self.target_cls(r'''hdfs:///user/arash/myfile.uids''')

with target.temporary_path() as tmp_path:
assert re.match(r'''hdfs:///user/arash/myfile.uids-luigi-tmp-\d{10}''', tmp_path)
self.fs.rename_dont_move.assert_called_once_with(tmp_path, target.path)

0 comments on commit c2b38e6

Please sign in to comment.