Merge branch 'master' into feature/chyhir_lytvynov_mssql_extend
DmytroYurchuk authored Aug 18, 2023
2 parents e4b5f47 + 10d3edc commit c428576
Showing 19 changed files with 444 additions and 56 deletions.
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
@@ -448,6 +448,11 @@ enum FilterOperator {
* Represent the relation: String field is one of the array values to, e.g. name in ["Profile", "Event"]
"""
IN

"""
Represents the relation: The field exists. If the field is an array, the field is not empty.
"""
EXISTS
}

"""
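For context on how the new operator is consumed: any search input that accepts a FilterOperator can pass EXISTS as the condition of a filter criterion. A minimal Python sketch of such a call, assuming the standard searchAcrossEntities input shape; the endpoint URL and the description field name are illustrative placeholders, not part of this commit.

```python
# Sketch only: call searchAcrossEntities with the new EXISTS operator.
# Endpoint, auth handling, and the "description" field name are assumptions.
import requests

GRAPHQL_URL = "http://localhost:8080/api/graphql"  # assumed DataHub GMS endpoint

QUERY = """
query filteredSearch($input: SearchAcrossEntitiesInput!) {
  searchAcrossEntities(input: $input) {
    total
    searchResults { entity { urn } }
  }
}
"""

variables = {
    "input": {
        "query": "*",
        "orFilters": [
            {
                "and": [
                    # Keep only results where the field exists (non-empty for arrays).
                    {"field": "description", "condition": "EXISTS", "values": []}
                ]
            }
        ],
    }
}

resp = requests.post(GRAPHQL_URL, json={"query": QUERY, "variables": variables})
print(resp.json())
```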
2 changes: 1 addition & 1 deletion datahub-web-react/src/app/preview/DefaultPreviewCard.tsx
@@ -296,7 +296,7 @@ export default function DefaultPreviewCard({
{deprecation?.deprecated && (
<DeprecationPill deprecation={deprecation} urn="" showUndeprecate={false} />
)}
- {health && health.length > 0 && <EntityHealth baseUrl={url} health={health} />}
+ {health && health.length > 0 ? <EntityHealth baseUrl={url} health={health} /> : null}
{externalUrl && (
<ExternalUrlButton
externalUrl={externalUrl}
@@ -23,9 +23,7 @@ import {
REMOVED_FILTER_NAME,
TAGS_FILTER_NAME,
TYPE_NAMES_FILTER_NAME,
DATA_PRODUCTS_FILTER_NAME,
} from './utils/constants';
import SetDataProductModal from '../entity/shared/containers/profile/sidebar/DataProduct/SetDataProductModal';

type Props = {
facet?: FacetMetadata | null;
@@ -80,23 +78,6 @@ export const AdvancedFilterSelectValueModal = ({
);
}

if (filterField === DATA_PRODUCTS_FILTER_NAME) {
return (
<SetDataProductModal
titleOverride="Select Data Product"
urns={[]}
currentDataProduct={
facet?.aggregations.find((agg) => initialValues?.includes(agg?.entity?.urn || ''))?.entity || null
}
onModalClose={onCloseModal}
onOkOverride={(dataProductUrn) => {
onSelect([dataProductUrn]);
onCloseModal();
}}
/>
);
}

if (filterField === CONTAINER_FILTER_NAME) {
return (
<ContainerSelectModal
3 changes: 0 additions & 3 deletions datahub-web-react/src/app/search/utils/constants.ts
@@ -10,7 +10,6 @@ export const TAGS_FILTER_NAME = 'tags';
export const GLOSSARY_TERMS_FILTER_NAME = 'glossaryTerms';
export const CONTAINER_FILTER_NAME = 'container';
export const DOMAINS_FILTER_NAME = 'domains';
export const DATA_PRODUCTS_FILTER_NAME = 'dataProducts';
export const OWNERS_FILTER_NAME = 'owners';
export const TYPE_NAMES_FILTER_NAME = 'typeNames';
export const PLATFORM_FILTER_NAME = 'platform';
@@ -57,7 +56,6 @@ export const ORDERED_FIELDS = [
TAGS_FILTER_NAME,
GLOSSARY_TERMS_FILTER_NAME,
DOMAINS_FILTER_NAME,
DATA_PRODUCTS_FILTER_NAME,
FIELD_TAGS_FILTER_NAME,
FIELD_GLOSSARY_TERMS_FILTER_NAME,
FIELD_PATHS_FILTER_NAME,
@@ -74,7 +72,6 @@ export const FIELD_TO_LABEL = {
owners: 'Owner',
tags: 'Tag',
domains: 'Domain',
[DATA_PRODUCTS_FILTER_NAME]: 'Data Product',
platform: 'Platform',
fieldTags: 'Column Tag',
glossaryTerms: 'Glossary Term',
@@ -129,6 +129,7 @@
# Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
SNAPSHOT_TABLE_REGEX = re.compile(r"^(.+)@(\d{13})$")
CLUSTERING_COLUMN_TAG = "CLUSTERING_COLUMN"


# We can't use close as it is not called if the ingestion is not successful
@@ -1151,6 +1152,21 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
field.description = col.comment
schema_fields[idx] = field
else:
tags = []
if col.is_partition_column:
tags.append(
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
)

if col.cluster_column_position is not None:
tags.append(
TagAssociationClass(
make_tag_urn(
f"{CLUSTERING_COLUMN_TAG}_{col.cluster_column_position}"
)
)
)

field = SchemaField(
fieldPath=col.name,
type=SchemaFieldDataType(
Expand All @@ -1160,15 +1176,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
nativeDataType=col.data_type,
description=col.comment,
nullable=col.is_nullable,
- globalTags=GlobalTagsClass(
-     tags=[
-         TagAssociationClass(
-             make_tag_urn(Constants.TAG_PARTITION_KEY)
-         )
-     ]
- )
- if col.is_partition_column
- else GlobalTagsClass(tags=[]),
+ globalTags=GlobalTagsClass(tags=tags),
)
schema_fields.append(field)
last_id = col.ordinal_position
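The net effect of the block above: partition columns keep the existing partition-key tag, and clustered columns also get a positional CLUSTERING_COLUMN_<n> tag. A minimal sketch of the URN produced for a column in clustering position 1, assuming make_tag_urn applies the usual urn:li:tag: prefix; the column value is hypothetical.

```python
# Sketch only: the tag URN emitted for a clustering column.
from datahub.emitter.mce_builder import make_tag_urn

CLUSTERING_COLUMN_TAG = "CLUSTERING_COLUMN"
cluster_column_position = 1  # hypothetical value read from INFORMATION_SCHEMA.COLUMNS

if cluster_column_position is not None:
    tag_urn = make_tag_urn(f"{CLUSTERING_COLUMN_TAG}_{cluster_column_position}")
    print(tag_urn)  # expected: urn:li:tag:CLUSTERING_COLUMN_1
```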
@@ -33,6 +33,7 @@ class BigqueryTableType:
class BigqueryColumn(BaseColumn):
field_path: str
is_partition_column: bool
cluster_column_position: Optional[int]


RANGE_PARTITION_NAME: str = "RANGE"
@@ -285,7 +286,8 @@ class BigqueryQuery:
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
description as comment,
c.is_hidden as is_hidden,
- c.is_partitioning_column as is_partitioning_column
+ c.is_partitioning_column as is_partitioning_column,
+ c.clustering_ordinal_position as clustering_ordinal_position,
from
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c
join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name
@@ -307,6 +309,7 @@ class BigqueryQuery:
description as comment,
c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column,
c.clustering_ordinal_position as clustering_ordinal_position,
-- We count the columns to be able limit it later
row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num,
-- Getting the maximum shard for each table
@@ -333,6 +336,7 @@ class BigqueryQuery:
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column,
c.clustering_ordinal_position as clustering_ordinal_position,
description as comment
from
`{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c
@@ -583,6 +587,7 @@ def get_columns_for_dataset(
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
)
)

@@ -621,6 +626,7 @@ def get_columns_for_table(
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
)
)
last_seen_table = column.table_name
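The queries above read clustering_ordinal_position from BigQuery's INFORMATION_SCHEMA.COLUMNS. To preview what the connector will pick up for a dataset, roughly the same metadata can be queried directly; a minimal sketch using the google-cloud-bigquery client, with placeholder project and dataset names and default credentials assumed.

```python
# Sketch only: inspect partitioning/clustering metadata the way the new queries do.
from google.cloud import bigquery

client = bigquery.Client()  # assumes application-default credentials
project, dataset = "my-project", "my_dataset"  # placeholders

sql = f"""
select
  table_name,
  column_name,
  is_partitioning_column,
  clustering_ordinal_position
from `{project}`.`{dataset}`.INFORMATION_SCHEMA.COLUMNS
where clustering_ordinal_position is not null
order by table_name, clustering_ordinal_position
"""

for row in client.query(sql).result():
    print(row.table_name, row.column_name, row.clustering_ordinal_position)
```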
@@ -162,9 +162,11 @@ class DBTCloudConfig(DBTCommonConfig):
}

_DBT_GRAPHQL_QUERY = """
- query DatahubMetadataQuery_{type}($jobId: Int!, $runId: Int) {{
- {type}(jobId: $jobId, runId: $runId) {{
+ query DatahubMetadataQuery_{type}($jobId: BigInt!, $runId: BigInt) {{
+ job(id: $jobId, runId: $runId) {{
+ {type} {{
{fields}
+ }}
}}
}}
"""
@@ -218,7 +220,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
},
)

- raw_nodes.extend(data[node_type])
+ raw_nodes.extend(data["job"][node_type])

nodes = [self._parse_into_dbt_node(node) for node in raw_nodes]

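To make the shape change concrete: the query is now nested under job, so the response is unwrapped one level deeper. A rough sketch of the rendered query and the matching extraction, using a hypothetical models node type and a trimmed field list rather than the connector's real field set.

```python
# Sketch only: how the updated template renders and how nodes are read back.
_DBT_GRAPHQL_QUERY = """
query DatahubMetadataQuery_{type}($jobId: BigInt!, $runId: BigInt) {{
  job(id: $jobId, runId: $runId) {{
    {type} {{
      {fields}
    }}
  }}
}}
"""

query = _DBT_GRAPHQL_QUERY.format(type="models", fields="uniqueId\n      name")
print(query)

# Hypothetical response: node lists now live under "job", hence data["job"][node_type].
data = {"job": {"models": [{"uniqueId": "model.jaffle_shop.orders", "name": "orders"}]}}
raw_nodes = []
raw_nodes.extend(data["job"]["models"])
print(raw_nodes)
```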
@@ -1,5 +1,6 @@
from typing import List, Optional

from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST

@@ -575,14 +576,17 @@ def get_access_history_date_range() -> str:
def usage_per_object_per_time_bucket_for_time_window(
start_time_millis: int,
end_time_millis: int,
- time_bucket_size: str,
+ time_bucket_size: BucketDuration,
use_base_objects: bool,
top_n_queries: int,
include_top_n_queries: bool,
) -> str:
if not include_top_n_queries:
top_n_queries = 0
- assert time_bucket_size == "DAY" or time_bucket_size == "HOUR"
+ assert (
+     time_bucket_size == BucketDuration.DAY
+     or time_bucket_size == BucketDuration.HOUR
+ )
objects_column = (
"BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED"
)
@@ -629,7 +633,7 @@ def usage_per_object_per_time_bucket_for_time_window(
SELECT
object_name,
ANY_VALUE(object_domain) AS object_domain,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
count( distinct(user_name) ) AS total_users
FROM
@@ -644,7 +648,7 @@ def usage_per_object_per_time_bucket_for_time_window(
SELECT
object_name,
column_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries
FROM
field_access_history
@@ -658,7 +662,7 @@ def usage_per_object_per_time_bucket_for_time_window(
(
SELECT
object_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
user_name,
ANY_VALUE(users.email) AS user_email
@@ -677,7 +681,7 @@ def usage_per_object_per_time_bucket_for_time_window(
(
SELECT
object_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
query_history.query_text AS query_text,
count(distinct(access_history.query_id)) AS total_queries
FROM
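For reference, the templates above now interpolate time_bucket_size.value, so the SQL sent to Snowflake is unchanged while the Python signature becomes an enum. A minimal sketch, assuming BucketDuration.DAY and BucketDuration.HOUR carry the literal strings 'DAY' and 'HOUR'.

```python
# Sketch only: how the enum renders into the DATE_TRUNC expressions above.
from datahub.configuration.time_window_config import BucketDuration


def bucket_expr(time_bucket_size: BucketDuration) -> str:
    # Mirrors the query templates: only DAY and HOUR buckets are supported.
    assert time_bucket_size in (BucketDuration.DAY, BucketDuration.HOUR)
    return (
        f"DATE_TRUNC('{time_bucket_size.value}', "
        "CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time"
    )


print(bucket_expr(BucketDuration.DAY))
# expected: DATE_TRUNC('DAY', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time
```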
@@ -356,7 +356,6 @@ def _check_usage_date_ranges(self) -> Any:
def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

if event.query_start_time and event.query_type:
start_time = event.query_start_time
query_type = event.query_type
3 changes: 3 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_profiler.py
@@ -37,6 +37,7 @@ def test_generate_day_partitioned_partition_profiler_query():
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
cluster_column_position=None,
comment=None,
is_nullable=False,
)
@@ -79,6 +80,7 @@ def test_generate_day_partitioned_partition_profiler_query_with_set_partition_ti
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
cluster_column_position=None,
comment=None,
is_nullable=False,
)
@@ -120,6 +122,7 @@ def test_generate_hour_partitioned_partition_profiler_query():
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
cluster_column_position=None,
comment=None,
is_nullable=False,
)
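Because cluster_column_position was added to BigqueryColumn without a default, construction sites (including these tests) now pass it explicitly. A minimal sketch of building the column object; the import path and the name/field_path values are assumptions for illustration.

```python
# Sketch only: constructing a BigqueryColumn with the new keyword argument.
# Import path and name/field_path values are illustrative assumptions.
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryColumn

col = BigqueryColumn(
    name="ts",
    field_path="ts",
    ordinal_position=1,
    data_type="TIMESTAMP",
    is_partition_column=True,
    cluster_column_position=None,  # None => not part of the table's clustering spec
    comment=None,
    is_nullable=False,
)
print(col.is_partition_column, col.cluster_column_position)
```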