Implement JobService API calls & connect it to SDK #1129

Merged 13 commits on Nov 4, 2020
@@ -57,6 +57,11 @@ spec:
{{- end }}

env:
- name: FEAST_CORE_URL
value: "{{ .Release.Name }}-feast-core:6565"
- name: FEAST_HISTORICAL_SERVING_URL
value: "{{ .Release.Name }}-feast-batch-serving:6566"

{{- if .Values.gcpServiceAccount.enabled }}
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }}
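These two variables tell the job service where to reach Feast Core and the batch serving store inside the cluster. As a hedged sketch, Feast's Config layer is expected to map FEAST_* environment variables onto SDK options, so the container needs no extra flags (the release name "feast-release" below is a placeholder):

import os
from feast import Client

# Placeholders matching what the chart would render for a release named
# "feast-release"; Feast's Config resolves FEAST_* environment variables.
os.environ["FEAST_CORE_URL"] = "feast-release-feast-core:6565"
os.environ["FEAST_HISTORICAL_SERVING_URL"] = "feast-release-feast-batch-serving:6566"

client = Client()  # core_url is picked up from FEAST_CORE_URL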
4 changes: 4 additions & 0 deletions infra/charts/feast/values.yaml
@@ -13,6 +13,10 @@ feast-jupyter:
# feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK
enabled: true

feast-jobservice:
# feast-jobservice.enabled -- Flag to install Feast Job Service
enabled: true

postgresql:
# postgresql.enabled -- Flag to install Postgresql
enabled: true
2 changes: 1 addition & 1 deletion infra/docker/jobservice/Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.7-slim-buster
FROM jupyter/pyspark-notebook:ae5f7e104dd5

USER root
WORKDIR /feast
2 changes: 0 additions & 2 deletions infra/docker/jupyter/Dockerfile
@@ -29,7 +29,5 @@ COPY examples .
ENV FEAST_SPARK_LAUNCHER standalone
ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
ENV FEAST_SPARK_HOME $SPARK_HOME
ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"

CMD ["start-notebook.sh", "--NotebookApp.token=''"]
2 changes: 1 addition & 1 deletion protos/feast/core/JobService.proto
@@ -110,7 +110,7 @@ message StartOfflineToOnlineIngestionJobResponse {
}

message GetHistoricalFeaturesRequest {
// List of features that are being retrieved
// List of feature references that are being retrieved
repeated string feature_refs = 1;

// Batch DataSource that can be used to obtain entity values for historical retrieval.
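For illustration, a populated request might look like the sketch below (the feature reference strings are hypothetical; Feast references take the "feature_table:feature" form):

from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest

# Hypothetical feature references in "feature_table:feature" form.
request = GetHistoricalFeaturesRequest(
    feature_refs=[
        "driver_statistics:avg_daily_trips",
        "driver_statistics:conv_rate",
    ],
)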
71 changes: 63 additions & 8 deletions sdk/python/feast/client.py
@@ -65,7 +65,13 @@
ListProjectsResponse,
)
from feast.core.CoreService_pb2_grpc import CoreServiceStub
from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest
from feast.core.JobService_pb2 import (
GetHistoricalFeaturesRequest,
GetJobRequest,
ListJobsRequest,
StartOfflineToOnlineIngestionJobRequest,
StartStreamToOnlineIngestionJobRequest,
)
from feast.core.JobService_pb2_grpc import JobServiceStub
from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource
@@ -94,7 +100,12 @@
start_offline_to_online_ingestion,
start_stream_to_online_ingestion,
)
from feast.remote_job import RemoteRetrievalJob
from feast.remote_job import (
RemoteBatchIngestionJob,
RemoteRetrievalJob,
RemoteStreamIngestionJob,
get_remote_job_from_proto,
)
from feast.serving.ServingService_pb2 import (
GetFeastServingInfoRequest,
GetOnlineFeaturesRequestV2,
@@ -201,6 +212,10 @@ def _job_service(self):

Returns: JobServiceStub
"""
# Don't try to initialize job service stub if the job service is disabled
if not self._use_job_service:
return None

if not self._job_service_stub:
channel = create_grpc_channel(
url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY),
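When the job service is disabled, the stub is never created and every client call falls back to the local launcher. A minimal sketch of switching the SDK onto the remote path, assuming "job_service_url" is the option string behind CONFIG_JOB_SERVICE_URL_KEY (all endpoints below are placeholders):

from feast import Client

# "job_service_url" is assumed to be the option behind
# CONFIG_JOB_SERVICE_URL_KEY; hosts and ports are placeholders.
client = Client(
    core_url="localhost:6565",
    serving_url="localhost:6566",
    job_service_url="localhost:6568",
)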
@@ -891,8 +906,8 @@ def get_historical_features(
self,
feature_refs: List[str],
entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
project: str = None,
output_location: str = None,
project: Optional[str] = None,
output_location: Optional[str] = None,
) -> RetrievalJob:
"""
Launch a historical feature retrieval job.
@@ -915,6 +930,7 @@ def get_historical_features(
retrieval job.
project: Specifies the project that contains the feature tables
which the requested features belong to.
output_location: Specifies the path in a bucket where the exported feature data files will be written

Returns:
Returns a retrieval job object that can be used to monitor retrieval
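A usage sketch for this retrieval path (entity columns, the feature reference, and the output bucket are hypothetical; without a configured job service the same call runs through the local launcher):

import pandas as pd
from datetime import datetime

# Hypothetical entity rows keyed by "driver_id".
entities = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime(2020, 10, 1), datetime(2020, 10, 2)],
    }
)

job = client.get_historical_features(
    feature_refs=["driver_statistics:avg_daily_trips"],  # placeholder ref
    entity_source=entities,
    output_location="gs://my-bucket/feast-exports",  # placeholder bucket
)
output_uri = job.get_output_file_uri()  # blocks until the job finishes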
@@ -1062,18 +1078,57 @@ def start_offline_to_online_ingestion(
:param end: upper datetime boundary
:return: Spark Job Proxy object
"""
return start_offline_to_online_ingestion(feature_table, start, end, self)
if not self._use_job_service:
return start_offline_to_online_ingestion(feature_table, start, end, self)
else:
request = StartOfflineToOnlineIngestionJobRequest(
project=self.project, table_name=feature_table.name,
)
request.start_date.FromDatetime(start)
request.end_date.FromDatetime(end)
response = self._job_service.StartOfflineToOnlineIngestionJob(request)
return RemoteBatchIngestionJob(
self._job_service, self._extra_grpc_params, response.id,
)
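Both branches return a SparkJob handle, so callers do not need to know where the job actually runs. A hedged usage sketch (the table name and date range are placeholders):

from datetime import datetime

table = client.get_feature_table("driver_statistics")  # placeholder table
job = client.start_offline_to_online_ingestion(
    table, datetime(2020, 10, 1), datetime(2020, 11, 1),
)
print(job.get_id(), job.get_status())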

def start_stream_to_online_ingestion(
self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
) -> SparkJob:
return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
if not self._use_job_service:
return start_stream_to_online_ingestion(
feature_table, extra_jars or [], self
)
else:
request = StartStreamToOnlineIngestionJobRequest(
project=self.project, table_name=feature_table.name,
)
response = self._job_service.StartStreamToOnlineIngestionJob(request)
return RemoteStreamIngestionJob(
self._job_service, self._extra_grpc_params, response.id,
)

def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
return list_jobs(include_terminated, self)
if not self._use_job_service:
return list_jobs(include_terminated, self)
else:
request = ListJobsRequest(include_terminated=include_terminated)
response = self._job_service.ListJobs(request)
return [
get_remote_job_from_proto(
self._job_service, self._extra_grpc_params, job
)
for job in response.jobs
]

def get_job_by_id(self, job_id: str) -> SparkJob:
return get_job_by_id(job_id, self)
if not self._use_job_service:
return get_job_by_id(job_id, self)
else:
request = GetJobRequest(job_id=job_id)
response = self._job_service.GetJob(request)
return get_remote_job_from_proto(
self._job_service, self._extra_grpc_params, response.job
)
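The listing and lookup calls follow the same split: with a job service configured, job metadata arrives as protos and is wrapped back into remote job handles by get_remote_job_from_proto. A sketch of polling a job (the id is a placeholder taken from list_jobs output):

from feast.pyspark.abc import SparkJobStatus

for job in client.list_jobs(include_terminated=False):
    print(job.get_id(), job.get_status())

job = client.get_job_by_id("placeholder-job-id")
if job.get_status() == SparkJobStatus.IN_PROGRESS:
    print("still running")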

def stage_dataframe(
self, df: pd.DataFrame, event_timestamp_column: str,
14 changes: 6 additions & 8 deletions sdk/python/feast/constants.py
@@ -93,8 +93,6 @@ class AuthProvider(Enum):
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"

CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options"

# Configuration option default values
FEAST_DEFAULT_OPTIONS = {
# Default Feast project to use
@@ -115,7 +113,11 @@ class AuthProvider(Enum):
CONFIG_SERVING_ENABLE_SSL_KEY: "False",
# Path to certificate(s) to secure connection to Feast Serving
CONFIG_SERVING_SERVER_SSL_CERT_KEY: "",
# Default connection timeout to Feast Serving and Feast Core (in seconds)
# Enable or disable TLS/SSL to Feast Job Service
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
# Path to certificate(s) to secure connection to Feast Job Service
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
# Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds)
CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10",
# Default gRPC connection timeout when sending an ApplyFeatureSet command to
# Feast Core (in seconds)
@@ -128,13 +130,9 @@ class AuthProvider(Enum):
CONFIG_AUTH_PROVIDER: "google",
CONFIG_SPARK_LAUNCHER: "dataproc",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar",
CONFIG_SPARK_STANDALONE_MASTER: "local[*]",
CONFIG_REDIS_HOST: "localhost",
CONFIG_REDIS_PORT: "6379",
CONFIG_REDIS_SSL: "False",
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
CONFIG_SPARK_EXTRA_OPTIONS: "",
# Enable or disable TLS/SSL to Feast Service
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
# Path to certificate(s) to secure connection to Feast Job Service
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
}
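With these defaults, connections to the job service are plaintext unless SSL is switched on. A hedged sketch of overriding them, assuming the option strings behind the *_KEY constants are "job_service_enable_ssl" and "job_service_server_ssl_cert" (the endpoint and certificate path are placeholders):

from feast import Client

client = Client(
    job_service_url="feast-jobservice.example.com:6568",  # placeholder
    job_service_enable_ssl=True,  # assumed option name
    job_service_server_ssl_cert="/etc/ssl/certs/feast-jobservice.pem",
)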
35 changes: 27 additions & 8 deletions sdk/python/feast/job_service.py
@@ -12,7 +12,13 @@
GetJobResponse,
)
from feast.core.JobService_pb2 import Job as JobProto
from feast.core.JobService_pb2 import JobStatus, JobType, ListJobsResponse
from feast.core.JobService_pb2 import (
JobStatus,
JobType,
ListJobsResponse,
StartOfflineToOnlineIngestionJobResponse,
StartStreamToOnlineIngestionJobResponse,
)
from feast.data_source import DataSource
from feast.pyspark.abc import (
BatchIngestionJob,
@@ -21,7 +27,10 @@
SparkJobStatus,
StreamIngestionJob,
)
from feast.pyspark.launcher import start_historical_feature_retrieval_job
from feast.pyspark.launcher import (
start_historical_feature_retrieval_job,
start_stream_to_online_ingestion,
)
from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
from feast.third_party.grpc.health.v1.HealthService_pb2 import (
HealthCheckResponse,
@@ -62,9 +71,15 @@ def _job_to_proto(self, spark_job: SparkJob) -> JobProto:

def StartOfflineToOnlineIngestionJob(self, request, context):
"""Start job to ingest data from offline store into online store"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
feature_table = self.client.get_feature_table(
request.table_name, request.project
)
job = self.client.start_offline_to_online_ingestion(
feature_table,
request.start_date.ToDatetime(),
request.end_date.ToDatetime(),
)
return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
@@ -86,9 +101,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):

def StartStreamToOnlineIngestionJob(self, request, context):
"""Start job to ingest data from stream into online store"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")

feature_table = self.client.get_feature_table(
request.table_name, request.project
)
# TODO: add extra_jars to request
job = start_stream_to_online_ingestion(feature_table, [], self.client)
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

def ListJobs(self, request, context):
"""List all types of jobs"""
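Because the servicer now delegates to the same client and launcher code paths the SDK uses, a bare gRPC client can start ingestion jobs directly. A minimal sketch (port 6568 is assumed to be the job service default; the project and table names are placeholders):

import grpc

from feast.core.JobService_pb2 import StartStreamToOnlineIngestionJobRequest
from feast.core.JobService_pb2_grpc import JobServiceStub

channel = grpc.insecure_channel("localhost:6568")  # assumed default port
stub = JobServiceStub(channel)

request = StartStreamToOnlineIngestionJobRequest(
    project="default", table_name="driver_statistics",  # placeholders
)
response = stub.StartStreamToOnlineIngestionJob(request)
print("started job:", response.id)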
24 changes: 0 additions & 24 deletions sdk/python/feast/pyspark/abc.py
@@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]:
"""
raise NotImplementedError

@abc.abstractmethod
def get_extra_options(self) -> str:
"""
Spark job dependencies (expected to resolved from maven)
Returns:
str: Spark job dependencies.
"""
raise NotImplementedError


class RetrievalJobParameters(SparkJobParameters):
def __init__(
@@ -130,7 +121,6 @@ def __init__(
feature_tables_sources: List[Dict],
entity_source: Dict,
destination: Dict,
extra_options: str = "",
):
"""
Args:
@@ -242,7 +232,6 @@ def __init__(
self._feature_tables_sources = feature_tables_sources
self._entity_source = entity_source
self._destination = destination
self._extra_options = extra_options

def get_name(self) -> str:
all_feature_tables_names = [ft["name"] for ft in self._feature_tables]
@@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]:
def get_destination_path(self) -> str:
return self._destination["path"]

def get_extra_options(self) -> str:
return self._extra_options


class RetrievalJob(SparkJob):
"""
@@ -315,7 +301,6 @@ def __init__(
redis_host: str,
redis_port: int,
redis_ssl: bool,
extra_options: str = "",
):
self._feature_table = feature_table
self._source = source
@@ -325,7 +310,6 @@
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl
self._extra_options = extra_options

def get_name(self) -> str:
return (
@@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]:
json.dumps(self._get_redis_config()),
]

def get_extra_options(self) -> str:
return self._extra_options


class StreamIngestionJobParameters(SparkJobParameters):
def __init__(
@@ -378,7 +359,6 @@ def __init__(
redis_host: str,
redis_port: int,
redis_ssl: bool,
extra_options="",
):
self._feature_table = feature_table
self._source = source
@@ -387,7 +367,6 @@
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl
self._extra_options = extra_options

def get_name(self) -> str:
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]:
json.dumps(self._get_redis_config()),
]

def get_extra_options(self) -> str:
return self._extra_options


class BatchIngestionJob(SparkJob):
"""
4 changes: 0 additions & 4 deletions sdk/python/feast/pyspark/launcher.py
@@ -16,7 +16,6 @@
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
CONFIG_SPARK_EMR_LOG_LOCATION,
CONFIG_SPARK_EMR_REGION,
CONFIG_SPARK_EXTRA_OPTIONS,
CONFIG_SPARK_HOME,
CONFIG_SPARK_INGESTION_JOB_JAR,
CONFIG_SPARK_LAUNCHER,
@@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job(
for feature_table in feature_tables
],
destination={"format": output_format, "path": output_path},
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)

@@ -256,7 +254,6 @@ def start_offline_to_online_ingestion(
redis_host=client._config.get(CONFIG_REDIS_HOST),
redis_port=client._config.getint(CONFIG_REDIS_PORT),
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)

@@ -277,7 +274,6 @@ def start_stream_to_online_ingestion(
redis_host=client._config.get(CONFIG_REDIS_HOST),
redis_port=client._config.getint(CONFIG_REDIS_PORT),
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)
