✨ Add AzureSQL source and Prefect tasks (#1043)
* 🚀 Adding `Aselite` connector with integration tests

* 🚀 Add unit tests for Aselite (Azure SQL)

* 🐛 Fix task utils bug

* 🎨 Refactor task utils file to pass code_checker

* removed prefect dependency from azure_sql source

* 🐛 Fix bug in the return task

* ♻️ Change flow and task name to `azure_sql`

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update src/viadot/orchestration/prefect/tasks/azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* 🐛 Fix task tests bugs

* 🐛 Fix bugs in azure sql unit tests

* 🎨 Changed docstring description for parameter `convert_bytes`

* 🔥 Remove task tests as all of it is covered in unit tests

* 🎨 Improved structure of the `AzureSQL` source class and added docstring

* ✅ Modified tests structure

* 🎨 Removed unused parameters and improved structure of the code

* 🎨 Removed unused parameters and improved structure of the flow code

* 🎨 Improved structure of the tests code

* 🎨 Improved structure of the `__init__` files

* 🎨 Added extra spaces in `chunk_df`

* 🚧 Added `pragma: allowlist secret`

* 🚧 Added `# noqa: S105`

* 🚧 Added `pragma: allowlist secret`

* 🚨 Fix linter and pre-commit errors

* 🐛 Removed `src`

* ✅ Updated tests

* 🐛 Fixed names

* 🐛 Added fixtures

* Update tests/unit/test_azure_sql.py

Co-authored-by: Michał Zawadzki <[email protected]>

* 🎨 Moved operations from task to source and created new `to_df()` method

* 🎨 Changed to `map` and added `super()` in to_df method

* 📝 Add connector documentation

---------

Co-authored-by: adrian-wojcik <[email protected]>
Co-authored-by: fdelgadodyvenia <[email protected]>
Co-authored-by: Michał Zawadzki <[email protected]>
Co-authored-by: rziemianek <[email protected]>
Co-authored-by: Rafał Ziemianek <[email protected]>
6 people authored Oct 4, 2024
1 parent 8492477 commit 4fe8913
Showing 14 changed files with 588 additions and 37 deletions.
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/flows.md
@@ -43,3 +43,5 @@
::: viadot.orchestration.prefect.flows.sql_server_to_minio

::: viadot.orchestration.prefect.flows.sql_server_to_parquet

::: viadot.orchestration.prefect.flows.azure_sql_to_adls
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/tasks.md
@@ -43,3 +43,5 @@
::: viadot.orchestration.prefect.tasks.sql_server_query

::: viadot.orchestration.prefect.tasks.sql_server_to_df

::: viadot.orchestration.prefect.tasks.azure_sql_to_df
2 changes: 2 additions & 0 deletions docs/references/sources/sql.md
@@ -23,3 +23,5 @@
::: viadot.sources.sap_rfc.SAPRFC

::: viadot.sources.sap_rfc.SAPRFCV2

::: viadot.sources.azure_sql.AzureSQL
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
@@ -1,5 +1,6 @@
"""Import flows."""

from .azure_sql_to_adls import azure_sql_to_adls
from .bigquery_to_adls import bigquery_to_adls
from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
@@ -34,6 +35,7 @@


__all__ = [
"azure_sql_to_adls",
"bigquery_to_adls",
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
77 changes: 77 additions & 0 deletions src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
@@ -0,0 +1,77 @@
"""Flows for downloading data from Azure SQL and uploading it to Azure ADLS."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import azure_sql_to_df, df_to_adls


@flow(
name="Azure SQL extraction to ADLS",
description="Extract data from Azure SQL"
+ " and load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
log_prints=True,
)
def azure_sql_to_adls(
query: str | None = None,
credentials_secret: str | None = None,
validate_df_dict: dict[str, Any] | None = None,
convert_bytes: bool = False,
remove_special_characters: bool | None = None,
columns_to_clean: list[str] | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
) -> None:
r"""Download data from Azure SQL and upload it to ADLS.
Args:
query (str, optional): The query to execute against the database.
Defaults to None.
credentials_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with database credentials.
Defaults to None.
validate_df_dict (dict[str, Any], optional): A dictionary with an optional
list of tests to verify the output DataFrame. If defined, triggers the
`validate_df` task from task_utils. Defaults to None.
convert_bytes (bool, optional): Whether to convert bytes columns to int,
as pulling bytes data can lead to malformed values in the DataFrame.
Defaults to False.
remove_special_characters (bool, optional): Whether to remove special
characters, such as escape symbols, from the data. Defaults to None.
columns_to_clean (list[str], optional): The columns to clean; used together
with remove_special_characters. If None, the whole DataFrame is
processed. Defaults to None.
adls_config_key (Optional[str], optional): The key in the viadot config holding
relevant credentials. Defaults to None.
adls_azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (Optional[str], optional): Azure Data Lake destination file path (with
file name). Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to False.
"""
data_frame = azure_sql_to_df(
query=query,
credentials_secret=credentials_secret,
validate_df_dict=validate_df_dict,
convert_bytes=convert_bytes,
remove_special_characters=remove_special_characters,
columns_to_clean=columns_to_clean,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
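The `remove_special_characters` / `columns_to_clean` pair exists because control characters pulled from SQL can corrupt a CSV written to ADLS. As a standalone illustration (not viadot's exact implementation, which lives in the source class), the cleaning boils down to a regex `replace` over literal and escaped tab/newline/carriage-return characters:

```python
import pandas as pd

# Hedged sketch of the cleaning step: strip \t, \n, \r (and their
# backslash-escaped forms) from string cells before serializing to CSV.
df = pd.DataFrame({"name": ["Al\tice", "Bob\n"], "note": ["ok\r", "fine"]})

cleaned = df.replace(
    to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],  # escaped and literal control chars
    value=["", ""],
    regex=True,
)
```

Restricting the cleanup to `columns_to_clean` amounts to applying the same `replace` to `df[col]` for each selected column instead of the whole frame.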
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -1,6 +1,7 @@
"""Imports."""

from .adls import adls_upload, df_to_adls
from .azure_sql import azure_sql_to_df
from .bcp import bcp
from .bigquery import bigquery_to_df
from .cloud_for_customers import cloud_for_customers_to_df
@@ -31,6 +32,7 @@


__all__ = [
"azure_sql_to_df",
"adls_upload",
"bcp",
"clone_repo",
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/tasks/azure_sql.py
@@ -0,0 +1,70 @@
"""Task for downloading data from Azure SQL."""

from typing import Any, Literal

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import AzureSQL
from viadot.utils import validate


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
def azure_sql_to_df(
query: str | None = None,
credentials_secret: str | None = None,
validate_df_dict: dict[str, Any] | None = None,
convert_bytes: bool = False,
remove_special_characters: bool | None = None,
columns_to_clean: list[str] | None = None,
if_empty: Literal["warn", "skip", "fail"] = "warn",
) -> pd.DataFrame:
r"""Task to download data from Azure SQL into a pandas DataFrame.
Args:
query (str, optional): The query to execute against the database.
Defaults to None.
credentials_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with database credentials.
Defaults to None.
validate_df_dict (dict[str, Any], optional): A dictionary with an optional
list of tests to verify the output DataFrame. If defined, triggers the
`validate_df` task from task_utils. Defaults to None.
convert_bytes (bool, optional): Whether to convert bytes columns to int,
as pulling bytes data can lead to malformed values in the DataFrame.
Defaults to False.
remove_special_characters (bool, optional): Whether to remove special
characters, such as escape symbols, from the data. Defaults to None.
columns_to_clean (list[str], optional): The columns to clean; used together
with remove_special_characters. If None, the whole DataFrame is
processed. Defaults to None.
if_empty (Literal["warn", "skip", "fail"], optional): What to do if the
query returns no data. Defaults to "warn".
Raises:
ValueError: If `credentials_secret` is not provided or is empty.
Returns:
pd.DataFrame: The response data as a pandas DataFrame.
"""
if not credentials_secret:
msg = "`credentials_secret` has to be specified and not empty."
raise ValueError(msg)

credentials = get_credentials(credentials_secret)

azure_sql = AzureSQL(credentials=credentials)

df = azure_sql.to_df(
query=query,
if_empty=if_empty,
convert_bytes=convert_bytes,
remove_special_characters=remove_special_characters,
columns_to_clean=columns_to_clean,
)

if validate_df_dict is not None:
validate(df=df, tests=validate_df_dict)

return df
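The `convert_bytes` flag addresses raw `bytes` values coming back from SQL binary columns, which produce malformed cells in the resulting DataFrame. One plausible conversion, shown here purely as an illustration (viadot's actual `df_converts_bytes_to_int` helper may differ), maps each bytes value to its integer representation:

```python
import pandas as pd

# Illustrative sketch only: map raw bytes from a SQL binary column to ints,
# leaving non-bytes values (including missing ones) untouched.
df = pd.DataFrame({"flag": [b"\x00", b"\x01", None]})

df["flag"] = df["flag"].map(
    lambda v: int.from_bytes(v, "big") if isinstance(v, bytes) else v
)
```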
41 changes: 4 additions & 37 deletions src/viadot/orchestration/prefect/tasks/task_utils.py
@@ -19,8 +19,8 @@ def dtypes_to_json_task(dtypes_dict: dict[str, Any], local_json_path: str) -> None:
dtypes_dict (dict): Dictionary containing data types.
local_json_path (str): Path to local json file.
"""
with Path(local_json_path).open("w") as fp:
json.dump(dtypes_dict, fp)
with Path(local_json_path).open("w") as file_path:
json.dump(dtypes_dict, file_path)


@task
@@ -59,7 +59,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict:
"Categorical": "VARCHAR(500)",
"Time": "TIME",
"Boolean": "VARCHAR(5)", # Bool is True/False, Microsoft expects 0/1
"DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is the only timezone-aware dtype in TSQL
"DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is timezone-aware dtype in TSQL
"Object": "VARCHAR(500)",
"EmailAddress": "VARCHAR(50)",
"File": None,
@@ -73,7 +73,7 @@
"String": "VARCHAR(500)",
"IPAddress": "VARCHAR(39)",
"Path": "VARCHAR(255)",
"TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg. '1 days 11:00:00'
"TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg.'1 days 11:00:00'
"URL": "VARCHAR(255)",
"Count": "INT",
}
@@ -209,36 +209,3 @@ def union_dfs_task(dfs: list[pd.DataFrame]) -> pd.DataFrame:
different size of DataFrames NaN values can appear.
"""
return pd.concat(dfs, ignore_index=True)


@task
def df_clean_column(
df: pd.DataFrame, columns_to_clean: list[str] | None = None
) -> pd.DataFrame:
"""Remove special characters from a pandas DataFrame.
Args:
df (pd.DataFrame): The DataFrame to clean.
columns_to_clean (list[str], optional): A list of columns to clean. Defaults to None.
Returns:
pd.DataFrame: The cleaned DataFrame.
"""
df = df.copy()

if columns_to_clean is None:
df.replace(
to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
value=["", ""],
regex=True,
inplace=True,
)
else:
for col in columns_to_clean:
df[col].replace(
to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
value=["", ""],
regex=True,
inplace=True,
)
return df
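The retained `union_dfs_task` above is a thin wrapper over `pd.concat(..., ignore_index=True)`; as its docstring warns, unioning differently shaped DataFrames pads the missing columns with NaN. A minimal sketch of that behavior:

```python
import pandas as pd

# What union_dfs_task does: concatenate frames with a renumbered index;
# columns absent from one frame come back as NaN in the result.
df1 = pd.DataFrame({"a": [1], "b": [2]})
df2 = pd.DataFrame({"a": [3], "c": [4]})

unioned = pd.concat([df1, df2], ignore_index=True)
```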
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
@@ -4,6 +4,7 @@

from ._duckdb import DuckDB
from ._trino import Trino
from .azure_sql import AzureSQL
from .bigquery import BigQuery
from .cloud_for_customers import CloudForCustomers
from .customer_gauge import CustomerGauge
@@ -24,6 +25,7 @@


__all__ = [
"AzureSQL",
"BigQuery",
"CloudForCustomers",
"CustomerGauge",