From 2274d905da0a33d2af4d109d5c8af01781bc0523 Mon Sep 17 00:00:00 2001 From: Kevin Wurster Date: Wed, 20 Apr 2016 17:30:19 -0400 Subject: [PATCH 1/2] Allow GCSTarget()'s as Hadoop streaming input and output. --- luigi/contrib/hadoop.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/luigi/contrib/hadoop.py b/luigi/contrib/hadoop.py index 501a820934..b2501252cc 100644 --- a/luigi/contrib/hadoop.py +++ b/luigi/contrib/hadoop.py @@ -49,6 +49,7 @@ from luigi import configuration import luigi import luigi.task +import luigi.contrib.gcs import luigi.contrib.hdfs import luigi.s3 from luigi import mrrunner @@ -521,15 +522,23 @@ def run_job(self, job, tracking_url_callback=None): if self.input_format: arglist += ['-inputformat', self.input_format] + allowed_input_targets = ( + luigi.contrib.hdfs.HdfsTarget, + luigi.s3.S3Target, + luigi.contrib.gcs.GCSTarget) for target in luigi.task.flatten(job.input_hadoop()): - if not isinstance(target, luigi.contrib.hdfs.HdfsTarget) \ - and not isinstance(target, luigi.s3.S3Target): - raise TypeError('target must be an HdfsTarget or S3Target') + if not isinstance(target, allowed_input_targets): + raise TypeError('target must one of: {}'.format( + allowed_input_targets)) arglist += ['-input', target.path] - if not isinstance(job.output(), luigi.contrib.hdfs.HdfsTarget) \ - and not isinstance(job.output(), luigi.s3.S3FlagTarget): - raise TypeError('output must be an HdfsTarget or S3FlagTarget') + allowed_output_targets = ( + luigi.contrib.hdfs.HdfsTarget, + luigi.s3.S3FlagTarget, + luigi.contrib.gcs.GCSFlagTarget) + if not isinstance(job.output(), allowed_output_targets): + raise TypeError('output must be one of: {}'.format( + allowed_output_targets)) arglist += ['-output', output_hadoop] # submit job From d9f20f0d3221445c59180b52416d1b3e6c7a8418 Mon Sep 17 00:00:00 2001 From: Kevin Wurster Date: Thu, 21 Apr 2016 11:55:33 -0400 Subject: [PATCH 2/2] GCSFlagTarget doesn't support atomic move at 
end --- luigi/contrib/hadoop.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/luigi/contrib/hadoop.py b/luigi/contrib/hadoop.py index b2501252cc..61182f51b4 100644 --- a/luigi/contrib/hadoop.py +++ b/luigi/contrib/hadoop.py @@ -457,9 +457,11 @@ def run_job(self, job, tracking_url_callback=None): output_final = job.output().path # atomic output: replace output with a temporary work directory if self.end_job_with_atomic_move_dir: - if isinstance(job.output(), luigi.s3.S3FlagTarget): + illegal_targets = ( + luigi.s3.S3FlagTarget, luigi.contrib.gcs.GCSFlagTarget) + if isinstance(job.output(), illegal_targets): raise TypeError("end_job_with_atomic_move_dir is not supported" - " for S3FlagTarget") + " for {}".format(illegal_targets)) output_hadoop = '{output}-temp-{time}'.format( output=output_final, time=datetime.datetime.now().isoformat().replace(':', '-'))