From f7019d00189a290e0841d35cc425bea476e21925 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 22 Nov 2020 11:01:13 +0100 Subject: [PATCH 1/3] Adds support for Hook discovery from providers This PR extends providers discovery with the mechanism of retrieving mapping of connections from type to hook. Fixes #12456 --- .pre-commit-config.yaml | 2 +- airflow/cli/cli_parser.py | 6 + airflow/cli/commands/provider_command.py | 25 ++++- airflow/models/connection.py | 65 +---------- airflow/plugins_manager.py | 18 +-- airflow/provider.yaml.schema.json | 7 ++ .../apache/cassandra/hooks/cassandra.py | 6 +- .../providers/apache/cassandra/provider.yaml | 3 + airflow/providers/apache/hive/hooks/hive.py | 7 +- airflow/providers/apache/hive/provider.yaml | 4 + airflow/providers/apache/pig/hooks/pig.py | 6 +- airflow/providers/apache/pig/provider.yaml | 4 + airflow/providers/cloudant/hooks/cloudant.py | 6 +- airflow/providers/cloudant/provider.yaml | 3 + .../cncf/kubernetes/hooks/kubernetes.py | 6 +- .../providers/cncf/kubernetes/provider.yaml | 3 + airflow/providers/docker/hooks/docker.py | 6 +- airflow/providers/docker/provider.yaml | 3 + .../elasticsearch/hooks/elasticsearch.py | 1 + airflow/providers/elasticsearch/provider.yaml | 3 + airflow/providers/exasol/hooks/exasol.py | 1 + airflow/providers/exasol/provider.yaml | 3 + .../providers/google/cloud/hooks/bigquery.py | 4 +- .../providers/google/cloud/hooks/cloud_sql.py | 7 +- .../google/cloud/hooks/compute_ssh.py | 4 + .../providers/google/cloud/hooks/dataprep.py | 6 +- airflow/providers/google/provider.yaml | 6 + airflow/providers/grpc/hooks/grpc.py | 6 +- airflow/providers/grpc/provider.yaml | 3 + airflow/providers/imap/hooks/imap.py | 6 +- airflow/providers/imap/provider.yaml | 3 + airflow/providers/jdbc/hooks/jdbc.py | 1 + airflow/providers/jdbc/provider.yaml | 3 + airflow/providers/jira/hooks/jira.py | 6 +- airflow/providers/jira/provider.yaml | 3 + .../microsoft/azure/hooks/azure_batch.py | 6 +- .../microsoft/azure/hooks/azure_cosmos.py | 6 +- .../microsoft/azure/hooks/azure_data_lake.py | 6 +- .../providers/microsoft/azure/hooks/wasb.py | 6 +- .../providers/microsoft/azure/provider.yaml | 6 + .../providers/microsoft/mssql/hooks/mssql.py | 12 +- .../providers/microsoft/mssql/provider.yaml | 3 + airflow/providers/mongo/hooks/mongo.py | 4 +- airflow/providers/mongo/provider.yaml | 3 + airflow/providers/mysql/hooks/mysql.py | 1 + airflow/providers/mysql/provider.yaml | 3 + airflow/providers/odbc/hooks/odbc.py | 1 + airflow/providers/odbc/provider.yaml | 3 + airflow/providers/oracle/hooks/oracle.py | 1 + airflow/providers/oracle/provider.yaml | 5 + airflow/providers/postgres/hooks/postgres.py | 1 + airflow/providers/postgres/provider.yaml | 3 + airflow/providers/presto/hooks/presto.py | 1 + airflow/providers/presto/provider.yaml | 3 + airflow/providers/redis/hooks/redis.py | 6 +- airflow/providers/redis/provider.yaml | 3 + airflow/providers/salesforce/hooks/tableau.py | 6 +- airflow/providers/salesforce/provider.yaml | 3 + .../providers/snowflake/hooks/snowflake.py | 1 + airflow/providers/snowflake/provider.yaml | 3 + airflow/providers/sqlite/hooks/sqlite.py | 1 + airflow/providers/sqlite/provider.yaml | 3 + airflow/providers/vertica/hooks/vertica.py | 1 + airflow/providers/vertica/provider.yaml | 3 + airflow/providers_manager.py | 94 +++++++++++++--- airflow/utils/entry_points_with_dist.py | 35 ++++++ .../pre_commit_check_provider_yaml_files.py | 103 +++++++++--------- .../run_install_and_test_provider_packages.sh | 20 ++++ 
tests/core/test_providers_manager.py | 41 +++++++ tests/models/test_connection.py | 14 --- 70 files changed, 461 insertions(+), 196 deletions(-) create mode 100644 airflow/utils/entry_points_with_dist.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c262c9a13583b3..40433b0f0f9d30 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -457,7 +457,7 @@ repos: entry: ./scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py language: python require_serial: true - files: provider.yaml$ + files: provider.yaml$|scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py$ additional_dependencies: ['PyYAML==5.3.1', 'jsonschema==3.2.0', 'tabulate==0.8.7'] - id: mermaid name: Generate mermaid images diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index fbcb01da07bf0a..6a13db0619b054 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1153,6 +1153,12 @@ class GroupCommand(NamedTuple): ), ) PROVIDERS_COMMANDS = ( + ActionCommand( + name='hooks', + help='List registered provider hooks', + func=lazy_load_command('airflow.cli.commands.provider_command.hooks_list'), + args=(ARG_OUTPUT,), + ), ActionCommand( name='list', help='List installed providers', diff --git a/airflow/cli/commands/provider_command.py b/airflow/cli/commands/provider_command.py index 70fbd4360a631a..a2df92fa0c6e78 100644 --- a/airflow/cli/commands/provider_command.py +++ b/airflow/cli/commands/provider_command.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. """Providers sub-commands""" -from typing import Dict, List +from typing import Dict, List, Tuple import pygments import yaml @@ -44,8 +44,7 @@ def provider_get(args): """Get a provider info.""" providers = ProvidersManager().providers if args.provider_name in providers: - provider_version = providers[args.provider_name][0] - provider_info = providers[args.provider_name][1] + provider_version, provider_info = providers[args.provider_name] print("#") print(f"# Provider: {args.provider_name}") print(f"# Version: {provider_version}") @@ -64,3 +63,23 @@ def provider_get(args): def providers_list(args): """Lists all providers at the command line""" print(_tabulate_providers(ProvidersManager().providers.values(), args.output)) + + +def _tabulate_hooks(hook_items: Tuple[str, Tuple[str, str]], tablefmt: str): + tabulate_data = [ + { + 'Connection type': hook_item[0], + 'Hook class': hook_item[1][0], + 'Hook connection attribute name': hook_item[1][1], + } + for hook_item in hook_items + ] + + msg = tabulate(tabulate_data, tablefmt=tablefmt, headers='keys') + return msg + + +def hooks_list(args): + """Lists all hooks at the command line""" + msg = _tabulate_hooks(ProvidersManager().hooks.items(), args.output) + print(msg) diff --git a/airflow/models/connection.py b/airflow/models/connection.py index d12d6807e54d80..391aafb93d81c5 100644 --- a/airflow/models/connection.py +++ b/airflow/models/connection.py @@ -30,70 +30,10 @@ from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.models.base import ID_LEN, Base from airflow.models.crypto import get_fernet +from airflow.providers_manager import ProvidersManager from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string -# A map that assigns a connection type to a tuple that contains -# the path of the class and the name of the conn_id key parameter. -# PLEASE KEEP BELOW LIST IN ALPHABETICAL ORDER. 
-CONN_TYPE_TO_HOOK = { - "azure_batch": ( - "airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook", - "azure_batch_conn_id", - ), - "azure_cosmos": ( - "airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook", - "azure_cosmos_conn_id", - ), - "azure_data_lake": ( - "airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook", - "azure_data_lake_conn_id", - ), - "cassandra": ("airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook", "cassandra_conn_id"), - "cloudant": ("airflow.providers.cloudant.hooks.cloudant.CloudantHook", "cloudant_conn_id"), - "dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_default"), - "docker": ("airflow.providers.docker.hooks.docker.DockerHook", "docker_conn_id"), - "elasticsearch": ( - "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook", - "elasticsearch_conn_id", - ), - "exasol": ("airflow.providers.exasol.hooks.exasol.ExasolHook", "exasol_conn_id"), - "gcpcloudsql": ( - "airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook", - "gcp_cloudsql_conn_id", - ), - "gcpssh": ( - "airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook", - "gcp_conn_id", - ), - "google_cloud_platform": ( - "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook", - "bigquery_conn_id", - ), - "grpc": ("airflow.providers.grpc.hooks.grpc.GrpcHook", "grpc_conn_id"), - "hive_cli": ("airflow.providers.apache.hive.hooks.hive.HiveCliHook", "hive_cli_conn_id"), - "hiveserver2": ("airflow.providers.apache.hive.hooks.hive.HiveServer2Hook", "hiveserver2_conn_id"), - "imap": ("airflow.providers.imap.hooks.imap.ImapHook", "imap_conn_id"), - "jdbc": ("airflow.providers.jdbc.hooks.jdbc.JdbcHook", "jdbc_conn_id"), - "jira": ("airflow.providers.jira.hooks.jira.JiraHook", "jira_conn_id"), - "kubernetes": ("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook", "kubernetes_conn_id"), - "mongo": ("airflow.providers.mongo.hooks.mongo.MongoHook", "conn_id"), - "mssql": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"), - "mysql": ("airflow.providers.mysql.hooks.mysql.MySqlHook", "mysql_conn_id"), - "odbc": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"), - "oracle": ("airflow.providers.oracle.hooks.oracle.OracleHook", "oracle_conn_id"), - "pig_cli": ("airflow.providers.apache.pig.hooks.pig.PigCliHook", "pig_cli_conn_id"), - "postgres": ("airflow.providers.postgres.hooks.postgres.PostgresHook", "postgres_conn_id"), - "presto": ("airflow.providers.presto.hooks.presto.PrestoHook", "presto_conn_id"), - "redis": ("airflow.providers.redis.hooks.redis.RedisHook", "redis_conn_id"), - "snowflake": ("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook", "snowflake_conn_id"), - "sqlite": ("airflow.providers.sqlite.hooks.sqlite.SqliteHook", "sqlite_conn_id"), - "tableau": ("airflow.providers.salesforce.hooks.tableau.TableauHook", "tableau_conn_id"), - "vertica": ("airflow.providers.vertica.hooks.vertica.VerticaHook", "vertica_conn_id"), - "wasb": ("airflow.providers.microsoft.azure.hooks.wasb.WasbHook", "wasb_conn_id"), -} -# PLEASE KEEP ABOVE LIST IN ALPHABETICAL ORDER. 
- def parse_netloc_to_hostname(*args, **kwargs): """This method is deprecated.""" @@ -326,7 +266,8 @@ def rotate_fernet_key(self): def get_hook(self): """Return hook based on conn_type.""" - hook_class_name, conn_id_param = CONN_TYPE_TO_HOOK.get(self.conn_type, (None, None)) + hook_class_name, conn_id_param = ProvidersManager().hooks.get(self.conn_type, (None, None)) + if not hook_class_name: raise AirflowException(f'Unknown hook type "{self.conn_type}"') hook_class = import_string(hook_class_name) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index dadae6af724dc7..b69aaf40e555c1 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -30,6 +30,7 @@ import importlib_metadata from airflow import settings +from airflow.utils.entry_points_with_dist import entry_points_with_dist from airflow.utils.file import find_path_from_directory if TYPE_CHECKING: @@ -170,23 +171,6 @@ def is_valid_plugin(plugin_obj): return False -def entry_points_with_dist(group: str): - """ - Return EntryPoint objects of the given group, along with the distribution information. - - This is like the ``entry_points()`` function from importlib.metadata, - except it also returns the distribution the entry_point was loaded from. - - :param group: FIlter results to only this entrypoint group - :return: Generator of (EntryPoint, Distribution) objects for the specified groups - """ - for dist in importlib_metadata.distributions(): - for e in dist.entry_points: - if e.group != group: - continue - yield (e, dist) - - def load_entrypoint_plugins(): """ Load and register plugins AirflowPlugin subclasses from the entrypoints. diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json index 19ece216194f8e..88644b1d4f5d12 100644 --- a/airflow/provider.yaml.schema.json +++ b/airflow/provider.yaml.schema.json @@ -173,6 +173,13 @@ "python-module" ] } + }, + "hook-class-names": { + "type": "array", + "description": "Hook class names that provide connection types to core", + "items": { + "type": "string" + } } }, "additionalProperties": false, diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py b/airflow/providers/apache/cassandra/hooks/cassandra.py index 0166aa376a9b05..802303df2e994b 100644 --- a/airflow/providers/apache/cassandra/hooks/cassandra.py +++ b/airflow/providers/apache/cassandra/hooks/cassandra.py @@ -83,7 +83,11 @@ class CassandraHook(BaseHook, LoggingMixin): For details of the Cluster config, see cassandra.cluster. 
""" - def __init__(self, cassandra_conn_id: str = 'cassandra_default'): + conn_name_attr = 'cassandra_conn_id' + default_conn_name = 'cassandra_default' + conn_type = 'cassandra' + + def __init__(self, cassandra_conn_id: str = default_conn_name): super().__init__() conn = self.get_connection(cassandra_conn_id) diff --git a/airflow/providers/apache/cassandra/provider.yaml b/airflow/providers/apache/cassandra/provider.yaml index 77402c05a2b2d4..b75425fce31949 100644 --- a/airflow/providers/apache/cassandra/provider.yaml +++ b/airflow/providers/apache/cassandra/provider.yaml @@ -41,3 +41,6 @@ hooks: - integration-name: Apache Cassandra python-modules: - airflow.providers.apache.cassandra.hooks.cassandra + +hook-class-names: + - airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index a04cfddd8a94d0..6c9360211fc26f 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -78,9 +78,13 @@ class HiveCliHook(BaseHook): :type mapred_job_name: str """ + conn_name_attr = 'hive_cli_conn_id' + default_conn_name = 'hive_cli_default' + conn_type = 'hive_cli' + def __init__( self, - hive_cli_conn_id: str = "hive_cli_default", + hive_cli_conn_id: str = default_conn_name, run_as: Optional[str] = None, mapred_queue: Optional[str] = None, mapred_queue_priority: Optional[str] = None, @@ -809,6 +813,7 @@ class HiveServer2Hook(DbApiHook): conn_name_attr = 'hiveserver2_conn_id' default_conn_name = 'hiveserver2_default' + conn_type = 'hiveserver2' supports_autocommit = False def get_conn(self, schema: Optional[str] = None) -> Any: diff --git a/airflow/providers/apache/hive/provider.yaml b/airflow/providers/apache/hive/provider.yaml index 98b94829d86bc6..68c18b77027dab 100644 --- a/airflow/providers/apache/hive/provider.yaml +++ b/airflow/providers/apache/hive/provider.yaml @@ -66,3 +66,7 @@ transfers: - source-integration-name: Microsoft SQL Server (MSSQL) target-integration-name: Apache Hive python-module: airflow.providers.apache.hive.transfers.mssql_to_hive + +hook-class-names: + - airflow.providers.apache.hive.hooks.hive.HiveCliHook + - airflow.providers.apache.hive.hooks.hive.HiveServer2Hook diff --git a/airflow/providers/apache/pig/hooks/pig.py b/airflow/providers/apache/pig/hooks/pig.py index c8e39d8b734607..1560f69af9f5a6 100644 --- a/airflow/providers/apache/pig/hooks/pig.py +++ b/airflow/providers/apache/pig/hooks/pig.py @@ -33,7 +33,11 @@ class PigCliHook(BaseHook): """ - def __init__(self, pig_cli_conn_id: str = "pig_cli_default") -> None: + conn_name_attr = 'pig_cli_conn_id' + default_conn_name = 'pig_cli_default' + conn_type = 'pig_cli' + + def __init__(self, pig_cli_conn_id: str = default_conn_name) -> None: super().__init__() conn = self.get_connection(pig_cli_conn_id) self.pig_properties = conn.extra_dejson.get('pig_properties', '') diff --git a/airflow/providers/apache/pig/provider.yaml b/airflow/providers/apache/pig/provider.yaml index f1754c72e99625..175cd70ff5247f 100644 --- a/airflow/providers/apache/pig/provider.yaml +++ b/airflow/providers/apache/pig/provider.yaml @@ -32,7 +32,11 @@ operators: - integration-name: Apache Pig python-modules: - airflow.providers.apache.pig.operators.pig + hooks: - integration-name: Apache Pig python-modules: - airflow.providers.apache.pig.hooks.pig + +hook-class-names: + - airflow.providers.apache.pig.hooks.pig.PigCliHook diff --git a/airflow/providers/cloudant/hooks/cloudant.py 
b/airflow/providers/cloudant/hooks/cloudant.py index e490ffc4cb741d..6193bbab98c052 100644 --- a/airflow/providers/cloudant/hooks/cloudant.py +++ b/airflow/providers/cloudant/hooks/cloudant.py @@ -32,7 +32,11 @@ class CloudantHook(BaseHook): :type cloudant_conn_id: str """ - def __init__(self, cloudant_conn_id: str = 'cloudant_default') -> None: + conn_name_attr = 'cloudant_conn_id' + default_conn_name = 'cloudant_default' + conn_type = 'cloudant' + + def __init__(self, cloudant_conn_id: str = default_conn_name) -> None: super().__init__() self.cloudant_conn_id = cloudant_conn_id diff --git a/airflow/providers/cloudant/provider.yaml b/airflow/providers/cloudant/provider.yaml index 4986c439dc133a..0f24c5e8211b0a 100644 --- a/airflow/providers/cloudant/provider.yaml +++ b/airflow/providers/cloudant/provider.yaml @@ -32,3 +32,6 @@ hooks: - integration-name: IBM Cloudant python-modules: - airflow.providers.cloudant.hooks.cloudant + +hook-class-names: + - airflow.providers.cloudant.hooks.cloudant.CloudantHook diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 499cac58c71084..27e21751f0159f 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -54,8 +54,12 @@ class KubernetesHook(BaseHook): :type conn_id: str """ + conn_name_attr = 'kubernetes_conn_id' + default_conn_name = 'kubernetes_default' + conn_type = 'kubernetes' + def __init__( - self, conn_id: str = "kubernetes_default", client_configuration: Optional[client.Configuration] = None + self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None ) -> None: super().__init__() self.conn_id = conn_id diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml index c66fd7134f7f2d..541af8e153c1b3 100644 --- a/airflow/providers/cncf/kubernetes/provider.yaml +++ b/airflow/providers/cncf/kubernetes/provider.yaml @@ -48,3 +48,6 @@ hooks: - integration-name: Kubernetes python-modules: - airflow.providers.cncf.kubernetes.hooks.kubernetes + +hook-class-names: + - airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py index 5ed87d63d22c93..acd9ed0f56d87c 100644 --- a/airflow/providers/docker/hooks/docker.py +++ b/airflow/providers/docker/hooks/docker.py @@ -34,9 +34,13 @@ class DockerHook(BaseHook, LoggingMixin): :type docker_conn_id: str """ + conn_name_attr = 'docker_conn_id' + default_conn_name = 'docker_default' + conn_type = 'docker' + def __init__( self, - docker_conn_id='docker_default', + docker_conn_id: str = default_conn_name, base_url: Optional[str] = None, version: Optional[str] = None, tls: Optional[str] = None, diff --git a/airflow/providers/docker/provider.yaml b/airflow/providers/docker/provider.yaml index a02202c369f572..56d248896d77ac 100644 --- a/airflow/providers/docker/provider.yaml +++ b/airflow/providers/docker/provider.yaml @@ -43,3 +43,6 @@ hooks: - integration-name: Docker python-modules: - airflow.providers.docker.hooks.docker + +hook-class-names: + - airflow.providers.docker.hooks.docker.DockerHook diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py index 8c6d1d2fddf26b..16306ac2a5fa8f 100644 --- a/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ 
b/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -29,6 +29,7 @@ class ElasticsearchHook(DbApiHook): conn_name_attr = 'elasticsearch_conn_id' default_conn_name = 'elasticsearch_default' + conn_type = 'elasticsearch' def __init__(self, schema: str = "http", connection: Optional[AirflowConnection] = None, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index 079d7244d36be9..7802a2e2f7a1f4 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -32,3 +32,6 @@ hooks: - integration-name: Elasticsearch python-modules: - airflow.providers.elasticsearch.hooks.elasticsearch + +hook-class-names: + - airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook diff --git a/airflow/providers/exasol/hooks/exasol.py b/airflow/providers/exasol/hooks/exasol.py index 935eb8b1b5fd93..4ff8649240f41b 100644 --- a/airflow/providers/exasol/hooks/exasol.py +++ b/airflow/providers/exasol/hooks/exasol.py @@ -38,6 +38,7 @@ class ExasolHook(DbApiHook): conn_name_attr = 'exasol_conn_id' default_conn_name = 'exasol_default' + conn_type = 'exasol' supports_autocommit = True def __init__(self, *args, **kwargs) -> None: diff --git a/airflow/providers/exasol/provider.yaml b/airflow/providers/exasol/provider.yaml index ae13837ff062e5..1ea92545165006 100644 --- a/airflow/providers/exasol/provider.yaml +++ b/airflow/providers/exasol/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: Exasol python-modules: - airflow.providers.exasol.hooks.exasol + +hook-class-names: + - airflow.providers.exasol.hooks.exasol.ExasolHook diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index 95434c2c189c8e..bfdc4bf73310c7 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -67,7 +67,9 @@ class BigQueryHook(GoogleBaseHook, DbApiHook): """Interact with BigQuery. 
This hook uses the Google Cloud connection.""" - conn_name_attr = 'gcp_conn_id' # type: str + conn_name_attr = 'gcp_conn_id' + default_conn_name = 'google_cloud_default' + conn_type = 'google_cloud_platform' def __init__( self, diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py b/airflow/providers/google/cloud/hooks/cloud_sql.py index dca25e910e481a..023524cc4666ed 100644 --- a/airflow/providers/google/cloud/hooks/cloud_sql.py +++ b/airflow/providers/google/cloud/hooks/cloud_sql.py @@ -709,6 +709,11 @@ class CloudSQLDatabaseHook(BaseHook): # noqa in the connection URL :type default_gcp_project_id: str """ + + conn_name_attr = 'gcp_cloudsql_conn_id' + default_conn_name = 'google_cloud_sql_default' + conn_type = 'gcpcloudsql' + _conn = None # type: Optional[Any] def __init__( @@ -735,7 +740,7 @@ def __init__( self.user = self.cloudsql_connection.login # type: Optional[str] self.password = self.cloudsql_connection.password # type: Optional[str] self.public_ip = self.cloudsql_connection.host # type: Optional[str] - self.public_port = self.cloudsql_connection.port # type: Optional[str] + self.public_port = self.cloudsql_connection.port # type: Optional[int] self.sslcert = self.extras.get('sslcert') # type: Optional[str] self.sslkey = self.extras.get('sslkey') # type: Optional[str] self.sslrootcert = self.extras.get('sslrootcert') # type: Optional[str] diff --git a/airflow/providers/google/cloud/hooks/compute_ssh.py b/airflow/providers/google/cloud/hooks/compute_ssh.py index fe6ad43b2d78fd..2d33c62334274c 100644 --- a/airflow/providers/google/cloud/hooks/compute_ssh.py +++ b/airflow/providers/google/cloud/hooks/compute_ssh.py @@ -89,6 +89,10 @@ class ComputeEngineSSHHook(SSHHook): :type delegate_to: str """ + conn_name_attr = 'gcp_conn_id' + default_conn_name = 'google_cloud_default' + conn_type = 'gcpssh' + def __init__( # pylint: disable=too-many-arguments self, gcp_conn_id: str = 'google_cloud_default', diff --git a/airflow/providers/google/cloud/hooks/dataprep.py b/airflow/providers/google/cloud/hooks/dataprep.py index d5c8ab089a5c26..a9c969f8e2d6f6 100644 --- a/airflow/providers/google/cloud/hooks/dataprep.py +++ b/airflow/providers/google/cloud/hooks/dataprep.py @@ -37,7 +37,11 @@ class GoogleDataprepHook(BaseHook): """ - def __init__(self, dataprep_conn_id: str = "dataprep_default") -> None: + conn_name_attr = 'dataprep_conn_id' + default_conn_name = 'dataprep_default' + conn_type = 'dataprep' + + def __init__(self, dataprep_conn_id: str = default_conn_name) -> None: super().__init__() self.dataprep_conn_id = dataprep_conn_id conn = self.get_connection(self.dataprep_conn_id) diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 5acc4240d852e9..469134d781f420 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -630,3 +630,9 @@ transfers: - source-integration-name: Google Ads target-integration-name: Google Cloud Storage (GCS) python-module: airflow.providers.google.ads.transfers.ads_to_gcs + +hook-class-names: + - airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook + - airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook + - airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook + - airflow.providers.google.cloud.hooks.bigquery.BigQueryHook diff --git a/airflow/providers/grpc/hooks/grpc.py b/airflow/providers/grpc/hooks/grpc.py index ccfbd8f166a50f..13b60cafe3c450 100644 --- a/airflow/providers/grpc/hooks/grpc.py +++ 
b/airflow/providers/grpc/hooks/grpc.py @@ -47,9 +47,13 @@ class GrpcHook(BaseHook): its only arg. Could be partial or lambda. """ + conn_name_attr = 'grpc_conn_id' + default_conn_name = 'grpc_default' + conn_type = 'grpc' + def __init__( self, - grpc_conn_id: str, + grpc_conn_id: str = default_conn_name, interceptors: Optional[List[Callable]] = None, custom_connection_func: Optional[Callable] = None, ) -> None: diff --git a/airflow/providers/grpc/provider.yaml b/airflow/providers/grpc/provider.yaml index a607e97b3c2fe8..8a06f79ee96339 100644 --- a/airflow/providers/grpc/provider.yaml +++ b/airflow/providers/grpc/provider.yaml @@ -37,3 +37,6 @@ hooks: - integration-name: gRPC python-modules: - airflow.providers.grpc.hooks.grpc + +hook-class-names: + - airflow.providers.grpc.hooks.grpc.GrpcHook diff --git a/airflow/providers/imap/hooks/imap.py b/airflow/providers/imap/hooks/imap.py index 926db52e22fde2..8e7236fdf684e0 100644 --- a/airflow/providers/imap/hooks/imap.py +++ b/airflow/providers/imap/hooks/imap.py @@ -42,7 +42,11 @@ class ImapHook(BaseHook): :type imap_conn_id: str """ - def __init__(self, imap_conn_id: str = 'imap_default') -> None: + conn_name_attr = 'imap_conn_id' + default_conn_name = 'imap_default' + conn_type = 'imap' + + def __init__(self, imap_conn_id: str = default_conn_name) -> None: super().__init__() self.imap_conn_id = imap_conn_id self.mail_client: Optional[imaplib.IMAP4_SSL] = None diff --git a/airflow/providers/imap/provider.yaml b/airflow/providers/imap/provider.yaml index 7616c64270d165..e91a899d34b49c 100644 --- a/airflow/providers/imap/provider.yaml +++ b/airflow/providers/imap/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: Internet Message Access Protocol (IMAP) python-modules: - airflow.providers.imap.hooks.imap + +hook-class-names: + - airflow.providers.imap.hooks.imap.ImapHook diff --git a/airflow/providers/jdbc/hooks/jdbc.py b/airflow/providers/jdbc/hooks/jdbc.py index 48086d2e11007d..8f837f8c76693f 100644 --- a/airflow/providers/jdbc/hooks/jdbc.py +++ b/airflow/providers/jdbc/hooks/jdbc.py @@ -35,6 +35,7 @@ class JdbcHook(DbApiHook): conn_name_attr = 'jdbc_conn_id' default_conn_name = 'jdbc_default' + conn_type = 'jdbc' supports_autocommit = True def get_conn(self) -> jaydebeapi.Connection: diff --git a/airflow/providers/jdbc/provider.yaml b/airflow/providers/jdbc/provider.yaml index dac49d3cc60f05..91a291cd617648 100644 --- a/airflow/providers/jdbc/provider.yaml +++ b/airflow/providers/jdbc/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: Java Database Connectivity (JDBC) python-modules: - airflow.providers.jdbc.hooks.jdbc + +hook-class-names: + - airflow.providers.jdbc.hooks.jdbc.JdbcHook diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py index daf573f3a546d0..d9faaeffd1b3ea 100644 --- a/airflow/providers/jira/hooks/jira.py +++ b/airflow/providers/jira/hooks/jira.py @@ -33,7 +33,11 @@ class JiraHook(BaseHook): :type jira_conn_id: str """ - def __init__(self, jira_conn_id: str = 'jira_default', proxies: Optional[Any] = None) -> None: + default_conn_name = 'jira_default' + conn_type = "jira" + conn_name_attr = "jira_conn_id" + + def __init__(self, jira_conn_id: str = default_conn_name, proxies: Optional[Any] = None) -> None: super().__init__() self.jira_conn_id = jira_conn_id self.proxies = proxies diff --git a/airflow/providers/jira/provider.yaml b/airflow/providers/jira/provider.yaml index 9018b8283aab36..34c48559b5aaf9 100644 --- a/airflow/providers/jira/provider.yaml +++ 
b/airflow/providers/jira/provider.yaml @@ -43,3 +43,6 @@ hooks: - integration-name: Atlassian Jira python-modules: - airflow.providers.jira.hooks.jira + +hook-class-names: + - airflow.providers.jira.hooks.jira.JiraHook diff --git a/airflow/providers/microsoft/azure/hooks/azure_batch.py b/airflow/providers/microsoft/azure/hooks/azure_batch.py index 0102eac7970a6f..265878656101b4 100644 --- a/airflow/providers/microsoft/azure/hooks/azure_batch.py +++ b/airflow/providers/microsoft/azure/hooks/azure_batch.py @@ -37,7 +37,11 @@ class AzureBatchHook(BaseHook): The account url should be in extra parameter as account_url """ - def __init__(self, azure_batch_conn_id: str = 'azure_batch_default') -> None: + conn_name_attr = 'azure_batch_conn_id' + default_conn_name = 'azure_batch_default' + conn_type = 'azure_batch' + + def __init__(self, azure_batch_conn_id: str = default_conn_name) -> None: super().__init__() self.conn_id = azure_batch_conn_id self.connection = self.get_conn() diff --git a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py index c73e0e4ef60901..bcb1de2938c93b 100644 --- a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py +++ b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py @@ -45,7 +45,11 @@ class AzureCosmosDBHook(BaseHook): :type azure_cosmos_conn_id: str """ - def __init__(self, azure_cosmos_conn_id: str = 'azure_cosmos_default') -> None: + conn_name_attr = 'azure_cosmos_conn_id' + default_conn_name = 'azure_cosmos_default' + conn_type = 'azure_cosmos' + + def __init__(self, azure_cosmos_conn_id: str = default_conn_name) -> None: super().__init__() self.conn_id = azure_cosmos_conn_id self._conn = None diff --git a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py index 7d1974e7a9fdce..6ff0f3a997af36 100644 --- a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py +++ b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py @@ -43,7 +43,11 @@ class AzureDataLakeHook(BaseHook): :type azure_data_lake_conn_id: str """ - def __init__(self, azure_data_lake_conn_id: str = 'azure_data_lake_default') -> None: + conn_name_attr = 'azure_data_lake_conn_id' + default_conn_name = 'azure_data_lake_default' + conn_type = 'azure_data_lake' + + def __init__(self, azure_data_lake_conn_id: str = default_conn_name) -> None: super().__init__() self.conn_id = azure_data_lake_conn_id self._conn: Optional[core.AzureDLFileSystem] = None diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py b/airflow/providers/microsoft/azure/hooks/wasb.py index e72c1ec19171e6..c337fce86605dd 100644 --- a/airflow/providers/microsoft/azure/hooks/wasb.py +++ b/airflow/providers/microsoft/azure/hooks/wasb.py @@ -54,7 +54,11 @@ class WasbHook(BaseHook): :type wasb_conn_id: str """ - def __init__(self, wasb_conn_id: str = 'wasb_default') -> None: + conn_name_attr = 'wasb_conn_id' + default_conn_name = 'wasb_default' + conn_type = 'wasb' + + def __init__(self, wasb_conn_id: str = default_conn_name) -> None: super().__init__() self.conn_id = wasb_conn_id self.connection = self.get_conn() diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index fa2a21035cd578..9ceb0349fb5c11 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -124,3 +124,9 @@ transfers: target-integration-name: Google Cloud Storage (GCS) how-to-guide: 
/docs/howto/operator/microsoft/transfer/blob_storage_to_gcs.rst python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs + +hook-class-names: + - airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook + - airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook + - airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook + - airflow.providers.microsoft.azure.hooks.wasb.WasbHook diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py b/airflow/providers/microsoft/mssql/hooks/mssql.py index 3e5ec6adbae9fa..e3f7362038add1 100644 --- a/airflow/providers/microsoft/mssql/hooks/mssql.py +++ b/airflow/providers/microsoft/mssql/hooks/mssql.py @@ -17,8 +17,6 @@ # under the License. """Microsoft SQLServer hook module""" -import warnings - import pymssql from airflow.hooks.dbapi_hook import DbApiHook @@ -29,18 +27,10 @@ class MsSqlHook(DbApiHook): conn_name_attr = 'mssql_conn_id' default_conn_name = 'mssql_default' + conn_type = 'mssql' supports_autocommit = True def __init__(self, *args, **kwargs) -> None: - warnings.warn( - ( - "This class is deprecated and will be removed in Airflow 2.0.\n" - "pymssql is discontinued. See https://github.com/pymssql/pymssql/issues/668.\n" - "Please use `airflow.providers.odbc.hooks.odbc.OdbcHook`" - ), - DeprecationWarning, - stacklevel=2, - ) super().__init__(*args, **kwargs) self.schema = kwargs.pop("schema", None) diff --git a/airflow/providers/microsoft/mssql/provider.yaml b/airflow/providers/microsoft/mssql/provider.yaml index b6930e3488b6bb..36dc3152227b73 100644 --- a/airflow/providers/microsoft/mssql/provider.yaml +++ b/airflow/providers/microsoft/mssql/provider.yaml @@ -37,3 +37,6 @@ hooks: - integration-name: Microsoft SQL Server (MSSQL) python-modules: - airflow.providers.microsoft.mssql.hooks.mssql + +hook-class-names: + - airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook diff --git a/airflow/providers/mongo/hooks/mongo.py b/airflow/providers/mongo/hooks/mongo.py index d1fcc0b4da44fd..3eb0636ffa88c5 100644 --- a/airflow/providers/mongo/hooks/mongo.py +++ b/airflow/providers/mongo/hooks/mongo.py @@ -40,9 +40,11 @@ class MongoHook(BaseHook): {"srv": true, "replicaSet": "test", "ssl": true, "connectTimeoutMS": 30000} """ + conn_name_attr = 'conn_id' + default_conn_name = 'mongo_default' conn_type = 'mongo' - def __init__(self, conn_id: str = 'mongo_default', *args, **kwargs) -> None: + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: super().__init__() self.mongo_conn_id = conn_id diff --git a/airflow/providers/mongo/provider.yaml b/airflow/providers/mongo/provider.yaml index b2e5b906676440..3ae1561decc351 100644 --- a/airflow/providers/mongo/provider.yaml +++ b/airflow/providers/mongo/provider.yaml @@ -37,3 +37,6 @@ hooks: - integration-name: MongoDB python-modules: - airflow.providers.mongo.hooks.mongo + +hook-class-names: + - airflow.providers.mongo.hooks.mongo.MongoHook diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py index 5eaa19d9195ea1..998e1efe89d142 100644 --- a/airflow/providers/mysql/hooks/mysql.py +++ b/airflow/providers/mysql/hooks/mysql.py @@ -41,6 +41,7 @@ class MySqlHook(DbApiHook): conn_name_attr = 'mysql_conn_id' default_conn_name = 'mysql_default' + conn_type = 'mysql' supports_autocommit = True def __init__(self, *args, **kwargs) -> None: diff --git a/airflow/providers/mysql/provider.yaml b/airflow/providers/mysql/provider.yaml index e6961874adbbe3..b5a9d98eea29d6 100644 --- 
a/airflow/providers/mysql/provider.yaml +++ b/airflow/providers/mysql/provider.yaml @@ -51,3 +51,6 @@ transfers: - source-integration-name: Snowflake target-integration-name: MySQL python-module: airflow.providers.mysql.transfers.presto_to_mysql + +hook-class-names: + - airflow.providers.mysql.hooks.mysql.MySqlHook diff --git a/airflow/providers/odbc/hooks/odbc.py b/airflow/providers/odbc/hooks/odbc.py index 43dee05c8bdbc8..89426a5dc10ae7 100644 --- a/airflow/providers/odbc/hooks/odbc.py +++ b/airflow/providers/odbc/hooks/odbc.py @@ -35,6 +35,7 @@ class OdbcHook(DbApiHook): DEFAULT_SQLALCHEMY_SCHEME = 'mssql+pyodbc' conn_name_attr = 'odbc_conn_id' default_conn_name = 'odbc_default' + conn_type = 'odbc' supports_autocommit = True def __init__( diff --git a/airflow/providers/odbc/provider.yaml b/airflow/providers/odbc/provider.yaml index f3e24209b245fd..907b0c1eae8b59 100644 --- a/airflow/providers/odbc/provider.yaml +++ b/airflow/providers/odbc/provider.yaml @@ -32,3 +32,6 @@ hooks: - integration-name: ODBC python-modules: - airflow.providers.odbc.hooks.odbc + +hook-class-names: + - airflow.providers.odbc.hooks.odbc.OdbcHook diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index 1acc28483c4191..7a993c4c6cd684 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -30,6 +30,7 @@ class OracleHook(DbApiHook): conn_name_attr = 'oracle_conn_id' default_conn_name = 'oracle_default' + conn_type = 'oracle' supports_autocommit = False # pylint: disable=c-extension-no-member diff --git a/airflow/providers/oracle/provider.yaml b/airflow/providers/oracle/provider.yaml index f3695fe3bb8f04..11f11246a530cb 100644 --- a/airflow/providers/oracle/provider.yaml +++ b/airflow/providers/oracle/provider.yaml @@ -32,11 +32,16 @@ operators: - integration-name: Oracle python-modules: - airflow.providers.oracle.operators.oracle + hooks: - integration-name: Oracle python-modules: - airflow.providers.oracle.hooks.oracle + transfers: - source-integration-name: Oracle target-integration-name: Oracle python-module: airflow.providers.oracle.transfers.oracle_to_oracle + +hook-class-names: + - airflow.providers.oracle.hooks.oracle.OracleHook diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index b207edbf651fed..6b5054d9d50b8b 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -57,6 +57,7 @@ class PostgresHook(DbApiHook): conn_name_attr = 'postgres_conn_id' default_conn_name = 'postgres_default' + conn_type = 'postgres' supports_autocommit = True def __init__(self, *args, **kwargs) -> None: diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml index 55485c42527092..cc5922715bd3df 100644 --- a/airflow/providers/postgres/provider.yaml +++ b/airflow/providers/postgres/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: PostgreSQL python-modules: - airflow.providers.postgres.hooks.postgres + +hook-class-names: + - airflow.providers.postgres.hooks.postgres.PostgresHook diff --git a/airflow/providers/presto/hooks/presto.py b/airflow/providers/presto/hooks/presto.py index a5a0576e1b6709..045b7b5b864f50 100644 --- a/airflow/providers/presto/hooks/presto.py +++ b/airflow/providers/presto/hooks/presto.py @@ -55,6 +55,7 @@ class PrestoHook(DbApiHook): conn_name_attr = 'presto_conn_id' default_conn_name = 'presto_default' + conn_type = 'presto' def get_conn(self) -> 
Connection: """Returns a connection object""" diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/provider.yaml index 8e8dccc67840c5..6896087d663f57 100644 --- a/airflow/providers/presto/provider.yaml +++ b/airflow/providers/presto/provider.yaml @@ -32,3 +32,6 @@ hooks: - integration-name: Presto python-modules: - airflow.providers.presto.hooks.presto + +hook-class-names: + - airflow.providers.presto.hooks.presto.PrestoHook diff --git a/airflow/providers/redis/hooks/redis.py b/airflow/providers/redis/hooks/redis.py index 340da6099a1b61..cc6b351fd7d2bf 100644 --- a/airflow/providers/redis/hooks/redis.py +++ b/airflow/providers/redis/hooks/redis.py @@ -31,7 +31,11 @@ class RedisHook(BaseHook): ``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": "/path/to/cert.pem", etc}``. """ - def __init__(self, redis_conn_id: str = 'redis_default') -> None: + conn_name_attr = 'redis_conn_id' + default_conn_name = 'redis_default' + conn_type = 'redis' + + def __init__(self, redis_conn_id: str = default_conn_name) -> None: """ Prepares hook to connect to a Redis database. diff --git a/airflow/providers/redis/provider.yaml b/airflow/providers/redis/provider.yaml index 2a6df5bbe9d4e4..2b74a34b77436a 100644 --- a/airflow/providers/redis/provider.yaml +++ b/airflow/providers/redis/provider.yaml @@ -44,3 +44,6 @@ hooks: - integration-name: Redis python-modules: - airflow.providers.redis.hooks.redis + +hook-class-names: + - airflow.providers.redis.hooks.redis.RedisHook diff --git a/airflow/providers/salesforce/hooks/tableau.py b/airflow/providers/salesforce/hooks/tableau.py index bd47d10b3e67d8..56d812eeaec8de 100644 --- a/airflow/providers/salesforce/hooks/tableau.py +++ b/airflow/providers/salesforce/hooks/tableau.py @@ -51,7 +51,11 @@ class TableauHook(BaseHook): :type tableau_conn_id: str """ - def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None: + conn_name_attr = 'tableau_conn_id' + default_conn_name = 'tableau_default' + conn_type = 'tableau' + + def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = default_conn_name) -> None: super().__init__() self.tableau_conn_id = tableau_conn_id self.conn = self.get_connection(self.tableau_conn_id) diff --git a/airflow/providers/salesforce/provider.yaml b/airflow/providers/salesforce/provider.yaml index 2651c4375bc526..ebc907e15f0224 100644 --- a/airflow/providers/salesforce/provider.yaml +++ b/airflow/providers/salesforce/provider.yaml @@ -44,3 +44,6 @@ hooks: python-modules: - airflow.providers.salesforce.hooks.salesforce - airflow.providers.salesforce.hooks.tableau + +hook-class-names: + - airflow.providers.salesforce.hooks.tableau.TableauHook diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py index 6579f447b24fe1..33eaf720ce46f4 100644 --- a/airflow/providers/snowflake/hooks/snowflake.py +++ b/airflow/providers/snowflake/hooks/snowflake.py @@ -35,6 +35,7 @@ class SnowflakeHook(DbApiHook): conn_name_attr = 'snowflake_conn_id' default_conn_name = 'snowflake_default' + conn_type = 'snowflake' supports_autocommit = True def __init__(self, *args, **kwargs) -> None: diff --git a/airflow/providers/snowflake/provider.yaml b/airflow/providers/snowflake/provider.yaml index b30614590be8e7..5948338c321aeb 100644 --- a/airflow/providers/snowflake/provider.yaml +++ b/airflow/providers/snowflake/provider.yaml @@ -48,3 +48,6 @@ transfers: - source-integration-name: Snowflake target-integration-name: Slack python-module: 
airflow.providers.snowflake.transfers.snowflake_to_slack + +hook-class-names: + - airflow.providers.snowflake.hooks.snowflake.SnowflakeHook diff --git a/airflow/providers/sqlite/hooks/sqlite.py b/airflow/providers/sqlite/hooks/sqlite.py index 67b927304781fa..c1a41c6ef97308 100644 --- a/airflow/providers/sqlite/hooks/sqlite.py +++ b/airflow/providers/sqlite/hooks/sqlite.py @@ -26,6 +26,7 @@ class SqliteHook(DbApiHook): conn_name_attr = 'sqlite_conn_id' default_conn_name = 'sqlite_default' + conn_type = 'sqlite' def get_conn(self) -> sqlite3.dbapi2.Connection: """Returns a sqlite connection object""" diff --git a/airflow/providers/sqlite/provider.yaml b/airflow/providers/sqlite/provider.yaml index 326d8f1c182043..55586204088afa 100644 --- a/airflow/providers/sqlite/provider.yaml +++ b/airflow/providers/sqlite/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: SQLite python-modules: - airflow.providers.sqlite.hooks.sqlite + +hook-class-names: + - airflow.providers.sqlite.hooks.sqlite.SqliteHook diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index acb86a5f6f456c..071409d0718527 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -27,6 +27,7 @@ class VerticaHook(DbApiHook): conn_name_attr = 'vertica_conn_id' default_conn_name = 'vertica_default' + conn_type = 'vertica' supports_autocommit = True def get_conn(self) -> connect: diff --git a/airflow/providers/vertica/provider.yaml b/airflow/providers/vertica/provider.yaml index 4ddb8c723b84c8..a63fdc4f7afb03 100644 --- a/airflow/providers/vertica/provider.yaml +++ b/airflow/providers/vertica/provider.yaml @@ -38,3 +38,6 @@ hooks: - integration-name: Vertica python-modules: - airflow.providers.vertica.hooks.vertica + +hook-class-names: + - airflow.providers.vertica.hooks.vertica.VerticaHook diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index 44821f7d90a3dc..e0abb2d1a61c5b 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -17,6 +17,7 @@ # under the License. """Manages all providers.""" import fnmatch +import importlib import json import logging import os @@ -26,6 +27,8 @@ import jsonschema import yaml +from airflow.utils.entry_points_with_dist import entry_points_with_dist + try: import importlib.resources as importlib_resources except ImportError: @@ -60,8 +63,11 @@ def __new__(cls): return cls._instance def __init__(self): - # Keeps list of providers keyed by module name and value is Tuple: version, provider_info + # Keeps dict of providers keyed by module name and value is Tuple: version, provider_info self._provider_dict: Dict[str, Tuple[str, Dict]] = {} + # Keeps dict of hooks keyed by connection type and value is + # Tuple: connection class, connection_id_attribute_name + self._hooks_dict: Dict[str, Tuple[str, str]] = {} self._validator = _create_validator() # Local source folders are loaded first. They should take precedence over the package ones for # Development purpose. In production provider.yaml files are not present in the 'airflow" directory @@ -69,19 +75,9 @@ def __init__(self): # in case of local development self._discover_all_airflow_builtin_providers_from_local_sources() self._discover_all_providers_from_packages() - self._sort_provider_dictionary() - - def _sort_provider_dictionary(self): - """ - Sort provider_dictionary using OrderedDict. 
- - The dictionary gets sorted so that when you iterate through it, the providers are by - default returned in alphabetical order. - """ - sorted_dict = OrderedDict() - for provider_name in sorted(self._provider_dict.keys()): - sorted_dict[provider_name] = self._provider_dict[provider_name] - self._provider_dict = sorted_dict + self._discover_hooks() + self._provider_dict = OrderedDict(sorted(self._provider_dict.items())) + self._hooks_dict = OrderedDict(sorted(self._hooks_dict.items())) def _discover_all_providers_from_packages(self) -> None: """ @@ -89,8 +85,6 @@ def _discover_all_providers_from_packages(self) -> None: via the 'apache_airflow_provider' entrypoint as a dictionary conforming to the 'airflow/provider.yaml.schema.json' schema. """ - from airflow.plugins_manager import entry_points_with_dist - for (entry_point, dist) in entry_points_with_dist('apache_airflow_provider'): package_name = dist.metadata['name'] log.debug("Loading %s from package %s", entry_point, package_name) @@ -101,8 +95,7 @@ def _discover_all_providers_from_packages(self) -> None: if package_name != provider_info_package_name: raise Exception( f"The package '{package_name}' from setuptools and " - f"{provider_info_package_name} do not match. Please make sure they are" - f"aligned" + f"{provider_info_package_name} do not match. Please make sure they are aligned" ) if package_name not in self._provider_dict: self._provider_dict[package_name] = (version, provider_info) @@ -171,7 +164,72 @@ def _add_provider_info_from_local_source_file(self, path, package_name) -> None: except Exception as e: # noqa pylint: disable=broad-except log.warning("Error when loading '%s': %s", path, e) + def _discover_hooks(self) -> None: + """Retrieves all connections defined in the providers""" + for name, provider in self._provider_dict.items(): + provider_package = name + hook_class_names = provider[1].get("hook-class-names") + if hook_class_names: + for hook_class_name in hook_class_names: + self._add_hook(hook_class_name, provider_package) + + def _add_hook(self, hook_class_name, provider_package) -> None: + """ + Adds hook class name to list of hooks + + :param hook_class_name: name of the Hook class + :param provider_package: provider package adding the hook + """ + if provider_package.startswith("apache-airflow"): + provider_path = provider_package[len("apache-") :].replace("-", ".") + if not hook_class_name.startswith(provider_path): + log.warning( + "Sanity check failed when importing '%s' from '%s' package. 
It should start with '%s'", + hook_class_name, + provider_package, + provider_path, + ) + return + if hook_class_name in self._hooks_dict: + log.warning( + "The hook_class '%s' has been already registered.", + hook_class_name, + ) + return + try: + module, class_name = hook_class_name.rsplit('.', maxsplit=1) + hook_class = getattr(importlib.import_module(module), class_name) + except Exception as e: # noqa pylint: disable=broad-except + log.warning( + "Exception when importing '%s' from '%s' package: %s", + hook_class_name, + provider_package, + e, + ) + return + conn_type = getattr(hook_class, 'conn_type') + if not conn_type: + log.warning( + "The hook_class '%s' is missing connection_type attribute and cannot be registered", + hook_class, + ) + return + connection_id_attribute_name = getattr(hook_class, 'conn_name_attr') + if not connection_id_attribute_name: + log.warning( + "The hook_class '%s' is missing conn_name_attr attribute and cannot be registered", + hook_class, + ) + return + + self._hooks_dict[conn_type] = (hook_class_name, connection_id_attribute_name) + @property def providers(self): """Returns information about available providers.""" return self._provider_dict + + @property + def hooks(self): + """Returns dictionary of connection_type-to-hook mapping""" + return self._hooks_dict diff --git a/airflow/utils/entry_points_with_dist.py b/airflow/utils/entry_points_with_dist.py new file mode 100644 index 00000000000000..062aad8c7eb516 --- /dev/null +++ b/airflow/utils/entry_points_with_dist.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import importlib_metadata + + +def entry_points_with_dist(group: str): + """ + Return EntryPoint objects of the given group, along with the distribution information. + + This is like the ``entry_points()`` function from importlib.metadata, + except it also returns the distribution the entry_point was loaded from. 
+ + :param group: FIlter results to only this entrypoint group + :return: Generator of (EntryPoint, Distribution) objects for the specified groups + """ + for dist in importlib_metadata.distributions(): + for e in dist.entry_points: + if e.group != group: + continue + yield e, dist diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py index 63e918a957e4b1..5e58b7592a8ccc 100755 --- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py +++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py @@ -23,7 +23,7 @@ from collections import Counter from glob import glob from itertools import chain, product -from typing import Any, Dict, Iterable +from typing import Any, Dict, Iterable, List import jsonschema import yaml @@ -39,6 +39,8 @@ PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json") CORE_INTEGRATIONS = ["SQL", "Local"] +errors = [] + def _filepath_to_module(filepath: str): filepath = os.path.relpath(os.path.abspath(filepath), ROOT_DIR) @@ -124,21 +126,39 @@ def assert_sets_equal(set1, set2): raise AssertionError(standard_msg) +def check_if_objects_belongs_to_package( + object_names: List[str], provider_package: str, yaml_file_path: str, resource_type: str +): + for object_name in object_names: + if not object_name.startswith(provider_package): + errors.append( + f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not start" + f" with the expected {provider_package}." + ) + + +def parse_module_data(provider_data, resource_type, yaml_file_path): + package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path) + provider_package = os.path.dirname(yaml_file_path).replace(os.sep, ".") + py_files = chain( + glob(f"{package_dir}/**/{resource_type}/*.py"), glob(f"{package_dir}/{resource_type}/*.py") + ) + expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")} + resource_data = provider_data.get(resource_type, []) + return expected_modules, provider_package, resource_data + + def check_completeness_of_list_of_hooks_sensors_hooks(yaml_files: Dict[str, Dict]): print("Checking completeness of list of {sensors, hooks, operators}") - errors = [] for (yaml_file_path, provider_data), resource_type in product( yaml_files.items(), ["sensors", "operators", "hooks"] ): - package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path) - py_files = chain( - glob(f"{package_dir}/**/{resource_type}/*.py"), glob(f"{package_dir}/{resource_type}/*.py") + expected_modules, provider_package, resource_data = parse_module_data( + provider_data, resource_type, yaml_file_path ) - expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")} - - resource_data = provider_data.get(resource_type, []) - current_modules = {i for r in resource_data for i in r.get('python-modules', [])} + current_modules = {str(i) for r in resource_data for i in r.get('python-modules', [])} + check_if_objects_belongs_to_package(current_modules, provider_package, yaml_file_path, resource_type) try: assert_sets_equal(set(expected_modules), set(current_modules)) except AssertionError as ex: @@ -147,17 +167,10 @@ def check_completeness_of_list_of_hooks_sensors_hooks(yaml_files: Dict[str, Dict f"Incorrect content of key '{resource_type}/python-modules' " f"in file: {yaml_file_path}\n{nested_error}" ) - if errors: - print(f"Found {len(errors)} errors") - for error in errors: - print(error) - print() - sys.exit(1) 
def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files: Dict[str, Dict]): print("Checking for duplicates in list of {sensors, hooks, operators}") - errors = [] for (yaml_file_path, provider_data), resource_type in product( yaml_files.items(), ["sensors", "operators", "hooks"] ): @@ -171,28 +184,17 @@ def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files f"in file: {yaml_file_path}" ) - if errors: - print(f"Found {len(errors)} errors") - for error in errors: - print(error) - print() - sys.exit(1) - def check_completeness_of_list_of_transfers(yaml_files: Dict[str, Dict]): print("Checking completeness of list of transfers") - errors = [] resource_type = 'transfers' for yaml_file_path, provider_data in yaml_files.items(): - package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path) - py_files = chain( - glob(f"{package_dir}/**/{resource_type}/*.py"), glob(f"{package_dir}/{resource_type}/*.py") + expected_modules, provider_package, resource_data = parse_module_data( + provider_data, resource_type, yaml_file_path ) - expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")} - - resource_data = provider_data.get(resource_type, []) current_modules = {r.get('python-module') for r in resource_data} + check_if_objects_belongs_to_package(current_modules, provider_package, yaml_file_path, resource_type) try: assert_sets_equal(set(expected_modules), set(current_modules)) except AssertionError as ex: @@ -201,12 +203,18 @@ def check_completeness_of_list_of_transfers(yaml_files: Dict[str, Dict]): f"Incorrect content of key '{resource_type}/python-module' " f"in file: {yaml_file_path}\n{nested_error}" ) - if errors: - print(f"Found {len(errors)} errors") - for error in errors: - print(error) - print() - sys.exit(1) + + +def check_hook_classes(yaml_files: Dict[str, Dict]): + print("Checking connection classes belong to package") + resource_type = 'hook-class-names' + for yaml_file_path, provider_data in yaml_files.items(): + provider_package = os.path.dirname(yaml_file_path).replace(os.sep, ".") + hook_class_names = provider_data.get(resource_type) + if hook_class_names: + check_if_objects_belongs_to_package( + hook_class_names, provider_package, yaml_file_path, resource_type + ) def check_duplicates_in_list_of_transfers(yaml_files: Dict[str, Dict]): @@ -230,17 +238,9 @@ def check_duplicates_in_list_of_transfers(yaml_files: Dict[str, Dict]): f"in file: {yaml_file_path}" ) - if errors: - print(f"Found {len(errors)} errors") - for error in errors: - print(error) - print() - sys.exit(1) - def check_invalid_integration(yaml_files: Dict[str, Dict]): print("Detect unregistered integrations") - errors = [] all_integration_names = set(get_all_integration_names(yaml_files)) for (yaml_file_path, provider_data), resource_type in product( @@ -267,13 +267,6 @@ def check_invalid_integration(yaml_files: Dict[str, Dict]): f"Invalid values: {invalid_names}" ) - if errors: - print(f"Found {len(errors)} errors") - for error in errors: - print(error) - print() - sys.exit(1) - # TODO: Delete after splitting the documentation for each provider. 
DOC_FILES_EXCLUDE_LIST = { @@ -335,8 +328,16 @@ def check_doc_files(yaml_files: Dict[str, Dict]): check_completeness_of_list_of_transfers(all_parsed_yaml_files) check_duplicates_in_list_of_transfers(all_parsed_yaml_files) + check_hook_classes(all_parsed_yaml_files) if all_files_loaded: # Only check those if all provider files are loaded check_doc_files(all_parsed_yaml_files) check_invalid_integration(all_parsed_yaml_files) + + if errors: + print(f"Found {len(errors)} errors") + for error in errors: + print(error) + print() + sys.exit(1) diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh index 8c0a2b296d4df7..b151505813cfd4 100755 --- a/scripts/in_container/run_install_and_test_provider_packages.sh +++ b/scripts/in_container/run_install_and_test_provider_packages.sh @@ -93,6 +93,26 @@ function discover_all_provider_packages() { fi } +function discover_all_hooks() { + echo + echo Listing available hooks via 'airflow providers hooks' + echo + + airflow providers hooks + + local expected_number_of_hooks=33 + local actual_number_of_hooks + actual_number_of_hooks=$(airflow providers hooks --output simple | grep -c conn_id | xargs) + if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then + >&2 echo "ERROR! Number of hooks registered is wrong!" + >&2 echo "Expected number was '${expected_number_of_hooks}' and got '${actual_number_of_hooks}'" + >&2 echo + >&2 echo "Either increase the number of hooks if you added one or fix problem with imports if you see one." + >&2 echo + fi +} + if [[ ${BACKPORT_PACKAGES} != "true" ]]; then discover_all_provider_packages + discover_all_hooks fi diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py index 8459e77712009c..69e2027b003ac2 100644 --- a/tests/core/test_providers_manager.py +++ b/tests/core/test_providers_manager.py @@ -82,6 +82,42 @@ 'apache-airflow-providers-zendesk', ] +CONNECTIONS_LIST = [ + 'azure_batch', + 'azure_cosmos', + 'azure_data_lake', + 'cassandra', + 'cloudant', + 'dataprep', + 'docker', + 'elasticsearch', + 'exasol', + 'gcpcloudsql', + 'gcpssh', + 'google_cloud_platform', + 'grpc', + 'hive_cli', + 'hiveserver2', + 'imap', + 'jdbc', + 'jira', + 'kubernetes', + 'mongo', + 'mssql', + 'mysql', + 'odbc', + 'oracle', + 'pig_cli', + 'postgres', + 'presto', + 'redis', + 'snowflake', + 'sqlite', + 'tableau', + 'vertica', + 'wasb', +] + class TestProviderManager(unittest.TestCase): def test_providers_are_loaded(self): @@ -95,3 +131,8 @@ def test_providers_are_loaded(self): self.assertEqual(package_name, provider) self.assertEqual(ALL_PROVIDERS, provider_list) + + def test_hooks(self): + provider_manager = ProvidersManager() + connections_list = list(provider_manager.hooks.keys()) + self.assertEqual(CONNECTIONS_LIST, connections_list) diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py index fbd275d01164c8..c43c85c8a915e5 100644 --- a/tests/models/test_connection.py +++ b/tests/models/test_connection.py @@ -28,9 +28,7 @@ from airflow import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.models import Connection, crypto -from airflow.models.connection import CONN_TYPE_TO_HOOK from airflow.providers.sqlite.hooks.sqlite import SqliteHook -from airflow.utils.module_loading import import_string from tests.test_utils.config import conf_vars ConnectionParts = namedtuple("ConnectionParts", ["conn_type", "login", "password", "host", "port", "schema"]) @@ -549,15 
+547,3 @@ def test_connection_mixed(self): ), ): Connection(conn_id="TEST_ID", uri="mysql://", schema="AAA") - - -class TestConnTypeToHook(unittest.TestCase): - def test_enforce_alphabetical_order(self): - current_keys = list(CONN_TYPE_TO_HOOK.keys()) - expected_keys = sorted(current_keys) - - self.assertEqual(expected_keys, current_keys) - - def test_hooks_importable(self): - for hook_path, _ in CONN_TYPE_TO_HOOK.values(): - self.assertTrue(issubclass(import_string(hook_path), BaseHook)) From a6cf883513280de2df80989b81eb65065ec761d1 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 29 Nov 2020 11:39:42 +0100 Subject: [PATCH 2/3] fixup! Adds support for Hook discovery from providers --- airflow/providers_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index e0abb2d1a61c5b..bd3b470a0e5344 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -76,8 +76,8 @@ def __init__(self): self._discover_all_airflow_builtin_providers_from_local_sources() self._discover_all_providers_from_packages() self._discover_hooks() - self._provider_dict = OrderedDict(sorted(self._provider_dict.items())) - self._hooks_dict = OrderedDict(sorted(self._hooks_dict.items())) + self._provider_dict = OrderedDict(sorted(self.providers.items())) + self._hooks_dict = OrderedDict(sorted(self.providers.items())) def _discover_all_providers_from_packages(self) -> None: """ @@ -85,7 +85,7 @@ def _discover_all_providers_from_packages(self) -> None: via the 'apache_airflow_provider' entrypoint as a dictionary conforming to the 'airflow/provider.yaml.schema.json' schema. """ - for (entry_point, dist) in entry_points_with_dist('apache_airflow_provider'): + for entry_point, dist in entry_points_with_dist('apache_airflow_provider'): package_name = dist.metadata['name'] log.debug("Loading %s from package %s", entry_point, package_name) version = dist.version From 034c19e23cbd96c064866eae0ab99ffa6e33189c Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 29 Nov 2020 12:59:14 +0100 Subject: [PATCH 3/3] fixup! fixup! 
Adds support for Hook discovery from providers --- airflow/plugins_manager.py | 2 +- airflow/providers_manager.py | 4 ++-- airflow/utils/{entry_points_with_dist.py => entry_points.py} | 0 3 files changed, 3 insertions(+), 3 deletions(-) rename airflow/utils/{entry_points_with_dist.py => entry_points.py} (100%) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index b69aaf40e555c1..5c8c0f3a772d5e 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -30,7 +30,7 @@ import importlib_metadata from airflow import settings -from airflow.utils.entry_points_with_dist import entry_points_with_dist +from airflow.utils.entry_points import entry_points_with_dist from airflow.utils.file import find_path_from_directory if TYPE_CHECKING: diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index bd3b470a0e5344..f30d433cb0ea32 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -27,7 +27,7 @@ import jsonschema import yaml -from airflow.utils.entry_points_with_dist import entry_points_with_dist +from airflow.utils.entry_points import entry_points_with_dist try: import importlib.resources as importlib_resources @@ -77,7 +77,7 @@ def __init__(self): self._discover_all_providers_from_packages() self._discover_hooks() self._provider_dict = OrderedDict(sorted(self.providers.items())) - self._hooks_dict = OrderedDict(sorted(self.providers.items())) + self._hooks_dict = OrderedDict(sorted(self.hooks.items())) def _discover_all_providers_from_packages(self) -> None: """ diff --git a/airflow/utils/entry_points_with_dist.py b/airflow/utils/entry_points.py similarity index 100% rename from airflow/utils/entry_points_with_dist.py rename to airflow/utils/entry_points.py
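With the CONN_TYPE_TO_HOOK tests (and the static map they covered) removed, the discovered mapping becomes the way to resolve a hook for a connection type. A minimal usage sketch, assuming ProvidersManager().hooks maps each connection type to a (hook class path, connection-id attribute name) tuple as rendered by `airflow providers hooks`; the 'sqlite_default' connection id is illustrative only:

    from airflow.providers_manager import ProvidersManager
    from airflow.utils.module_loading import import_string

    hook_class_name, conn_id_attr = ProvidersManager().hooks['sqlite']
    hook_class = import_string(hook_class_name)
    # Roughly equivalent to SqliteHook(sqlite_conn_id='sqlite_default'),
    # if the assumed attribute name holds.
    hook = hook_class(**{conn_id_attr: 'sqlite_default'})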