From a0d19e0d43d324dec8c34f0e5e623ac1faf21030 Mon Sep 17 00:00:00 2001
From: Ryan Yuan
Date: Mon, 25 Mar 2019 22:03:26 +1100
Subject: [PATCH] [AIRFLOW-3987] Unify GCP's Connection IDs (#4818)

---
 UPDATING.md                                   | 11 ++++++++
 airflow/contrib/hooks/bigquery_hook.py        |  2 +-
 airflow/contrib/hooks/datastore_hook.py       |  2 +-
 .../operators/bigquery_check_operator.py      |  6 ++---
 .../contrib/operators/bigquery_get_data.py    |  2 +-
 .../contrib/operators/bigquery_operator.py    | 10 ++++----
 .../bigquery_table_delete_operator.py         |  2 +-
 .../contrib/operators/bigquery_to_bigquery.py |  2 +-
 airflow/contrib/operators/bigquery_to_gcs.py  |  2 +-
 airflow/contrib/operators/gcs_to_bq.py        |  2 +-
 airflow/contrib/operators/gcs_to_s3.py        |  2 +-
 airflow/contrib/sensors/bigquery_sensor.py    |  2 +-
 airflow/utils/db.py                           |  4 ---
 docs/howto/connection/gcp.rst                 | 25 +------------------
 .../operators/test_bigquery_operator.py       |  2 +-
 15 files changed, 30 insertions(+), 46 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index d50c123cf5fe6..ac0ccf4ca89fd 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,17 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+#### Unify default conn_id for Google Cloud Platform
+
+Previously, not all hooks and operators related to Google Cloud Platform used
+``google_cloud_default`` as the default conn_id; some used variants such as
+``google_cloud_storage_default``, ``bigquery_default``, or
+``google_cloud_datastore_default``. Those variants have been deprecated in
+favour of the single default ``google_cloud_default``. The configuration of
+existing relevant connections in the database has been preserved. To keep using
+a deprecated conn_id, pass it explicitly to the operator or hook; otherwise
+``google_cloud_default`` is used by default.
+
 ### The chain function is removed
 
 Bit operation like `>>` or `<<` are recommended for setting the dependency, which is easier to explain.
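For DAG authors, the migration note above reduces to one choice per task: rely on the new unified default, or name a legacy connection explicitly. A minimal sketch of both options follows; the task IDs and SQL are hypothetical, and only the `bigquery_conn_id` argument reflects this patch:

```python
# Hypothetical tasks illustrating the conn_id change; assumes the
# Airflow 1.10-era import paths touched by this patch.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Keep using the deprecated connection by naming it explicitly:
legacy = BigQueryOperator(
    task_id='query_with_legacy_conn',      # hypothetical task_id
    sql='SELECT 1',
    bigquery_conn_id='bigquery_default',   # deprecated ID, now opt-in only
)

# Or omit the argument and get the unified default:
unified = BigQueryOperator(
    task_id='query_with_unified_default',  # hypothetical task_id
    sql='SELECT 1',                        # resolves 'google_cloud_default'
)
```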
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 34ddb45cc3875..c3d0a3e246815 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -52,7 +52,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
     conn_name_attr = 'bigquery_conn_id'
 
     def __init__(self,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  use_legacy_sql=True,
                  location=None):
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 308809e1413b1..6e44661ec5428 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -33,7 +33,7 @@ class DatastoreHook(GoogleCloudBaseHook):
     """
 
     def __init__(self,
-                 datastore_conn_id='google_cloud_datastore_default',
+                 datastore_conn_id='google_cloud_default',
                  delegate_to=None):
         super(DatastoreHook, self).__init__(datastore_conn_id, delegate_to)
         self.connection = self.get_conn()
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index afb600a3d9120..4e74a1095bafd 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -63,7 +63,7 @@ class BigQueryCheckOperator(CheckOperator):
     @apply_defaults
     def __init__(self,
                  sql,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  use_legacy_sql=True,
                  *args, **kwargs):
         super(BigQueryCheckOperator, self).__init__(sql=sql, *args, **kwargs)
@@ -91,7 +91,7 @@ class BigQueryValueCheckOperator(ValueCheckOperator):
     def __init__(self, sql,
                  pass_value,
                  tolerance=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  use_legacy_sql=True,
                  *args, **kwargs):
         super(BigQueryValueCheckOperator, self).__init__(
@@ -131,7 +131,7 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
     @apply_defaults
     def __init__(self, table, metrics_thresholds,
                  date_filter_column='ds',
-                 days_back=-7, bigquery_conn_id='bigquery_default',
+                 days_back=-7, bigquery_conn_id='google_cloud_default',
                  use_legacy_sql=True, *args, **kwargs):
         super(BigQueryIntervalCheckOperator, self).__init__(
             table=table, metrics_thresholds=metrics_thresholds,
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index f5e6e50f066d5..d97fc68387c33 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -76,7 +76,7 @@ def __init__(self,
                  table_id,
                  max_results='100',
                  selected_fields=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  *args,
                  **kwargs):
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 85e68deff8bde..8787afbdebe82 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -118,7 +118,7 @@ def __init__(self,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
                  flatten_results=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  udf_config=None,
                  use_legacy_sql=True,
@@ -296,7 +296,7 @@ def __init__(self,
                  schema_fields=None,
                  gcs_schema_object=None,
                  time_partitioning=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
                  labels=None,
@@ -436,7 +436,7 @@ def __init__(self,
                  quote_character=None,
                  allow_quoted_newlines=False,
                  allow_jagged_rows=False,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
                  src_fmt_configs=None,
@@ -532,7 +532,7 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
     def __init__(self,
                  dataset_id,
                  project_id=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  *args, **kwargs):
         self.dataset_id = dataset_id
@@ -594,7 +594,7 @@ def __init__(self,
                  dataset_id,
                  project_id=None,
                  dataset_reference=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  *args, **kwargs):
         self.dataset_id = dataset_id
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 106afd1e94f0e..c4a46e263af7f 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -46,7 +46,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
     @apply_defaults
     def __init__(self,
                  deletion_dataset_table,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  ignore_if_missing=False,
                  *args,
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 288731e157de7..d12f88dfdefec 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -64,7 +64,7 @@ def __init__(self,
                  destination_project_dataset_table,
                  write_disposition='WRITE_EMPTY',
                  create_disposition='CREATE_IF_NEEDED',
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  labels=None,
                  *args,
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 19b004fdfe2fb..11d87ae74d371 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -71,7 +71,7 @@ def __init__(self,
                  export_format='CSV',
                  field_delimiter=',',
                  print_header=True,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  labels=None,
                  *args,
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index db5679b9c996d..7372de69e0e81 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -149,7 +149,7 @@ def __init__(self,
                  allow_quoted_newlines=False,
                  allow_jagged_rows=False,
                  max_id_key=None,
-                 bigquery_conn_id='bigquery_default',
+                 bigquery_conn_id='google_cloud_default',
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
                  schema_update_options=(),
diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py
index 6029661f370c6..b67fe9aba24d9 100644
--- a/airflow/contrib/operators/gcs_to_s3.py
+++ b/airflow/contrib/operators/gcs_to_s3.py
@@ -67,7 +67,7 @@ def __init__(self,
                  bucket,
                  prefix=None,
                  delimiter=None,
-                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
                  dest_aws_conn_id=None,
                  dest_s3_key=None,
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index fe8bd2ed6acc9..566075d92ca6e 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -50,7 +50,7 @@ def __init__(self,
                  project_id,
                  dataset_id,
                  table_id,
-                 bigquery_conn_id='bigquery_default_conn',
+                 bigquery_conn_id='google_cloud_default',
                  delegate_to=None,
                  *args, **kwargs):
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 0646496108b4f..eacf1cc5d310b 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -93,10 +93,6 @@ def initdb():
             conn_id='airflow_db', conn_type='mysql',
             host='mysql', login='root', password='',
             schema='airflow'))
-    merge_conn(
-        Connection(
-            conn_id='bigquery_default', conn_type='google_cloud_platform',
-            schema='default'))
     merge_conn(
         Connection(
             conn_id='local_mysql', conn_type='mysql',
diff --git a/docs/howto/connection/gcp.rst b/docs/howto/connection/gcp.rst
index 942132a41a218..5b96864ecdfef 100644
--- a/docs/howto/connection/gcp.rst
+++ b/docs/howto/connection/gcp.rst
@@ -36,30 +36,7 @@ There are two ways to connect to GCP using Airflow.
 
 Default Connection IDs
 ----------------------
 
-The following connection IDs are used by default.
-
-``bigquery_default``
-    Used by the :class:`~airflow.contrib.hooks.bigquery_hook.BigQueryHook`
-    hook.
-
-``google_cloud_datastore_default``
-    Used by the :class:`~airflow.contrib.hooks.datastore_hook.DatastoreHook`
-    hook.
-
-``google_cloud_default``
-    Used by those hooks:
-
-    * :class:`~airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`
-    * :class:`~airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook`
-    * :class:`~airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook`
-    * :class:`~airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook`
-    * :class:`~airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook`
-    * :class:`~airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook`
-    * :class:`~airflow.contrib.hooks.gcp_compute_hook.GceHook`
-    * :class:`~airflow.contrib.hooks.gcp_function_hook.GcfHook`
-    * :class:`~airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook`
-    * :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook`
-
+All hooks and operators related to Google Cloud Platform use ``google_cloud_default`` by default.
 
 Configuring the Connection
 --------------------------
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index de9a241f3b136..7eb9feeda3120 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -182,7 +182,7 @@ def test_execute(self, mock_hook):
             write_disposition='WRITE_EMPTY',
             allow_large_results=False,
             flatten_results=None,
-            bigquery_conn_id='bigquery_default',
+            bigquery_conn_id='google_cloud_default',
             udf_config=None,
             use_legacy_sql=True,
             maximum_billing_tier=None,
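At the hook level, the unification means every GCP hook falls back to the same connection ID. A minimal sketch, assuming an initialized Airflow metadata DB in which `airflow initdb` has created the stock `google_cloud_default` connection:

```python
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

# With no argument, the hook now resolves 'google_cloud_default'; before this
# patch it looked up 'bigquery_default' (and DatastoreHook looked up
# 'google_cloud_datastore_default').
hook = BigQueryHook()

# A deprecated ID still works, but only when passed explicitly and only if a
# connection by that name still exists in the database:
legacy_hook = BigQueryHook(bigquery_conn_id='bigquery_default')
```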