Skip to content

Commit

Permalink
✨ Add VidClub source and Prefect tasks (#1044)
Browse files Browse the repository at this point in the history
* 🚀 Add Vid Club connector with tests

* removed prefect dependency from source

* super init passing credentials

* 🎨 Changed `source` to `endpoint`

* 🎨 Change imports structure

* 🐛 Fix import bug

* ♻️ Refactor credential passing method with returning data frame on task level

* 🎨 Improved code structure for VidClub source

* 🐛 Modified building `url`

* ✅  Improved tests code structure

* 🎨 Modified code structure

* 🎨 Modified code structure

* 🎨 Modified code structure

* 🎨 Moved description to the new line

* 🎨 Added `# pragma: allowlist secret`

* 🎨 Modified loggers and added `if_empty` param

* 🔥 Removed logging

---------

Co-authored-by: adrian-wojcik <[email protected]>
Co-authored-by: fdelgadodyvenia <[email protected]>
Co-authored-by: rziemianek <[email protected]>
Co-authored-by: Rafał Ziemianek <[email protected]>
  • Loading branch information
5 people authored Oct 4, 2024
1 parent b49eaa2 commit 8492477
Show file tree
Hide file tree
Showing 9 changed files with 716 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .supermetrics_to_adls import supermetrics_to_adls
from .transform import transform
from .transform_and_catalog import transform_and_catalog
from .vid_club_to_adls import vid_club_to_adls


__all__ = [
Expand Down Expand Up @@ -63,4 +64,5 @@
"supermetrics_to_adls",
"transform",
"transform_and_catalog",
"vid_club_to_adls",
]
98 changes: 98 additions & 0 deletions src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Download data from Vid CLub API and load it into Azure Data Lake Storage."""

from typing import Any, Literal

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df


@flow(
    name="Vid CLub extraction to ADLS",
    description="Extract data from Vid CLub and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def vid_club_to_adls(  # noqa: PLR0913
    *args: list[Any],
    endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
    from_date: str = "2022-03-22",
    to_date: str | None = None,
    items_per_page: int = 100,
    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
    days_interval: int = 30,
    cols_to_drop: list[str] | None = None,
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
    validate_df_dict: dict | None = None,
    timeout: int = 3600,
    **kwargs: dict[str, Any],
) -> None:
    """Download data from the Vid Club API and upload it to Azure Data Lake.

    Runs the `vid_club_to_df` task to extract the data into a pandas DataFrame,
    then hands the result to `df_to_adls` for the upload.

    Args:
        endpoint (Literal["jobs", "product", "company", "survey"], optional): The
            endpoint source to be accessed. Defaults to None.
        from_date (str, optional): Start date for the query, by default is the oldest
            date in the data 2022-03-22.
        to_date (str, optional): End date for the query. By default None,
            which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
        items_per_page (int, optional): Number of entries per page. Defaults to 100.
        region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
            filter for the query. Defaults to None (parameter is not used in url).
            [December 2023 status: value 'all' does not work for company and jobs]
        days_interval (int, optional): Days specified in date range per API call
            (test showed that 30-40 is optimal for performance). Defaults to 30.
        cols_to_drop (list[str], optional): List of columns to drop. Defaults to None.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret where credentials are stored. Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        adls_azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
            Defaults to None.
        adls_path (str, optional): Azure Data Lake destination file path.
            Defaults to None.
        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
            Defaults to False.
        validate_df_dict (dict, optional): A dictionary with optional list of tests
            to verify the output dataframe. If defined, triggers the `validate_df`
            task from task_utils. Defaults to None.
        timeout (int, optional): The time (in seconds) to wait while running this task
            before a timeout occurs. Defaults to 3600.
    """
    data_frame = vid_club_to_df(
        # BUG FIX: was `args=args`, which passed the positional tuple as a single
        # keyword argument instead of forwarding the positionals.
        *args,
        endpoint=endpoint,
        from_date=from_date,
        to_date=to_date,
        items_per_page=items_per_page,
        region=region,
        days_interval=days_interval,
        cols_to_drop=cols_to_drop,
        # NOTE(review): the task has no `config_key` parameter, so this lands in
        # its **kwargs — confirm whether the task should accept/forward it.
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
        validate_df_dict=validate_df_dict,
        timeout=timeout,
        # BUG FIX: was `kawrgs=kwargs` (typo) — the caller's keyword arguments
        # were wrapped under a bogus `kawrgs` key instead of being expanded.
        **kwargs,
    )

    return df_to_adls(
        df=data_frame,
        path=adls_path,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
from .vid_club import vid_club_to_df


__all__ = [
Expand Down Expand Up @@ -62,5 +63,6 @@
"sharepoint_to_df",
"sql_server_query",
"sql_server_to_df",
"vid_club_to_df",
"supermetrics_to_df",
]
79 changes: 79 additions & 0 deletions src/viadot/orchestration/prefect/tasks/vid_club.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Task for downloading data from Vid Club Cloud API."""

from typing import Any, Literal

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import VidClub


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60)
def vid_club_to_df(  # noqa: PLR0913
    *args: list[Any],
    endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
    from_date: str = "2022-03-22",
    to_date: str | None = None,
    items_per_page: int = 100,
    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
    days_interval: int = 30,
    cols_to_drop: list[str] | None = None,
    azure_key_vault_secret: str | None = None,
    adls_config_key: str | None = None,
    validate_df_dict: dict | None = None,
    timeout: int = 3600,
    **kwargs: dict[str, Any],
) -> pd.DataFrame:
    """Download data from the Vid Club API into a pandas DataFrame.

    Args:
        endpoint (Literal["jobs", "product", "company", "survey"], optional):
            The endpoint source to be accessed. Defaults to None.
        from_date (str, optional): Start date for the query, by default is the oldest
            date in the data 2022-03-22.
        to_date (str, optional): End date for the query. By default None,
            which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
        items_per_page (int, optional): Number of entries per page. Defaults to 100.
        region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
            filter for the query. Defaults to None (parameter is not used in url).
            [December 2023 status: value 'all' does not work for company and jobs]
        days_interval (int, optional): Days specified in date range per API call
            (test showed that 30-40 is optimal for performance). Defaults to 30.
        cols_to_drop (list[str], optional): List of columns to drop. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret where credentials are stored. Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        validate_df_dict (dict, optional): A dictionary with optional list of tests
            to verify the output dataframe. If defined, triggers the `validate_df`
            task from task_utils. Defaults to None.
        timeout (int, optional): The time (in seconds) to wait while running this task
            before a timeout occurs. Defaults to 3600.

    Raises:
        MissingSourceCredentialsError: When neither `azure_key_vault_secret` nor
            `adls_config_key` is provided.

    Returns:
        pd.DataFrame: The downloaded data.
    """
    if not (azure_key_vault_secret or adls_config_key):
        raise MissingSourceCredentialsError

    # BUG FIX: `credentials` was only bound inside `if not adls_config_key:`, so
    # calling the task with only `adls_config_key` raised UnboundLocalError below.
    credentials = get_credentials(azure_key_vault_secret) if not adls_config_key else None

    vc_obj = VidClub(
        # BUG FIX: was `args=args` — the positional tuple was passed as a single
        # keyword argument instead of being forwarded positionally.
        *args,
        endpoint=endpoint,
        from_date=from_date,
        to_date=to_date,
        items_per_page=items_per_page,
        region=region,
        days_interval=days_interval,
        cols_to_drop=cols_to_drop,
        # NOTE(review): when only `adls_config_key` is set this is None and the
        # config key is not forwarded — confirm whether VidClub should receive
        # `config_key=adls_config_key` to load credentials from the viadot config.
        vid_club_credentials=credentials,
        validate_df_dict=validate_df_dict,
        timeout=timeout,
        # BUG FIX: was `kwargs=kwargs`, which nested the whole dict under one key
        # instead of expanding the caller's keyword arguments.
        **kwargs,
    )

    return vc_obj.to_df()
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .sql_server import SQLServer
from .supermetrics import Supermetrics, SupermetricsCredentials
from .uk_carbon_intensity import UKCarbonIntensity
from .vid_club import VidClub


__all__ = [
Expand All @@ -41,6 +42,7 @@
"SupermetricsCredentials", # pragma: allowlist-secret
"Trino",
"UKCarbonIntensity",
"VidClub",
]
if find_spec("adlfs"):
from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401
Expand Down
Loading

0 comments on commit 8492477

Please sign in to comment.