Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): Ingest Previews for Looker Charts, Dashboards, and Explores #6941

Merged
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from datahub.metadata.schema_classes import (
ContainerClass,
DomainsClass,
EmbedClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
Expand Down Expand Up @@ -309,3 +310,10 @@ def mcps_from_mce(
aspect=aspect,
systemMetadata=mce.systemMetadata,
)


def create_embed_mcp(urn: str, embed_url: str) -> MetadataChangeProposalWrapper:
    """Build a metadata change proposal that attaches an embed aspect to an entity.

    Args:
        urn: URN of the entity the proposal targets.
        embed_url: URL stored as the ``renderUrl`` of the embed aspect.

    Returns:
        A ``MetadataChangeProposalWrapper`` whose aspect is an ``EmbedClass``.
    """
    embed_aspect = EmbedClass(renderUrl=embed_url)
    return MetadataChangeProposalWrapper(entityUrn=urn, aspect=embed_aspect)
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import create_embed_mcp
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
Expand Down Expand Up @@ -82,6 +83,7 @@
TagSnapshotClass,
)
from datahub.utilities.lossy_collections import LossyList, LossySet
from datahub.utilities.url_util import remove_port_from_url

if TYPE_CHECKING:
from datahub.ingestion.source.looker.lookml_source import (
Expand Down Expand Up @@ -163,12 +165,10 @@ class LookerCommonConfig(DatasetSourceConfigBase):
description=f"Pattern for providing dataset names to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="{model}.explore.{name}"),
)

explore_browse_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing browse paths to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="/{env}/{platform}/{project}/explores"),
)

view_naming_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="{project}.view.{name}"),
description=f"Pattern for providing dataset names to views. {LookerNamingPattern.allowed_docstring()}",
Expand All @@ -177,7 +177,6 @@ class LookerCommonConfig(DatasetSourceConfigBase):
LookerNamingPattern(pattern="/{env}/{platform}/{project}/views"),
description=f"Pattern for providing browse paths to views. {LookerNamingPattern.allowed_docstring()}",
)

tag_measures_and_dimensions: bool = Field(
True,
description="When enabled, attaches tags to measures, dimensions and dimension groups to make them more discoverable. When disabled, adds this information to the description of the column.",
Expand Down Expand Up @@ -756,14 +755,19 @@ def get_explore_browse_path(self, config: LookerCommonConfig) -> str:
return browse_path

def _get_url(self, base_url):
    """Return the Looker URL of this explore under ``base_url``.

    If ``base_url`` ends with a port number (like
    ``https://company.looker.com:19999``), ``remove_port_from_url`` drops it
    before the explore path is appended.
    """
    # Port stripping is delegated to the shared helper only; running an inline
    # copy of the same regex first would strip a second trailing ":<digits>".
    base_url = remove_port_from_url(base_url)
    return f"{base_url}/explore/{self.model_name}/{self.name}"

def _get_embed_url(self, base_url: str) -> str:
    """Return the embeddable Looker URL of this explore under ``base_url``.

    ``base_url`` is first passed through ``remove_port_from_url``.
    """
    portless_base = remove_port_from_url(base_url)
    return f"{portless_base}/embed/explore/{self.model_name}/{self.name}"

def _to_metadata_events( # noqa: C901
self, config: LookerCommonConfig, reporter: SourceReport, base_url: str
self,
config: LookerCommonConfig,
reporter: SourceReport,
base_url: str,
extract_embed_urls: bool,
) -> Optional[List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]]:
# We only generate MCE-s for explores that contain from clauses and do NOT contain joins
# All other explores (passthrough explores and joins) end in correct resolution of lineage, and don't need additional nodes in the graph.
Expand Down Expand Up @@ -862,7 +866,19 @@ def _to_metadata_events( # noqa: C901
aspect=SubTypesClass(typeNames=["explore"]),
)

return [mce, mcp]
proposals: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
mce,
mcp,
]

# If extracting embeds is enabled, produce an MCP for embed URL.
if extract_embed_urls:
embed_mcp = create_embed_mcp(
dataset_snapshot.urn, self._get_embed_url(base_url)
)
proposals.append(embed_mcp)

return proposals


class LookerExploreRegistry:
Expand Down Expand Up @@ -1048,15 +1064,21 @@ class LookerDashboardElement:

