
Commit

Merge branch 'master' into jj--ui-add-delete-domains-tags
jjoyce0510 committed Jun 29, 2022
2 parents 6297428 + 9e58cd6 commit ad73a66
Showing 10 changed files with 236 additions and 154 deletions.
51 changes: 0 additions & 51 deletions .github/workflows/metadata-ingestion-slow.yml

This file was deleted.

12 changes: 2 additions & 10 deletions docker/datahub-ingestion/Dockerfile
@@ -1,20 +1,11 @@
# Defining environment
ARG APP_ENV=prod

FROM python:3.8 as base
FROM acryldata/datahub-ingestion-base as base
# ENV DOCKERIZE_VERSION v0.6.1
# RUN apk --no-cache add curl tar \
# && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.20.v20190813/jetty-runner-9.4.20.v20190813.jar --output jetty-runner.jar \
# && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv
RUN apt-get update && apt-get install -y \
jq \
librdkafka-dev \
python3-ldap \
libldap2-dev \
libsasl2-dev \
libsasl2-modules \
ldap-utils \
&& python -m pip install --upgrade pip wheel setuptools==57.5.0


FROM openjdk:8 as prod-build
@@ -29,6 +20,7 @@ RUN cd /datahub-src/metadata-ingestion && \

FROM base as prod-install
COPY --from=prod-codegen /datahub-src/metadata-ingestion /datahub-ingestion
COPY --from=prod-codegen /root/.cache/pip /root/.cache/pip
ARG RELEASE_VERSION
RUN cd /datahub-ingestion && \
sed -i.bak "s/__version__ = \"0.0.0.dev0\"/__version__ = \"$RELEASE_VERSION\"/" src/datahub/__init__.py && \
34 changes: 18 additions & 16 deletions metadata-ingestion/setup.py
@@ -107,6 +107,11 @@ def get_long_description():
"botocore!=1.23.0",
}

path_spec_common = {
"parse>=1.19.0",
"wcmatch",
}

looker_common = {
# Looker Python SDK
"looker-sdk==22.2.1"
@@ -121,6 +126,14 @@ def get_long_description():
"protobuf<=3.20.1",
}

redshift_common = {
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.5",
*path_spec_common,
}

snowflake_common = {
# Snowflake plugin utilizes sql common
*sql_common,
@@ -158,11 +171,7 @@ def get_long_description():
"azure-identity==1.10.0",
}

s3_base = {
*data_lake_base,
"moto[s3]",
"wcmatch",
}
s3_base = {*data_lake_base, "moto[s3]", *path_spec_common}

delta_lake = {
*s3_base,
Expand All @@ -173,6 +182,7 @@ def get_long_description():
"sqlparse",
}


# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
@@ -249,16 +259,8 @@ def get_long_description():
| {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.5"},
"redshift": sql_common
| {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.5"},
"redshift-usage": sql_common
| usage_common
| {
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.5",
},
"redshift": sql_common | redshift_common,
"redshift-usage": sql_common | usage_common | redshift_common,
"sagemaker": aws_common,
"snowflake": snowflake_common,
"snowflake-usage": snowflake_common
@@ -454,7 +456,7 @@ def get_long_description():
entry_points = {
"console_scripts": ["datahub = datahub.entrypoints:main"],
"datahub.ingestion.source.plugins": [
"csv-enricher = datahub.ingestion.source.csv_enricher:CSVEnricherSource",
"csv-enricher = datahub.ingestion.source.csv_enricher:CSVEnricherSource",
"file = datahub.ingestion.source.file:GenericFileSource",
"sqlalchemy = datahub.ingestion.source.sql.sql_generic:SQLAlchemyGenericSource",
"athena = datahub.ingestion.source.sql.athena:AthenaSource",
@@ -11,6 +11,7 @@

from great_expectations import __version__ as ge_version

from datahub.configuration.common import ConfigurationError
from datahub.telemetry import stats, telemetry

# Fun compatibility hack! GE version 0.13.44 broke compatibility with SQLAlchemy 1.3.24.
@@ -872,7 +873,16 @@ def _generate_single_profile(
ge_config["schema"] = temp_table_db

if self.config.bigquery_temp_table_schema:
bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
num_parts = self.config.bigquery_temp_table_schema.split(".")
# If there is only 1 part, the project_id is missing from the table name, so we add it
if len(num_parts) == 1:
bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
elif len(num_parts) == 2:
bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
else:
raise ConfigurationError(
f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}"
)
else:
assert table
table_parts = table.split(".")
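The new branch above can be read as the following standalone sketch (the function name and ValueError are illustrative; the source raises ConfigurationError inside the profiler): bigquery_temp_table_schema may be given as either dataset or project.dataset, and a bare dataset gets qualified with the project id before the GE temp table name is built.

```python
import uuid

def build_bigquery_temp_table(temp_table_db: str, temp_table_schema: str) -> str:
    parts = temp_table_schema.split(".")
    if len(parts) == 1:
        # Only a dataset was given: qualify it with the project id.
        return f"{temp_table_db}.{temp_table_schema}.ge-temp-{uuid.uuid4()}"
    if len(parts) == 2:
        # Already project.dataset: use it as-is.
        return f"{temp_table_schema}.ge-temp-{uuid.uuid4()}"
    raise ValueError(
        f"bigquery_temp_table_schema should be project.dataset or dataset, got: {temp_table_schema}"
    )

print(build_bigquery_temp_table("my-project", "scratch"))
print(build_bigquery_temp_table("my-project", "other-project.scratch"))
```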
@@ -970,12 +980,15 @@ def _get_ge_dataset(
if platform is not None and platform == "bigquery":
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
logger.debug(f"Setting table name to be {pretty_name}")
batch._table = sa.text(pretty_name)
name_parts = pretty_name.split(".")
if len(name_parts) != 3:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)
# If there are only two parts, the project_id is missing from the table name, so we add it
# Temp tables have 3 parts while normal tables have only 2
if len(str(batch._table).split(".")) == 2:
batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}")
logger.debug(f"Setting table name to be {batch._table}")

return batch
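The second hunk amounts to the sketch below (simplified; the real code mutates the GE batch's _table attribute via sa.text and only logs on a bad name): GE names BigQuery tables as DATASET.TABLE, so for multi-project setups the project id from the 3-part pretty name is prepended whenever the batch's table reference has only two parts.

```python
def qualify_with_project(pretty_name: str, batch_table: str) -> str:
    name_parts = pretty_name.split(".")
    if len(name_parts) != 3:
        # The source only logs an error here; raising keeps the sketch short.
        raise ValueError(f"Expected project.dataset.table, got: {pretty_name}")
    if len(batch_table.split(".")) == 2:
        # Project id missing (normal tables) -- prepend it from the pretty name.
        return f"{name_parts[0]}.{batch_table}"
    # Temp tables already carry three parts.
    return batch_table

print(qualify_with_project("my-project.sales.orders", "sales.orders"))
```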
20 changes: 15 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -160,6 +160,11 @@
WHERE table_id LIKE '{table}%'
""".strip()

BQ_GET_LATEST_DATE_TABLE = """
SELECT MAX(table_name) as max_shard
FROM `{project_id}.{schema}.INFORMATION_SCHEMA.TABLES`
where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$')
""".strip()

# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025.
@@ -707,11 +712,16 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
engine = self._get_engine(for_run_sql=True)
if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
with engine.connect() as con:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
if table_name is not None:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
else:
sql = BQ_GET_LATEST_DATE_TABLE.format(
project_id=project_id, schema=schema, date_length=len(shard)
)

result = con.execute(sql)
for row in result:
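For context on the fallback above, a small illustration (the 8-digit date format is an assumption here; the source derives date_length from the shard it actually matched): prefix-sharded tables such as events_20220629 have a table prefix that BQ_GET_LATEST_SHARD can filter on with LIKE, while date-only tables such as 20220629 have no prefix, which is why table_name can be None and the new BQ_GET_LATEST_DATE_TABLE matches purely numeric names instead.

```python
import re

DATE_ONLY_SHARD = re.compile(r"^\d{8}$")  # assumed length; mirrors r'^\d{{{date_length}}}$'

for name in ["events_20220629", "20220629"]:
    if DATE_ONLY_SHARD.match(name):
        print(name, "-> date-only table, fall back to BQ_GET_LATEST_DATE_TABLE")
    else:
        print(name, "-> prefixed shard, use BQ_GET_LATEST_SHARD")
```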
@@ -281,14 +281,16 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
if matches:
table_name = matches.group(1)
if matches:
logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)
if not table_name:
logger.debug(
f"Using dataset id {self.dataset} as table name because table only contains date value {self.table}"
)
table_name = self.dataset

logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)

return BigQueryTableRef(self.project, self.dataset, table_name)

# Handle table snapshots.
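A minimal sketch of that fallback (the shard regex and plain-tuple return are assumptions; the real method works on a BigQueryTableRef and a configurable sharded_table_regex): when the table segment is nothing but a date shard, the dataset id is used as the logical table name.

```python
import re
from typing import Tuple

# Assumed pattern: optional "<table>_" prefix followed by an 8-digit date shard.
SHARDED_TABLE = re.compile(r"^(?:(?P<table>.+)_)?(?P<shard>\d{8})$")

def remove_shard(project: str, dataset: str, table: str) -> Tuple[str, str, str]:
    match = SHARDED_TABLE.match(table)
    if match:
        # Fall back to the dataset id when the name is only a date value.
        table = match.group("table") or dataset
    return project, dataset, table

print(remove_shard("my-project", "events", "clicks_20220629"))  # ('my-project', 'events', 'clicks')
print(remove_shard("my-project", "events", "20220629"))         # ('my-project', 'events', 'events')
```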
