From 160457c656e0c711fe3ffe1a77a9e1293b693efa Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Tue, 13 Mar 2018 17:48:11 -0400 Subject: [PATCH 1/6] fix snowflake seed types, add chunking for >100k data sets --- dbt/adapters/snowflake.py | 31 +++++++++++++++++++++++++++++++ dbt/utils.py | 6 ++++++ 2 files changed, 37 insertions(+) diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index 6afa9d06327..8aa7cc657ab 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -2,6 +2,7 @@ import re +import agate import snowflake.connector import snowflake.connector.errors @@ -13,6 +14,7 @@ from dbt.adapters.postgres import PostgresAdapter from dbt.contracts.connection import validate_connection +from dbt.utils import chunks from dbt.logger import GLOBAL_LOGGER as logger @@ -224,3 +226,32 @@ def cancel_connection(cls, profile, connection): res = cursor.fetchone() logger.debug("Cancel query '{}': {}".format(connection_name, res)) + + @classmethod + def convert_number_type(cls, agate_table, col_idx): + decimals = agate_table.aggregate(agate.MaxPrecision(col_idx)) + return "number(38, 4)" if decimals else "integer" + + @classmethod + def load_csv_rows(cls, profile, schema, table_name, agate_table): + cols_sql = ", ".join(c for c in agate_table.column_names) + + for chunk in chunks(agate_table.rows, 10000): + print('in chunk') + bindings = [] + placeholders = [] + + for row in chunk: + bindings += row + placeholders.append("({})".format( + ", ".join("%s" for _ in agate_table.column_names))) + + sql = ('insert into {}.{} ({}) values {}' + .format(cls.quote(schema), + cls.quote(table_name), + cols_sql, + ",\n".join(placeholders))) + + cls.add_query(profile, sql, + bindings=bindings, + abridge_sql_log=True) diff --git a/dbt/utils.py b/dbt/utils.py index cabcfe427b2..0a00c670ff4 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -89,6 +89,12 @@ def coalesce(*args): return None +def chunks(l, n): + """Yield successive n-sized chunks from l.""" + for i in range(0, len(l), n): + yield l[i:i + n] + + def get_profile_from_project(project): target_name = project.get('target', {}) profile = project.get('outputs', {}).get(target_name, {}) From af3f50617404d807234dedf2bd92299009258d8a Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Tue, 13 Mar 2018 17:48:46 -0400 Subject: [PATCH 2/6] remove print --- dbt/adapters/snowflake.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index 8aa7cc657ab..060f7b73fe3 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -237,7 +237,6 @@ def load_csv_rows(cls, profile, schema, table_name, agate_table): cols_sql = ", ".join(c for c in agate_table.column_names) for chunk in chunks(agate_table.rows, 10000): - print('in chunk') bindings = [] placeholders = [] From 6441dac853249354f611e99907301b4cf67c7b65 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Wed, 14 Mar 2018 10:03:45 -0400 Subject: [PATCH 3/6] floats everywhere --- dbt/adapters/postgres.py | 2 +- dbt/adapters/snowflake.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/dbt/adapters/postgres.py b/dbt/adapters/postgres.py index 6169afdcabb..1e91f59b9b9 100644 --- a/dbt/adapters/postgres.py +++ b/dbt/adapters/postgres.py @@ -174,7 +174,7 @@ def convert_text_type(cls, agate_table, col_idx): @classmethod def convert_number_type(cls, agate_table, col_idx): decimals = agate_table.aggregate(agate.MaxPrecision(col_idx)) - return "numeric" if decimals else "integer" + return "float8" if decimals else "integer" @classmethod def convert_boolean_type(cls, agate_table, col_idx): diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index 060f7b73fe3..0e3d1354fb9 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -227,11 +227,6 @@ def cancel_connection(cls, profile, connection): logger.debug("Cancel query '{}': {}".format(connection_name, res)) - @classmethod - def convert_number_type(cls, agate_table, col_idx): - decimals = agate_table.aggregate(agate.MaxPrecision(col_idx)) - return "number(38, 4)" if decimals else "integer" - @classmethod def load_csv_rows(cls, profile, schema, table_name, agate_table): cols_sql = ", ".join(c for c in agate_table.column_names) From 815d1f0b9842fa0545934e4649c8c19ea8c2a236 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Wed, 14 Mar 2018 10:05:03 -0400 Subject: [PATCH 4/6] hoist chunking up to postgres adapter --- dbt/adapters/postgres.py | 28 ++++++++++++++++------------ dbt/adapters/snowflake.py | 23 ----------------------- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/dbt/adapters/postgres.py b/dbt/adapters/postgres.py index 1e91f59b9b9..6caf5c0247c 100644 --- a/dbt/adapters/postgres.py +++ b/dbt/adapters/postgres.py @@ -213,19 +213,23 @@ def reset_csv_table(cls, profile, schema, table_name, agate_table, @classmethod def load_csv_rows(cls, profile, schema, table_name, agate_table): - bindings = [] - placeholders = [] cols_sql = ", ".join(c for c in agate_table.column_names) - for row in agate_table.rows: - bindings += row - placeholders.append("({})".format( - ", ".join("%s" for _ in agate_table.column_names))) + for chunk in chunks(agate_table.rows, 10000): + bindings = [] + placeholders = [] - sql = ('insert into {}.{} ({}) values {}' - .format(cls.quote(schema), - cls.quote(table_name), - cols_sql, - ",\n".join(placeholders))) + for row in chunk: + bindings += row + placeholders.append("({})".format( + ", ".join("%s" for _ in agate_table.column_names))) - cls.add_query(profile, sql, bindings=bindings, abridge_sql_log=True) + sql = ('insert into {}.{} ({}) values {}' + .format(cls.quote(schema), + cls.quote(table_name), + cols_sql, + ",\n".join(placeholders))) + + cls.add_query(profile, sql, + bindings=bindings, + abridge_sql_log=True) diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index 0e3d1354fb9..b111bef8107 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -226,26 +226,3 @@ def cancel_connection(cls, profile, connection): res = cursor.fetchone() logger.debug("Cancel query '{}': {}".format(connection_name, res)) - - @classmethod - def load_csv_rows(cls, profile, schema, table_name, agate_table): - cols_sql = ", ".join(c for c in agate_table.column_names) - - for chunk in chunks(agate_table.rows, 10000): - bindings = [] - placeholders = [] - - for row in chunk: - bindings += row - placeholders.append("({})".format( - ", ".join("%s" for _ in agate_table.column_names))) - - sql = ('insert into {}.{} ({}) values {}' - .format(cls.quote(schema), - cls.quote(table_name), - cols_sql, - ",\n".join(placeholders))) - - cls.add_query(profile, sql, - bindings=bindings, - abridge_sql_log=True) From f6af723c9dbd4ecda3f1ed494946b12e0cce410b Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Wed, 14 Mar 2018 10:06:02 -0400 Subject: [PATCH 5/6] forgot to move import --- dbt/adapters/postgres.py | 1 + dbt/adapters/snowflake.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/postgres.py b/dbt/adapters/postgres.py index 6caf5c0247c..1ee260cf67a 100644 --- a/dbt/adapters/postgres.py +++ b/dbt/adapters/postgres.py @@ -7,6 +7,7 @@ import dbt.exceptions import agate +from dbt.utils import chunks from dbt.logger import GLOBAL_LOGGER as logger diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index b111bef8107..c03ae41c29c 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -14,7 +14,6 @@ from dbt.adapters.postgres import PostgresAdapter from dbt.contracts.connection import validate_connection -from dbt.utils import chunks from dbt.logger import GLOBAL_LOGGER as logger From 967dd6f4cfb4b44b7b84d7f36e80a6f54cccf3c0 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Tue, 3 Apr 2018 14:28:10 -0400 Subject: [PATCH 6/6] remove extraneous import --- dbt/adapters/snowflake.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py index c03ae41c29c..6afa9d06327 100644 --- a/dbt/adapters/snowflake.py +++ b/dbt/adapters/snowflake.py @@ -2,7 +2,6 @@ import re -import agate import snowflake.connector import snowflake.connector.errors