def url(self, base_url: str) -> str:
    """Return the Looker URL for this dashboard element.

    A dashboard element can use a look or just a raw query against an explore:
    elements with a ``look_id`` link to ``/looks/<look_id>``, all others to
    ``/x/<query_slug>``.

    Args:
        base_url: Base URL of the Looker instance; a trailing port number is
            removed by ``remove_port_from_url``.
    """
    # Port stripping is delegated to the shared helper only; running an inline
    # copy of the same regex first would strip a second trailing ":<digits>".
    base_url = remove_port_from_url(base_url)
    if self.look_id is not None:
        return f"{base_url}/looks/{self.look_id}"
    else:
        return f"{base_url}/x/{self.query_slug}"

def embed_url(self, base_url: str) -> Optional[str]:
    """Return the embeddable Looker URL for this dashboard element, if any.

    A dashboard element can use a look or just a raw query against an explore.
    Only look-backed elements get a URL (``/embed/looks/<look_id>``); for
    every other element ``None`` is returned.
    """
    portless_base = remove_port_from_url(base_url)
    if self.look_id is None:
        # No embeddable URL
        return None
    return f"{portless_base}/embed/looks/{self.look_id}"

def get_urn_element_id(self):
    """Return the id of this element in the form ``dashboard_elements.<id>``."""
    element_id = self.id
    return f"dashboard_elements.{element_id}"
Expand Down Expand Up @@ -1095,12 +1117,13 @@ class LookerDashboard:
last_viewed_at: Optional[datetime.datetime] = None

def url(self, base_url):
    """Return the Looker URL of this dashboard under ``base_url``.

    If ``base_url`` ends with a port number (like
    ``https://company.looker.com:19999``), ``remove_port_from_url`` drops it
    before the dashboard path is appended.
    """
    # Port stripping is delegated to the shared helper only; running an inline
    # copy of the same regex first would strip a second trailing ":<digits>".
    base_url = remove_port_from_url(base_url)
    return f"{base_url}/dashboards/{self.id}"

def embed_url(self, base_url: str) -> str:
    """Return the embeddable Looker URL of this dashboard under ``base_url``.

    ``base_url`` is first passed through ``remove_port_from_url``.
    """
    portless_base = remove_port_from_url(base_url)
    return f"{portless_base}/embed/dashboards/{self.id}"

def get_urn_dashboard_id(self):
    """Return this dashboard's URN id as computed by ``get_urn_looker_dashboard_id``."""
    dashboard_id = self.id
    return get_urn_looker_dashboard_id(dashboard_id)

Expand Down
155 changes: 115 additions & 40 deletions metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import create_embed_mcp
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -149,7 +150,10 @@ class LookerDashboardSourceConfig(
"30 days",
description="Used only if extract_usage_history is set to True. Interval to extract looker dashboard usage history for. See https://docs.looker.com/reference/filter-expressions#date_and_time.",
)

extract_embed_urls: bool = Field(
True,
description="Produce URLs used to render Looker Explores as Previews inside of DataHub UI. Embeds must be enabled inside of Looker to use this feature.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)
Expand Down Expand Up @@ -649,9 +653,9 @@ def _get_chart_type(

return chart_type

def _make_chart_mce(
def _make_chart_metadata_events(
self, dashboard_element: LookerDashboardElement, dashboard: LookerDashboard
) -> MetadataChangeEvent:
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
)
Expand Down Expand Up @@ -683,7 +687,81 @@ def _make_chart_mce(
if ownership is not None:
chart_snapshot.aspects.append(ownership)

return MetadataChangeEvent(proposedSnapshot=chart_snapshot)
chart_mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot)

proposals: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
chart_mce
]

# If extracting embeds is enabled, produce an MCP for embed URL.
if (
self.source_config.extract_embed_urls
and self.source_config.external_base_url
):
maybe_embed_url = dashboard_element.embed_url(
self.source_config.external_base_url
)
if maybe_embed_url:
proposals.append(
create_embed_mcp(
chart_snapshot.urn,
maybe_embed_url,
)
)

return proposals

def _make_dashboard_metadata_events(
    self, looker_dashboard: LookerDashboard, chart_urns: List[str]
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
    """Build the metadata events describing a single Looker dashboard.

    Args:
        looker_dashboard: The dashboard to describe.
        chart_urns: URNs recorded as ``charts`` on the dashboard info aspect.

    Returns:
        A list starting with the dashboard snapshot MCE, followed by an embed
        MCP when ``extract_embed_urls`` is enabled and ``external_base_url``
        is set.
    """
    config = self.source_config
    snapshot = DashboardSnapshot(
        urn=builder.make_dashboard_urn(
            config.platform_name, looker_dashboard.get_urn_dashboard_id()
        ),
        aspects=[],
    )

    # Aspects are appended in this order: info, browse path, ownership, status.
    snapshot.aspects.append(
        DashboardInfoClass(
            description=looker_dashboard.description or "",
            title=looker_dashboard.title,
            charts=chart_urns,
            lastModified=self._get_change_audit_stamps(looker_dashboard),
            dashboardUrl=looker_dashboard.url(config.external_base_url),
        )
    )

    folder_path = looker_dashboard.folder_path
    if folder_path is not None:
        snapshot.aspects.append(BrowsePathsClass(paths=[f"/looker/{folder_path}"]))

    owners = self.get_ownership(looker_dashboard)
    if owners is not None:
        snapshot.aspects.append(owners)

    snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted))

    events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
        MetadataChangeEvent(proposedSnapshot=snapshot)
    ]

    # If extracting embeds is enabled, produce an MCP for embed URL.
    if config.extract_embed_urls and config.external_base_url:
        events.append(
            create_embed_mcp(
                snapshot.urn,
                looker_dashboard.embed_url(config.external_base_url),
            )
        )

    return events

