Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add Mediatool source and Prefect tasks #1038

Merged
merged 18 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .exchange_rates_to_redshift_spectrum import exchange_rates_api_to_redshift_spectrum
from .genesys_to_adls import genesys_to_adls
from .hubspot_to_adls import hubspot_to_adls
from .mediatool_to_adls import mediatool_to_adls
from .mindful_to_adls import mindful_to_adls
from .outlook_to_adls import outlook_to_adls
from .sap_to_parquet import sap_to_parquet
Expand Down Expand Up @@ -41,6 +42,7 @@
"exchange_rates_to_databricks",
"genesys_to_adls",
"hubspot_to_adls",
"mediatool_to_adls",
"mindful_to_adls",
"outlook_to_adls",
"sap_to_parquet",
Expand Down
67 changes: 67 additions & 0 deletions src/viadot/orchestration/prefect/flows/mediatool_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""'mediatool_to_adls.py'."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, mediatool_to_df


@flow(
name="Mediatool extraction to ADLS",
description="Extract data from Mediatool and load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def mediatool_to_adls(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
organization_ids: list[str] | None = None,
media_entries_columns: list[str] | None = None,
adls_credentials: dict[str, Any] | None = None,
trymzet marked this conversation as resolved.
Show resolved Hide resolved
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
) -> None:
"""Download data from Mediatool to Azure Data Lake.

Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
organization_ids (list[str], optional): List of organization IDs.
Defaults to None.
media_entries_columns (list[str], optional): Columns to get from media entries.
Defaults to None.
adls_credentials (dict[str, Any], optional): The credentials as a dictionary.
Defaults to None.
adls_config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
adls_azure_key_vault_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (str, optional): Azure Data Lake destination file path.
Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to True.
"""
data_frame = mediatool_to_df(
config_key=config_key,
azure_key_vault_secret=azure_key_vault_secret,
organization_ids=organization_ids,
media_entries_columns=media_entries_columns,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials=adls_credentials,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .git import clone_repo
from .hubspot import hubspot_to_df
from .luma import luma_ingest_task
from .mediatool import mediatool_to_df
from .mindful import mindful_to_df
from .minio import df_to_minio
from .outlook import outlook_to_df
Expand Down Expand Up @@ -44,6 +45,7 @@
"genesys_to_df",
"hubspot_to_df",
"luma_ingest_task",
"mediatool_to_df",
"mindful_to_df",
"outlook_to_df",
"s3_upload_file",
Expand Down
160 changes: 160 additions & 0 deletions src/viadot/orchestration/prefect/tasks/mediatool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""'mediatool.py'."""

import pandas as pd
from prefect import get_run_logger, task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Mediatool
from viadot.utils import join_dfs


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def mediatool_to_df(
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
organization_ids: list[str] | None = None,
media_entries_columns: list[str] | None = None,
) -> pd.DataFrame:
"""Task to download data from Mediatool API.

Data from different endpoints are retrieved and combined. A final object is created
containing data for all organizations from the list.

Args:
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
where credentials are stored. Defaults to None.
organization_ids (list[str], optional): List of organization IDs.
Defaults to None.
media_entries_columns (list[str], optional): Columns to get from media entries.
Defaults to None.

Raises:
ValueError: 'organization_ids' argument is None, and one is mandatory.
ValueError: One 'organization_id' is not in organizations.

Returns:
pd.DataFrame: The response data as a Pandas Data Frame.
"""
logger = get_run_logger()

if not (azure_key_vault_secret or config_key):
raise MissingSourceCredentialsError

if not config_key:
credentials = get_credentials(azure_key_vault_secret)

mediatool = Mediatool(
credentials=credentials,
config_key=config_key,
)
# first method ORGANIZATIONS
method = "organizations"
organizations_data = mediatool.api_connection(get_data_from=method)
df_organizations = mediatool.to_df(data=organizations_data, column_suffix=method)
trymzet marked this conversation as resolved.
Show resolved Hide resolved

if organization_ids is None:
message = "No organizations were defined."
raise ValueError(message)

list_of_organizations_df = []
for organization_id in organization_ids:
if organization_id in df_organizations["_id_organizations"].unique():
logger.info(f"Downloading data for: {organization_id} ...")

# extract media entries per organization
media_entries_data = mediatool.api_connection(
get_data_from="media_entries",
organization_id=organization_id,
)
df_media_entries = mediatool.to_df(
data=media_entries_data, drop_columns=media_entries_columns
)
unique_vehicle_ids = df_media_entries["vehicleId"].unique()
unique_media_type_ids = df_media_entries["mediaTypeId"].unique()

# extract vehicles
method = "vehicles"
vehicles_data = mediatool.api_connection(
get_data_from=method,
vehicle_ids=unique_vehicle_ids,
)
df_vehicles = mediatool.to_df(data=vehicles_data, column_suffix=method)

# extract campaigns
method = "campaigns"
campaigns_data = mediatool.api_connection(
get_data_from=method,
organization_id=organization_id,
)
df_campaigns = mediatool.to_df(data=campaigns_data, column_suffix=method)

# extract media types
method = "media_types"
media_types_data = mediatool.api_connection(
get_data_from=method,
media_type_ids=unique_media_type_ids,
)
df_media_types = mediatool.to_df(
data=media_types_data, column_suffix=method
)

# join media entries & organizations
df_merged_entries_orgs = join_dfs(
df_left=df_media_entries,
df_right=df_organizations,
left_on="organizationId",
right_on="_id_organizations",
columns_from_right_df=[
"_id_organizations",
"name_organizations",
"abbreviation_organizations",
],
how="left",
)

# join the previous merge & campaigns
df_merged_campaigns = join_dfs(
df_left=df_merged_entries_orgs,
df_right=df_campaigns,
left_on="campaignId",
right_on="_id_campaigns",
columns_from_right_df=[
"_id_campaigns",
"name_campaigns",
"conventionalName_campaigns",
],
how="left",
)

# join the previous merge & vehicles
df_merged_vehicles = join_dfs(
df_left=df_merged_campaigns,
df_right=df_vehicles,
left_on="vehicleId",
right_on="_id_vehicles",
columns_from_right_df=["_id_vehicles", "name_vehicles"],
how="left",
)

# join the previous merge & media types
df_merged_media_types = join_dfs(
df_left=df_merged_vehicles,
df_right=df_media_types,
left_on="mediaTypeId",
right_on="_id_media_types",
columns_from_right_df=["_id_media_types", "name_media_types"],
how="left",
)

list_of_organizations_df.append(df_merged_media_types)

else:
message = (
f"Organization - {organization_id} not found in organizations list."
)
raise ValueError(message)

return pd.concat(list_of_organizations_df)
trymzet marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .exchange_rates import ExchangeRates
from .genesys import Genesys
from .hubspot import Hubspot
from .mediatool import Mediatool
from .mindful import Mindful
from .outlook import Outlook
from .sftp import Sftp
Expand All @@ -26,6 +27,7 @@
"ExchangeRates",
"Genesys",
"Hubspot",
"Mediatool",
"Mindful",
"Sftp",
"Outlook",
Expand Down
Loading