Skip to content

Commit

Permalink
feat(ingest): platform_instance detection in method plus tests
Browse files Browse the repository at this point in the history
  • Loading branch information
k-popov committed Jul 31, 2023
1 parent 6bf21f1 commit 88acb48
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 15 deletions.
50 changes: 35 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timezone
from functools import lru_cache
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Union

import dateutil.parser as dp
import pydantic
Expand Down Expand Up @@ -126,7 +126,9 @@ def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
super().__init__(ctx)
self.config = config
self.report = SourceReport()
self.setup_session()

def setup_session(self) -> None:
login_response = requests.post(
f"{self.config.connect_uri}/api/session",
None,
Expand Down Expand Up @@ -538,6 +540,36 @@ def get_source_table_from_id(self, table_id):

return None, None

@lru_cache(maxsize=None)
def get_platform_instance(
self, platform: Union[str, None] = None, datasource_id: Union[int, None] = None
) -> Union[str, None]:
"""
Method will attempt to detect `platform_instance` by checking
`database_id_to_instance_map` and `platform_instance_map` mappings.
If `database_id_to_instance_map` is defined it is first checked for
`datasource_id` extracted from Metabase. If this mapping is not defined
or corresponding key is not found, `platform_instance_map` mapping
is checked for datasource platform. If no mapping found `None`
is returned.
:param str platform: DataHub platform name (e.g. `postgres` or `clickhouse`)
:param int datasource_id: Numeric datasource ID received from Metabase API
:return: platform instance name or None
"""
platform_instance = None
# For cases when metabase has several platform instances (e.g. several individual ClickHouse clusters)
if datasource_id is not None and self.config.database_id_to_instance_map:
platform_instance = self.config.database_id_to_instance_map.get(
str(datasource_id)
)

# If Metabase datasource ID is not mapped to platform instace, fall back to platform mapping
# Set platform_instance if configuration provides a mapping from platform name to instance
if platform and self.config.platform_instance_map and platform_instance is None:
platform_instance = self.config.platform_instance_map.get(platform)

return platform_instance

@lru_cache(maxsize=None)
def get_datasource_from_id(self, datasource_id):
try:
Expand Down Expand Up @@ -578,20 +610,8 @@ def get_datasource_from_id(self, datasource_id):
reason=f"Platform was not found in DataHub. Using {platform} name as is",
)

# For cases when metabase has several platform instances (e.g. several individual ClickHouse clusters)
datasource_id_in_metabase = dataset_json.get("id")
platform_instance = (
self.config.database_id_to_instance_map.get(str(datasource_id_in_metabase))
if datasource_id_in_metabase and self.config.database_id_to_instance_map
else None
)

# If Metabase datasource ID is not mapped to platform instace, fall back to platform mapping
# Set platform_instance if configuration provides a mapping from platform name to instance
platform_instance = (
self.config.platform_instance_map.get(platform)
if self.config.platform_instance_map and platform_instance is None
else None
platform_instance = self.get_platform_instance(
platform, dataset_json.get("id", None)
)

field_for_dbname_mapping = {
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/unit/test_metabase_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.metabase import MetabaseConfig, MetabaseSource


class TestMetabaseSource(MetabaseSource):
def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
self.config = config
self.report = SourceReport()


def test_get_platform_instance():
ctx = PipelineContext(run_id="test-metabase")
config = MetabaseConfig()
config.connect_uri = "http://localhost:3000"
# config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
# config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
metabase = TestMetabaseSource(ctx, config)

# no mappings defined
assert metabase.get_platform_instance("clickhouse", 42) is None

# database_id_to_instance_map is defined, key is present
metabase.config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
assert metabase.get_platform_instance(None, 42) == "my_main_clickhouse"

# database_id_to_instance_map is defined, key is missing
assert metabase.get_platform_instance(None, 999) is None

# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key present
metabase.config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"

# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key missing
assert metabase.get_platform_instance("missing-platform", 999) is None

# database_id_to_instance_map is missing, platform_instance_map is defined and key present
metabase.config.database_id_to_instance_map = None
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"

# database_id_to_instance_map is missing, platform_instance_map is defined and key missing
assert metabase.get_platform_instance("missing-platform", 999) is None

0 comments on commit 88acb48

Please sign in to comment.