Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to pyiceberg #12

Merged
merged 13 commits into from
Feb 16, 2023
5 changes: 3 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_long_description():

aws_common = {
# AWS Python SDK
"boto3",
"boto3==1.24.59",
# Deal with a version incompatibility between botocore (used by boto3) and urllib3.
# See https://github.com/boto/botocore/pull/2563.
"botocore!=1.23.0",
Expand Down Expand Up @@ -199,7 +199,8 @@ def get_long_description():

iceberg_common = {
# Iceberg Python SDK
"acryl-iceberg-legacy==0.0.4",
"fsspec[abfs]==2023.1.0",
"pyiceberg[adlfs,s3fs]==0.3.0",
"azure-identity==1.10.0",
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Dict, Optional, Union
from typing import Dict, Optional

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient
from pydantic import Field, root_validator

from datahub.configuration import ConfigModel
Expand Down Expand Up @@ -47,29 +45,7 @@ class AdlsSourceConfig(ConfigModel):
)

def get_abfss_url(self, folder_path: str = "") -> str:
if not folder_path.startswith("/"):
folder_path = f"/{folder_path}"
return f"abfss://{self.container_name}@{self.account_name}.dfs.core.windows.net{folder_path}"

def get_filesystem_client(self) -> FileSystemClient:
return self.get_service_client().get_file_system_client(self.container_name)

def get_service_client(self) -> DataLakeServiceClient:
return DataLakeServiceClient(
account_url=f"https://{self.account_name}.dfs.core.windows.net",
credential=self.get_credentials(),
)

def get_credentials(
self,
) -> Union[Optional[str], ClientSecretCredential]:
if self.client_id and self.client_secret and self.tenant_id:
return ClientSecretCredential(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)
return self.sas_token if self.sas_token is not None else self.account_key
return f"abfss://{self.container_name}@{self.account_name}.dfs.core.windows.net{strltrim(folder_path, self.container_name)}"

@root_validator()
def _check_credential_values(cls, values: Dict) -> Dict:
Expand All @@ -86,3 +62,7 @@ def _check_credential_values(cls, values: Dict) -> Dict:
raise ConfigurationError(
"credentials missing, requires one combination of account_key or sas_token or (client_id and client_secret and tenant_id)"
)


def strltrim(to_trim: str, prefix: str) -> str:
    """Return *to_trim* with one leading occurrence of *prefix* removed.

    The prefix is stripped only when it appears at the very start of the
    string, and only once; repeated prefixes are not collapsed.  When
    *to_trim* does not start with *prefix* it is returned unchanged.  An
    empty *prefix* always leaves the string as-is.

    Args:
        to_trim: String to strip the prefix from.
        prefix: Leading text to remove.

    Returns:
        The remainder of *to_trim* after *prefix*, or *to_trim* itself when
        it does not start with *prefix*.
    """
    if to_trim.startswith(prefix):
        # Slice by length rather than str.removeprefix so the helper keeps
        # working on interpreters older than Python 3.9.
        return to_trim[len(prefix):]
    return to_trim
Loading