Enhancement/enable serverless #303

Merged
merged 7 commits on Sep 12, 2022

7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220909-122924.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Add support for Dataproc Serverless
time: 2022-09-09T12:29:24.993388-07:00
custom:
Author: ChenyuLInx
Issue: "248"
PR: "303"
128 changes: 23 additions & 105 deletions dbt/adapters/bigquery/impl.py
@@ -1,20 +1,30 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Set, Union
from typing import Dict, List, Optional, Any, Set, Union, Type
from dbt.dataclass_schema import dbtClassMixin, ValidationError

import dbt.deprecations
import dbt.exceptions
import dbt.clients.agate_helper

from dbt import ui # type: ignore
from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
from dbt.adapters.base.impl import log_code_execution
from dbt.adapters.base import (
BaseAdapter,
available,
RelationType,
SchemaSearchMap,
AdapterConfig,
PythonJobHelper,
)

from dbt.adapters.cache import _make_key

from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.adapters.bigquery.python_submissions import (
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
from dbt.contracts.graph.manifest import Manifest
from dbt.events import AdapterLogger
@@ -835,108 +845,16 @@ def run_sql_for_tests(self, sql, fetch, conn=None):
else:
return list(res)

@available.parse_none
@log_code_execution
def submit_python_job(self, parsed_model: dict, compiled_code: str):
# TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
# of `None` which evaluates to True!

# TODO limit this function to run only when doing the materialization of python nodes
# TODO should we also to timeout here?

# validate all additional stuff for python is set
schema = getattr(parsed_model, "schema", self.config.credentials.schema)
identifier = parsed_model["alias"]
python_required_configs = [
"dataproc_region",
"dataproc_cluster_name",
"gcs_bucket",
]
for required_config in python_required_configs:
if not getattr(self.connections.profile.credentials, required_config):
raise ValueError(
f"Need to supply {required_config} in profile to submit python job"
)
if not hasattr(self, "dataproc_helper"):
self.dataproc_helper = DataProcHelper(self.connections.profile.credentials)
model_file_name = f"{schema}/{identifier}.py"
# upload python file to GCS
self.dataproc_helper.upload_to_gcs(model_file_name, compiled_code)
# submit dataproc job
self.dataproc_helper.submit_dataproc_job(model_file_name)

# TODO proper result for this
message = "OK"
code = None
num_rows = None
bytes_processed = None
return BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
)

def generate_python_submission_response(self, submission_result) -> BigQueryAdapterResponse:
return BigQueryAdapterResponse(_message="OK") # type: ignore[call-arg]

class DataProcHelper:
def __init__(self, credential):
"""_summary_
@property
def default_python_submission_method(self) -> str:
return "serverless"

Args:
credential (_type_): _description_
"""
try:
# Library only needed for python models
from google.cloud import dataproc_v1
from google.cloud import storage
except ImportError:
raise RuntimeError(
"You need to install [dataproc] extras to run python model in dbt-bigquery"
)
self.credential = credential
self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
self.storage_client = storage.Client(
project=self.credential.database, credentials=self.GoogleCredentials
)
self.job_client = dataproc_v1.JobControllerClient(
client_options={
"api_endpoint": "{}-dataproc.googleapis.com:443".format(
self.credential.dataproc_region
)
},
credentials=self.GoogleCredentials,
)

def upload_to_gcs(self, filename: str, compiled_code: str):
bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
blob = bucket.blob(filename)
blob.upload_from_string(compiled_code)

def submit_dataproc_job(self, filename: str):
# Create the job config.
job = {
"placement": {"cluster_name": self.credential.dataproc_cluster_name},
"pyspark_job": {
"main_python_file_uri": "gs://{}/{}".format(self.credential.gcs_bucket, filename)
},
@property
def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
return {
"cluster": ClusterDataprocHelper,
"serverless": ServerlessDataProcHelper,
}
operation = self.job_client.submit_job_as_operation(
request={
"project_id": self.credential.database,
"region": self.credential.dataproc_region,
"job": job,
}
)
response = operation.result()
return response

# TODO: there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
# matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
# output = (
# self.storage_client
# .get_bucket(matches.group(1))
# .blob(f"{matches.group(2)}.000000000")
# .download_as_string()
# )
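
For context, the two new properties above (default_python_submission_method and python_submission_helpers) are meant to be consumed by the generic python-model submission flow in dbt-core. The sketch below is illustrative only and not part of this diff; the submission_method config key and the exact dispatch logic are assumptions about how dbt-core wires these together.

# illustrative sketch of the dbt-core side dispatch (not part of this PR)
def submit_python_job(self, parsed_model: dict, compiled_code: str):
    # fall back to the adapter default ("serverless") when the model config
    # does not set submission_method explicitly
    submission_method = parsed_model["config"].get(
        "submission_method", self.default_python_submission_method
    )
    if submission_method not in self.python_submission_helpers:
        raise NotImplementedError(f"Submission method {submission_method} is not supported")
    helper_cls = self.python_submission_helpers[submission_method]
    # each helper validates its own required credentials/configs in __init__
    helper = helper_cls(parsed_model, self.connections.profile.credentials)
    submission_result = helper.submit(compiled_code)
    return self.generate_python_submission_response(submission_result)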
152 changes: 152 additions & 0 deletions dbt/adapters/bigquery/python_submissions.py
@@ -0,0 +1,152 @@
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from google.api_core import retry
from google.api_core.client_options import ClientOptions

try:
# library only needed for python models
from google.cloud import storage, dataproc_v1 # type: ignore
except ImportError:
_has_dataproc_lib = False
else:
_has_dataproc_lib = True


class BaseDataProcHelper(PythonJobHelper):
def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
"""_summary_

Args:
credential (_type_): _description_
"""
if not _has_dataproc_lib:
raise RuntimeError(
"You need to install [dataproc] extras to run python model in dbt-bigquery"
)
# validate all additional stuff for python is set
schema = parsed_model["schema"]
identifier = parsed_model["alias"]
self.parsed_model = parsed_model
python_required_configs = [
"dataproc_region",
"gcs_bucket",
]
for required_config in python_required_configs:
if not getattr(credential, required_config):
raise ValueError(
f"Need to supply {required_config} in profile to submit python job"
)
self.model_file_name = f"{schema}/{identifier}.py"
self.credential = credential
self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
self.storage_client = storage.Client(
project=self.credential.database, credentials=self.GoogleCredentials
)
self.gcs_location = "gs://{}/{}".format(self.credential.gcs_bucket, self.model_file_name)

# set retry policy, default to timeout after 24 hours
self.timeout = self.parsed_model["config"].get(
"timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
)
self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
self.client_options = ClientOptions(
api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
)
self.job_client = self._get_job_client()

def _upload_to_gcs(self, filename: str, compiled_code: str) -> None:
bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
blob = bucket.blob(filename)
blob.upload_from_string(compiled_code)

def submit(self, compiled_code: str) -> dataproc_v1.types.jobs.Job:
# upload python file to GCS
self._upload_to_gcs(self.model_file_name, compiled_code)
# submit dataproc job
return self._submit_dataproc_job()

def _get_job_client(
self,
) -> Union[dataproc_v1.JobControllerClient, dataproc_v1.BatchControllerClient]:
raise NotImplementedError("_get_job_client not implemented")

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
raise NotImplementedError("_submit_dataproc_job not implemented")


class ClusterDataprocHelper(BaseDataProcHelper):
def _get_job_client(self) -> dataproc_v1.JobControllerClient:
if not self._get_cluster_name():
raise ValueError(
"Need to supply dataproc_cluster_name in profile or config to submit python job with cluster submission method"
)
return dataproc_v1.JobControllerClient( # type: ignore
client_options=self.client_options, credentials=self.GoogleCredentials
)

def _get_cluster_name(self) -> str:
return self.parsed_model["config"].get(
"dataproc_cluster_name", self.credential.dataproc_cluster_name
)

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
job = {
"placement": {"cluster_name": self._get_cluster_name()},
"pyspark_job": {
"main_python_file_uri": self.gcs_location,
},
}
operation = self.job_client.submit_job_as_operation( # type: ignore
request={
"project_id": self.credential.database,
"region": self.credential.dataproc_region,
"job": job,
}
)
response = operation.result(retry=self.retry)
return response


class ServerlessDataProcHelper(BaseDataProcHelper):
def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
return dataproc_v1.BatchControllerClient(
client_options=self.client_options, credentials=self.GoogleCredentials
)

def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
# create the Dataproc Serverless job config
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = self.gcs_location
# how to keep this up to date?
# we should probably also open this up to be configurable
batch.pyspark_batch.jar_file_uris = [
"gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"
]
# should we make all of these spark/dataproc properties configurable?
# https://cloud.google.com/dataproc-serverless/docs/concepts/properties
# https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
batch.runtime_config.properties = {
"spark.executor.instances": "2",
}
parent = f"projects/{self.credential.database}/locations/{self.credential.dataproc_region}"
request = dataproc_v1.CreateBatchRequest(
parent=parent,
batch=batch,
)
# make the request
operation = self.job_client.create_batch(request=request) # type: ignore
# this takes quite a while, waiting on GCP response to resolve
response = operation.result(retry=self.retry)
return response
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
# matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
# output = (
# self.storage_client
# .get_bucket(matches.group(1))
# .blob(f"{matches.group(2)}.000000000")
# .download_as_string()
# )
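
For context, a python model would pick one of these helpers through its config. The example below is an illustrative sketch, not part of this PR: submission_method is assumed to be resolved by dbt-core against python_submission_helpers, while dataproc_cluster_name and timeout are the per-model config keys read by the helpers in this file.

# models/my_python_model.py -- illustrative example only
def model(dbt, session):
    dbt.config(
        materialized="table",
        submission_method="serverless",  # or "cluster" to reuse an existing Dataproc cluster
        # dataproc_cluster_name="my-cluster",  # only needed for the cluster method
        timeout=3600,  # read by BaseDataProcHelper; defaults to 24 hours
    )
    return dbt.ref("upstream_model")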
2 changes: 1 addition & 1 deletion dbt/include/bigquery/macros/materializations/table.sql
@@ -67,6 +67,6 @@ df = model(dbt, spark)
df.write \
.mode("overwrite") \
.format("bigquery") \
.option("writeMethod", "direct") \
.option("writeMethod", "direct").option("writeDisposition", 'WRITE_TRUNCATE') \
.save("{{target_relation}}")
{% endmacro %}
4 changes: 2 additions & 2 deletions dev-requirements.txt
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter
git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@enhancement/python_submission_helper#egg=dbt-tests-adapter&subdirectory=tests/adapter

black==22.8.0
bumpversion
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -44,6 +44,6 @@ def service_account_target():
'keyfile_json': credentials,
# following 3 for python model
'dataproc_region': os.getenv("DATAPROC_REGION"),
'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"),
'dataproc_cluster_name': os.getenv("DATAPROC_CLUSTER_NAME"), # only needed for cluster submission method
'gcs_bucket': os.getenv("GCS_BUCKET")
}
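
Since the cluster name is now only needed for the cluster submission method, a serverless-only target can drop it entirely. A minimal sketch following the same shape as the fixture above (the usual BigQuery connection fields are omitted here):

# illustrative: dataproc-related keys for a serverless-only target
serverless_target = {
    # usual BigQuery connection fields omitted
    'dataproc_region': os.getenv("DATAPROC_REGION"),
    'gcs_bucket': os.getenv("GCS_BUCKET"),
    # no dataproc_cluster_name required for the serverless submission method
}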
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_python_model.py
@@ -3,7 +3,7 @@
from dbt.tests.util import run_dbt, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

@pytest.skip("cluster unstable", allow_module_level=True)

class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
pass
