Skip to content

Commit

Permalink
Add 'schema' attribute to CopyToTable and use for target table (#2176)
Browse files Browse the repository at this point in the history
When using an RDBMS that supports schemas, this allows target tables to
exist in a non-default schema, specified using the 'schema' attribute of
the task class.

Closes #1984.
  • Loading branch information
sd2k authored and Tarrasch committed Aug 25, 2017
1 parent 5b80f54 commit bb6eccc
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions luigi/contrib/sqla.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ class CopyToTable(luigi.Task):
Usage:
* subclass and override the required `connection_string`, `table` and `columns` attributes.
* optionally override the `schema` attribute to use a different schema for
the target table.
"""
_logger = logging.getLogger('luigi-interface')

Expand Down Expand Up @@ -300,6 +302,11 @@ def table(self):
# completely ignore the columns. Instead set the reflect value to True below
columns = []

# Specify the database schema of the target table, if supported by the
# RDBMS. Note that this doesn't change the schema of the marker table.
# The schema MUST already exist in the database, or this task will fail.
schema = ''

# options
column_separator = "\t" # how columns are separated in the file copied into postgres
chunk_size = 5000 # default chunk size for insert
Expand Down Expand Up @@ -328,15 +335,21 @@ def construct_sqla_columns(columns):
else:
# if columns is specified as (name, type) tuples
with engine.begin() as con:
metadata = sqlalchemy.MetaData()

if self.schema:
metadata = sqlalchemy.MetaData(schema=self.schema)
else:
metadata = sqlalchemy.MetaData()

try:
if not con.dialect.has_table(con, self.table):
if not con.dialect.has_table(con, self.table, self.schema or None):
sqla_columns = construct_sqla_columns(self.columns)
self.table_bound = sqlalchemy.Table(self.table, metadata, *sqla_columns)
metadata.create_all(engine)
else:
metadata.reflect(only=[self.table], bind=engine)
self.table_bound = metadata.tables[self.table]
full_table = '.'.join([self.schema, self.table]) if self.schema else self.table
metadata.reflect(only=[full_table], bind=engine)
self.table_bound = metadata.tables[full_table]
except Exception as e:
self._logger.exception(self.table + str(e))

Expand Down

0 comments on commit bb6eccc

Please sign in to comment.