Adds support for Connection discovery from providers
This PR extends provider discovery with a mechanism for retrieving
the mapping of connection types to hooks.

Fixes #12465
potiuk committed Nov 19, 2020
1 parent d84a52d commit ca1ac78
Showing 67 changed files with 902 additions and 488 deletions.
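For context, a minimal sketch (not part of the commit) of how the discovered mapping can be consumed once this change is in place, assuming Airflow at this commit plus the postgres provider package are installed; the get_providers_manager().connections call and the example "postgres" entry are taken from the diffs below.

from airflow.providers_manager import get_providers_manager

# Connection type -> (hook class path, name of the hook's conn_id parameter),
# assembled from provider metadata instead of a hard-coded dict.
connections = get_providers_manager().connections
hook_class_path, conn_id_param = connections["postgres"]
# e.g. ("airflow.providers.postgres.hooks.postgres.PostgresHook", "postgres_conn_id")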
1 change: 0 additions & 1 deletion airflow/__init__.py
@@ -79,7 +79,6 @@ def __getattr__(name):
    from airflow.models.dag import DAG
    from airflow.exceptions import AirflowException


if not PY37:
    from pep562 import Pep562

79 changes: 17 additions & 62 deletions airflow/models/connection.py
@@ -18,8 +18,9 @@

import json
import warnings
from functools import lru_cache
from json import JSONDecodeError
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qsl, quote, unquote, urlencode, urlparse

from sqlalchemy import Boolean, Column, Integer, String
@@ -30,69 +31,22 @@
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.providers_manager import get_providers_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string

# A map that assigns a connection type to a tuple that contains
# the path of the class and the name of the conn_id key parameter.
# PLEASE KEEP BELOW LIST IN ALPHABETICAL ORDER.
CONN_TYPE_TO_HOOK = {
    "azure_batch": (
        "airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook",
        "azure_batch_conn_id",
    ),
    "azure_cosmos": (
        "airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook",
        "azure_cosmos_conn_id",
    ),
    "azure_data_lake": (
        "airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook",
        "azure_data_lake_conn_id",
    ),
    "cassandra": ("airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook", "cassandra_conn_id"),
    "cloudant": ("airflow.providers.cloudant.hooks.cloudant.CloudantHook", "cloudant_conn_id"),
    "dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_default"),
    "docker": ("airflow.providers.docker.hooks.docker.DockerHook", "docker_conn_id"),
    "elasticsearch": (
        "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
        "elasticsearch_conn_id",
    ),
    "exasol": ("airflow.providers.exasol.hooks.exasol.ExasolHook", "exasol_conn_id"),
    "gcpcloudsql": (
        "airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook",
        "gcp_cloudsql_conn_id",
    ),
    "gcpssh": (
        "airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook",
        "gcp_conn_id",
    ),
    "google_cloud_platform": (
        "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook",
        "bigquery_conn_id",
    ),
    "grpc": ("airflow.providers.grpc.hooks.grpc.GrpcHook", "grpc_conn_id"),
    "hive_cli": ("airflow.providers.apache.hive.hooks.hive.HiveCliHook", "hive_cli_conn_id"),
    "hiveserver2": ("airflow.providers.apache.hive.hooks.hive.HiveServer2Hook", "hiveserver2_conn_id"),
    "imap": ("airflow.providers.imap.hooks.imap.ImapHook", "imap_conn_id"),
    "jdbc": ("airflow.providers.jdbc.hooks.jdbc.JdbcHook", "jdbc_conn_id"),
    "jira": ("airflow.providers.jira.hooks.jira.JiraHook", "jira_conn_id"),
    "kubernetes": ("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook", "kubernetes_conn_id"),
    "mongo": ("airflow.providers.mongo.hooks.mongo.MongoHook", "conn_id"),
    "mssql": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
    "mysql": ("airflow.providers.mysql.hooks.mysql.MySqlHook", "mysql_conn_id"),
    "odbc": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
    "oracle": ("airflow.providers.oracle.hooks.oracle.OracleHook", "oracle_conn_id"),
    "pig_cli": ("airflow.providers.apache.pig.hooks.pig.PigCliHook", "pig_cli_conn_id"),
    "postgres": ("airflow.providers.postgres.hooks.postgres.PostgresHook", "postgres_conn_id"),
    "presto": ("airflow.providers.presto.hooks.presto.PrestoHook", "presto_conn_id"),
    "redis": ("airflow.providers.redis.hooks.redis.RedisHook", "redis_conn_id"),
    "snowflake": ("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook", "snowflake_conn_id"),
    "sqlite": ("airflow.providers.sqlite.hooks.sqlite.SqliteHook", "sqlite_conn_id"),
    "tableau": ("airflow.providers.salesforce.hooks.tableau.TableauHook", "tableau_conn_id"),
    "vertica": ("airflow.providers.vertica.hooks.vertica.VerticaHook", "vertica_conn_id"),
    "wasb": ("airflow.providers.microsoft.azure.hooks.wasb.WasbHook", "wasb_conn_id"),
}
# PLEASE KEEP ABOVE LIST IN ALPHABETICAL ORDER.
cache = lru_cache(maxsize=None)


@cache
def get_conn_type_to_hook() -> Dict[str, Tuple[str, str]]:
    """
    Return a map that assigns a connection type to a tuple that contains
    the path of the class and the name of the conn_id key parameter.

    :return: The map of connection type to hook
    """
    return get_providers_manager().connections


def parse_netloc_to_hostname(*args, **kwargs):
@@ -326,7 +280,8 @@ def rotate_fernet_key(self):

    def get_hook(self):
        """Return hook based on conn_type."""
        hook_class_name, conn_id_param = CONN_TYPE_TO_HOOK.get(self.conn_type, (None, None))
        hook_class_name, conn_id_param = get_conn_type_to_hook().get(self.conn_type, (None, None))

        if not hook_class_name:
            raise AirflowException(f'Unknown hook type "{self.conn_type}"')
        hook_class = import_string(hook_class_name)
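A minimal usage sketch of the updated Connection.get_hook() path, assuming a provider registering the "postgres" connection type is installed; the conn_id and credentials below are hypothetical.

from airflow.models.connection import Connection

conn = Connection(
    conn_id="my_postgres",   # hypothetical connection id
    conn_type="postgres",    # must be a connection type registered by an installed provider
    host="localhost",
    login="user",
    password="secret",
)
# get_hook() now looks up the hook class via get_conn_type_to_hook() (provider discovery)
# instead of the removed hard-coded CONN_TYPE_TO_HOOK dict, then imports and instantiates it.
hook = conn.get_hook()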
45 changes: 40 additions & 5 deletions airflow/provider.yaml.schema.json
@@ -17,6 +17,11 @@
"type": "string"
}
},
"provider-package": {
"description": "Package of the provider. All classes and resources of this provider must reside within this package.",
"type": "string"
},

"integrations": {
"type": "array",
"items": {
@@ -75,10 +80,12 @@
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "Name of the module - relative to the provider package. Must start with .",
"type": "array",
"items": {
"type": "string"
}
},
"pattern": "^\\..*$"
}
},
"additionalProperties": false,
@@ -98,9 +105,11 @@
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "Name of the module - relative to the provider package. Must start with .",
"type": "array",
"items": {
"type": "string"
"type": "string",
"pattern": "^\\..*$"
}
}
},
@@ -121,9 +130,11 @@
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "Name of the module - relative to the provider package. Must start with .",
"type": "array",
"items": {
"type": "string"
"type": "string",
"pattern": "^\\..*$"
}
}
},
@@ -153,7 +164,8 @@
},
"python-module": {
"type": "string",
"description": "Source integration name. It must have a matching item in the 'integration' section of any provider."
"description": "Name of the module - relative to the provider package. Must start with .",
"pattern": "^\\..*$"
}
},
"additionalProperties": false,
@@ -163,12 +175,35 @@
"python-module"
]
}
},
"connections": {
"type": "array",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection that the provider has",
"type": "string"
},
"hook-class": {
"description": "Path of the class that the provider adds. Must start with . and be relative to the provider package defined in providerPackage field.",
"type": "string",
"pattern": "^\\..*$"
},
"connection-id-parameter-name": {
"description": "Name of the parameter to pass to the hook.",
"type": "string"
}
},
"required": ["connection-type", "hook-class", "connection-id-parameter-name"]
}
}
},
"additionalProperties": false,
"required": [
"package-name",
"description",
"versions"
"versions",
"provider-package"
]
}
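To make the new schema fields concrete, here is a sketch (not part of the commit) of provider metadata with a connections entry that the extended schema is meant to accept, checked with the third-party jsonschema package; the postgres values are illustrative and real provider.yaml files may carry additional fields.

import json

from jsonschema import validate  # third-party 'jsonschema' package

provider_meta = {
    "package-name": "apache-airflow-providers-postgres",
    "description": "Example provider description",
    "versions": ["1.0.0"],
    "provider-package": "airflow.providers.postgres",  # now required at the top level
    "connections": [
        {
            "connection-type": "postgres",
            # hook-class must start with '.' and is relative to provider-package
            "hook-class": ".hooks.postgres.PostgresHook",
            "connection-id-parameter-name": "postgres_conn_id",
        }
    ],
}

# Run from the repository root so the schema path resolves; raises ValidationError on a bad shape.
with open("airflow/provider.yaml.schema.json") as schema_file:
    schema = json.load(schema_file)

validate(instance=provider_meta, schema=schema)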