Commit

Merge branch 'master' into feat-improve-mongodb
TonyOuyangGit authored Oct 30, 2023
2 parents 9f09008 + ce0f36b commit d48b2bb
Showing 16 changed files with 303 additions and 28 deletions.
12 changes: 11 additions & 1 deletion docs/how/updating-datahub.md
@@ -4,10 +4,20 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

## Next

- #9010 - In the Redshift source's config, `incremental_lineage` now defaults to off.
### Breaking Changes

### Potential Downtime

### Deprecations

### Other Notable Changes

## 0.12.0

### Breaking Changes

- #9044 - GraphQL APIs for adding ownership now expect either an `ownershipTypeUrn` referencing a custom ownership type or a (deprecated) `type`. Previously, adding an owner without a concrete type was allowed; this is no longer the case. For simplicity, you can use the `type` parameter, which will be translated to a custom ownership type internally if one exists for the type being added.
- #9010 - In the Redshift source's config, `incremental_lineage` now defaults to off (see the illustrative config sketch below).
- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
- #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted
by Looker and LookML source connectors.
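Below is a hedged, illustrative sketch (not part of this commit) of a Redshift source config, written as a Python dict in recipe form, that opts back in to incremental lineage after this default change; the host_port and database values are placeholders, and only the incremental_lineage flag matters here.

redshift_source_config = {
    "type": "redshift",
    "config": {
        "host_port": "my-cluster.example.com:5439",  # placeholder
        "database": "dev",                           # placeholder
        "incremental_lineage": True,  # re-enable explicitly; the default is now off (#9010)
    },
}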
4 changes: 3 additions & 1 deletion metadata-ingestion/setup.py
@@ -173,7 +173,9 @@

clickhouse_common = {
# Clickhouse 0.2.0 adds support for SQLAlchemy 1.4.x
"clickhouse-sqlalchemy>=0.2.0",
# Disallow 0.2.5 because of https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/272.
# Note that there's also a known issue around nested map types: https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/269.
"clickhouse-sqlalchemy>=0.2.0,<0.2.5",
}

redshift_common = {
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -1,9 +1,10 @@
from typing import Dict, Iterable, List, Optional, TypeVar
from typing import Dict, Iterable, List, Optional, Type, TypeVar

from pydantic.fields import Field
from pydantic.main import BaseModel

from datahub.emitter.mce_builder import (
Aspect,
datahub_guid,
make_container_urn,
make_data_platform_urn,
@@ -18,6 +19,7 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.schema_classes import (
KEY_ASPECTS,
ContainerClass,
DomainsClass,
EmbedClass,
@@ -306,3 +308,12 @@ def create_embed_mcp(urn: str, embed_url: str) -> MetadataChangeProposalWrapper:
entityUrn=urn,
aspect=EmbedClass(renderUrl=embed_url),
)


def entity_supports_aspect(entity_type: str, aspect_type: Type[Aspect]) -> bool:
entity_key_aspect = KEY_ASPECTS[entity_type]
aspect_name = aspect_type.get_aspect_name()

supported_aspects = entity_key_aspect.ASPECT_INFO["entityAspects"]

return aspect_name in supported_aspects
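A minimal usage sketch of the new helper (hedged; the entity type and aspect classes below are illustrative choices, not part of this diff). It looks up the entity's key aspect in KEY_ASPECTS and checks whether the aspect's name appears in that entity's declared aspects:

from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.metadata.schema_classes import StatusClass, SubTypesClass

# In the standard entity registry, datasets declare both of these aspects,
# so both calls should return True; an entity type that does not declare the
# aspect would return False instead.
print(entity_supports_aspect("dataset", StatusClass))
print(entity_supports_aspect("dataset", SubTypesClass))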
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/workunit.py
@@ -22,7 +22,11 @@ class MetadataWorkUnit(WorkUnit):
metadata: Union[
MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
]
# A workunit creator can determine if this workunit is allowed to fail

# A workunit creator can determine if this workunit is allowed to fail.
# TODO: This flag was initially added during the rollout of the subType aspect
# to improve backwards compatibility, but is not really needed anymore and so
# should be removed.
treat_errors_as_warnings: bool = False

# When this is set to false, this MWU will be ignored by automatic helpers
@@ -123,8 +123,12 @@ def get_user(self, id_: str, user_fields: str) -> Optional[User]:
transport_options=self.transport_options,
)
except SDKError as e:
logger.warning(f"Could not find user with id {id_}")
logger.warning(f"Failure was {e}")
if "Looker Not Found (404)" in str(e):
# User not found
logger.info(f"Could not find user with id {id_}: 404 error")
else:
logger.warning(f"Could not find user with id {id_}")
logger.warning(f"Failure was {e}")
# User not found
return None

@@ -926,14 +926,7 @@ def process_metrics_dimensions_and_fields_for_dashboard(
mcps = chart_mcps
mcps.append(dashboard_mcp)

workunits = [
MetadataWorkUnit(
id=f"looker-{mcp.aspectName}-{mcp.entityUrn}",
mcp=mcp,
treat_errors_as_warnings=True,
)
for mcp in mcps
]
workunits = [mcp.as_workunit() for mcp in mcps]

return workunits

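For context, MetadataChangeProposalWrapper.as_workunit() derives the workunit id from the wrapped proposal (entity urn plus aspect name), so the explicit looker-{aspectName}-{entityUrn} ids removed above are no longer needed. A hedged sketch, with an illustrative urn and aspect:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:looker,example.explore,PROD)",
    aspect=StatusClass(removed=False),
)
wu = mcp.as_workunit()  # id is derived from the entity urn and the aspect name
print(wu.id)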
@@ -1320,10 +1313,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
id=f"looker-{event.proposedSnapshot.urn}", mce=event
)
elif isinstance(event, MetadataChangeProposalWrapper):
# We want to treat subtype aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
yield event.as_workunit(
treat_errors_as_warnings=event.aspectName in ["subTypes"]
)
yield event.as_workunit()
else:
raise Exception(f"Unexpected type of event {event}")
self.reporter.report_stage_end("explore_metadata")
@@ -2171,10 +2171,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
for mcp in self._build_dataset_mcps(
maybe_looker_view
):
# We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
yield mcp.as_workunit(
treat_errors_as_warnings=True
)
yield mcp.as_workunit()
else:
(
prev_model_name,
57 changes: 52 additions & 5 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -12,8 +12,8 @@
import sqlglot.errors
import sqlglot.lineage
import sqlglot.optimizer.annotate_types
import sqlglot.optimizer.optimizer
import sqlglot.optimizer.qualify
import sqlglot.optimizer.qualify_columns
from pydantic import BaseModel
from typing_extensions import TypedDict

@@ -48,6 +48,19 @@
SQL_PARSE_RESULT_CACHE_SIZE = 1000


RULES_BEFORE_TYPE_ANNOTATION: tuple = tuple(
filter(
# Skip pushdown_predicates because it sometimes throws exceptions, and we
# don't actually need it for anything.
lambda func: func.__name__ not in {"pushdown_predicates"},
itertools.takewhile(
lambda func: func != sqlglot.optimizer.annotate_types.annotate_types,
sqlglot.optimizer.optimizer.RULES,
),
)
)
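A simplified, self-contained illustration of the rule filtering above, using dummy functions in place of sqlglot's real optimizer rules (the names below are made up for the sketch): takewhile keeps every rule that runs before type annotation, and the filter then drops pushdown_predicates.

import itertools

def qualify(expression): return expression
def pushdown_predicates(expression): return expression
def annotate_types(expression): return expression
def canonicalize(expression): return expression

RULES = (qualify, pushdown_predicates, annotate_types, canonicalize)

rules_before_type_annotation = tuple(
    filter(
        lambda func: func.__name__ not in {"pushdown_predicates"},
        itertools.takewhile(lambda func: func is not annotate_types, RULES),
    )
)
print([f.__name__ for f in rules_before_type_annotation])  # ['qualify']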


class GraphQLSchemaField(TypedDict):
fieldPath: str
nativeDataType: str
@@ -289,6 +302,10 @@ def _table_level_lineage(
)
# TODO: If a CTAS has "LIMIT 0", it's not really lineage, just copying the schema.

# Update statements implicitly read from the table being updated, so add those back in.
if isinstance(statement, sqlglot.exp.Update):
tables = tables | modified

return tables, modified


@@ -568,17 +585,20 @@ def _schema_aware_fuzzy_column_resolve(
# - the select instead of the full outer statement
# - schema info
# - column qualification enabled
# - running the full pre-type annotation optimizer

# logger.debug("Schema: %s", sqlglot_db_schema.mapping)
statement = sqlglot.optimizer.qualify.qualify(
statement = sqlglot.optimizer.optimizer.optimize(
statement,
dialect=dialect,
schema=sqlglot_db_schema,
qualify_columns=True,
validate_qualify_columns=False,
identify=True,
# sqlglot calls the db -> schema -> table hierarchy "catalog", "db", "table".
catalog=default_db,
db=default_schema,
rules=RULES_BEFORE_TYPE_ANNOTATION,
)
except (sqlglot.errors.OptimizeError, ValueError) as e:
raise SqlUnderstandingError(
@@ -748,6 +768,7 @@ def _extract_select_from_create(
_UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT: Set[str] = set(
sqlglot.exp.Update.arg_types.keys()
) - set(sqlglot.exp.Select.arg_types.keys())
_UPDATE_FROM_TABLE_ARGS_TO_MOVE = {"joins", "laterals", "pivot"}


def _extract_select_from_update(
@@ -774,17 +795,43 @@ def _extract_select_from_update(
# they'll get caught later.
new_expressions.append(expr)

return sqlglot.exp.Select(
# Special translation for the `from` clause.
extra_args = {}
original_from = statement.args.get("from")
if original_from and isinstance(original_from.this, sqlglot.exp.Table):
# Move joins, laterals, and pivots from the Update->From->Table->field
# to the top-level Select->field.

for k in _UPDATE_FROM_TABLE_ARGS_TO_MOVE:
if k in original_from.this.args:
# Mutate the from table clause in-place.
extra_args[k] = original_from.this.args.pop(k)

select_statement = sqlglot.exp.Select(
**{
**{
k: v
for k, v in statement.args.items()
if k not in _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT
},
**extra_args,
"expressions": new_expressions,
}
)

# Update statements always implicitly have the updated table in context.
# TODO: Retain table name alias.
if select_statement.args.get("from"):
# select_statement = sqlglot.parse_one(select_statement.sql(dialect=dialect))

select_statement = select_statement.join(
statement.this, append=True, join_kind="cross"
)
else:
select_statement = select_statement.from_(statement.this)

return select_statement


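A hedged, self-contained sketch of the FROM-clause handling in _extract_select_from_update above (the SQL string and table names are illustrative, and this is not the DataHub helper itself): an UPDATE ... FROM ... statement is rewritten as a SELECT that keeps the original FROM clause and cross-joins the updated table, so columns from both tables remain resolvable for column lineage.

import sqlglot
import sqlglot.exp

update = sqlglot.parse_one(
    "UPDATE orders SET status = s.status FROM shipments AS s WHERE orders.id = s.order_id"
)
assert isinstance(update, sqlglot.exp.Update)

# Select the right-hand side of each SET assignment, keep the FROM clause,
# and cross-join the updated table (mirroring the calls in the diff above).
select = sqlglot.exp.Select(
    expressions=[assignment.expression for assignment in update.expressions]
)
select = select.from_(update.args["from"].this)
select = select.join(update.this, append=True, join_kind="cross")
print(select.sql())  # expected to resemble: SELECT s.status FROM shipments AS s CROSS JOIN orders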
def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
return isinstance(statement, sqlglot.exp.Create) and isinstance(
@@ -955,7 +1002,7 @@ def _sqlglot_lineage_inner(
# Fetch schema info for the relevant tables.
table_name_urn_mapping: Dict[_TableName, str] = {}
table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {}
for table in itertools.chain(tables, modified):
for table in tables | modified:
# For select statements, qualification will be a no-op. For other statements, this
# is where the qualification actually happens.
qualified_table = table.qualified(
@@ -971,7 +1018,7 @@
# Also include the original, non-qualified table name in the urn mapping.
table_name_urn_mapping[table] = urn

total_tables_discovered = len(tables) + len(modified)
total_tables_discovered = len(tables | modified)
total_schemas_resolved = len(table_name_schema_mapping)
debug_info = SqlParsingDebugInfo(
confidence=0.9 if total_tables_discovered == total_schemas_resolved
@@ -0,0 +1,64 @@
{
"query_type": "SELECT",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table2,PROD)"
],
"out_tables": [],
"column_lineage": [
{
"downstream": {
"table": null,
"column": "a",
"column_type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"native_column_type": "INT"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)",
"column": "a"
}
]
},
{
"downstream": {
"table": null,
"column": "b",
"column_type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"native_column_type": "INT"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)",
"column": "b"
}
]
},
{
"downstream": {
"table": null,
"column": "c",
"column_type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {}
}
},
"native_column_type": "INT[]"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table2,PROD)",
"column": "c"
}
]
}
]
}
@@ -1,6 +1,7 @@
{
"query_type": "UPDATE",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.table2,PROD)"
],
@@ -1,6 +1,8 @@
{
"query_type": "UPDATE",
"in_tables": [],
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)"
],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)"
],
@@ -0,0 +1,29 @@
{
"query_type": "UPDATE",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)"
],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)"
],
"column_lineage": [
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)",
"column": "orderkey",
"column_type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"native_column_type": "DECIMAL"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)",
"column": "orderkey"
}
]
}
]
}