def _make_explore_metadata_events(
self,
Expand Down Expand Up @@ -725,55 +803,52 @@ def fetch_one_explore(
if looker_explore is not None:
events = (
looker_explore._to_metadata_events(
self.source_config, self.reporter, self.source_config.base_url
self.source_config,
self.reporter,
self.source_config.base_url,
self.source_config.extract_embed_urls,
)
or events
)

return events, f"{model}:{explore}", start_time, datetime.datetime.now()

def _extract_event_urn(
    self, event: Union[MetadataChangeEvent, MetadataChangeProposalWrapper]
) -> Optional[str]:
    """Return the URN of the entity that ``event`` refers to.

    For a ``MetadataChangeEvent`` the URN is read from its proposed snapshot;
    for any other event it is read from ``entityUrn``.
    """
    if not isinstance(event, MetadataChangeEvent):
        return event.entityUrn
    return event.proposedSnapshot.urn

def _make_dashboard_and_chart_mces(
    self, looker_dashboard: LookerDashboard
) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
    """Yield the metadata events for a dashboard and the charts it contains.

    Chart events are yielded first (one group per dashboard element whose
    ``type`` is ``"vis"``), followed by the events for the dashboard itself.
    Chart and dashboard events are built only through
    ``_make_chart_metadata_events`` and ``_make_dashboard_metadata_events`` so
    that each entity is emitted exactly once.
    """
    # Step 1: Emit metadata for each Chart inside the Dashboard.
    chart_events = []
    for element in looker_dashboard.dashboard_elements:
        if element.type == "vis":
            chart_events.extend(
                self._make_chart_metadata_events(element, looker_dashboard)
            )

    yield from chart_events

    # Step 2: Emit metadata events for the Dashboard itself.
    # Collect the unique child chart urns for dashboard input lineage.
    # A chart can contribute several events sharing one urn, so duplicates are
    # dropped; first-seen order is kept because iterating a plain set of
    # strings is not stable across runs, which would reorder the dashboard's
    # chart list between otherwise identical ingestions.
    seen_urns: Set[str] = set()
    chart_urns: List[str] = []
    for chart_event in chart_events:
        chart_event_urn = self._extract_event_urn(chart_event)
        if chart_event_urn and chart_event_urn not in seen_urns:
            seen_urns.add(chart_event_urn)
            chart_urns.append(chart_event_urn)

    yield from self._make_dashboard_metadata_events(looker_dashboard, chart_urns)

def get_ownership(
self, looker_dashboard: LookerDashboard
Expand Down
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/utilities/url_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import re


def remove_port_from_url(base_url: str) -> str:
    """Strip a trailing ``:<port>`` from ``base_url``.

    A colon followed by a run of ASCII digits at the end of the string is
    treated as a port (e.g. ``https://company.looker.com:19999``) and removed.
    Input that does not end that way is returned unchanged.
    """
    port_match = re.match("^(.*):([0-9]+)$", base_url)
    if port_match is None:
        return base_url
    return port_match.group(1)
Loading