From 221badc95517c7b7ddb45b65b3f07a62581df661 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 29 Oct 2020 09:54:35 +0400 Subject: [PATCH 01/10] Implement half of JobService functionality Signed-off-by: Tsotne Tabidze --- .../templates/deployment.yaml | 5 + infra/charts/feast/values.yaml | 4 + infra/docker/jobservice/Dockerfile | 8 +- protos/feast/core/JobService.proto | 4 +- sdk/python/feast/client.py | 92 +++++++++++++------ sdk/python/feast/constants.py | 11 ++- sdk/python/feast/job_service.py | 59 ++++++++++-- 7 files changed, 144 insertions(+), 39 deletions(-) diff --git a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml index 5a3429c5f1..9ffd402363 100644 --- a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml +++ b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml @@ -57,6 +57,11 @@ spec: {{- end }} env: + - name: FEAST_CORE_URL + value: "{{ .Release.Name }}-feast-core:6565" + - name: FEAST_HISTORICAL_SERVING_URL + value: "{{ .Release.Name }}-feast-batch-serving:6566" + {{- if .Values.gcpServiceAccount.enabled }} - name: GOOGLE_APPLICATION_CREDENTIALS value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }} diff --git a/infra/charts/feast/values.yaml b/infra/charts/feast/values.yaml index c3658f88a2..c06f44c713 100644 --- a/infra/charts/feast/values.yaml +++ b/infra/charts/feast/values.yaml @@ -13,6 +13,10 @@ feast-jupyter: # feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK enabled: true +feast-jobservice: + # feast-jobservice.enabled -- Flag to install Feast Job Service + enabled: true + postgresql: # postgresql.enabled -- Flag to install Postgresql enabled: true diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile index 29a3d97f90..4eb000441b 100644 --- a/infra/docker/jobservice/Dockerfile +++ b/infra/docker/jobservice/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.7-slim-buster +FROM jupyter/pyspark-notebook:ae5f7e104dd5 USER root WORKDIR /feast @@ -27,4 +27,10 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa -O /usr/bin/grpc-health-probe && \ chmod +x /usr/bin/grpc-health-probe +ENV FEAST_SPARK_LAUNCHER standalone +ENV FEAST_SPARK_STANDALONE_MASTER "local[*]" +ENV FEAST_SPARK_HOME $SPARK_HOME +ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \ +--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" + CMD ["feast", "server"] diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto index 861f3b74a5..3ec6aea60c 100644 --- a/protos/feast/core/JobService.proto +++ b/protos/feast/core/JobService.proto @@ -126,8 +126,8 @@ message StartOfflineToOnlineIngestionJobResponse { } message GetHistoricalFeaturesRequest { - // List of features that are being retrieved - repeated feast.serving.FeatureReferenceV2 features = 1; + // List of feature references that are being retrieved + repeated string feature_refs = 1; // Batch DataSource that can be used to obtain entity values for historical retrieval. 
// For each entity value, a feature value will be retrieved for that value/timestamp diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 9355539067..7867331dc3 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -33,6 +33,7 @@ CONFIG_ENABLE_AUTH_KEY, CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, CONFIG_JOB_SERVICE_ENABLE_SSL_KEY, + CONFIG_JOB_SERVICE_ENABLED, CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY, CONFIG_JOB_SERVICE_URL_KEY, CONFIG_PROJECT_KEY, @@ -67,6 +68,11 @@ ) from feast.core.CoreService_pb2_grpc import CoreServiceStub from feast.core.JobService_pb2_grpc import JobServiceStub +from feast.core.JobService_pb2 import ( + GetHistoricalFeaturesRequest, + StartOfflineToOnlineIngestionJobRequest, + StartStreamToOnlineIngestionJobRequest, +) from feast.data_format import ParquetFormat from feast.data_source import BigQuerySource, FileSource from feast.entity import Entity @@ -190,6 +196,10 @@ def _job_service(self): Returns: JobServiceStub """ + # Don't initialize job service stub if the job service is disabled + if self._config.get(CONFIG_JOB_SERVICE_ENABLED) == "False": + return None + if not self._job_service_stub: channel = create_grpc_channel( url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY), @@ -853,8 +863,9 @@ def get_historical_features( self, feature_refs: List[str], entity_source: Union[pd.DataFrame, FileSource, BigQuerySource], - project: str = None, - ) -> RetrievalJob: + project: Optional[str] = None, + destination_path: Optional[str] = None, + ) -> Union[RetrievalJob, str]: """ Launch a historical feature retrieval job. @@ -873,11 +884,12 @@ def get_historical_features( retrieval job. project: Specifies the project that contains the feature tables which the requested features belong to. + destination_path: Specifies the path in a bucket to write the exported feature data files Returns: - Returns a retrieval job object that can be used to monitor retrieval - progress asynchronously, and can be used to materialize the - results. + If jobs are launched locally, returns a retrieval job object that can be used to monitor retrieval + progress asynchronously, and can be used to materialize the results. + Otherwise, if jobs are launched through Feast Job Service, returns a job id. 
Examples: >>> from feast import Client @@ -890,15 +902,6 @@ def get_historical_features( >>> output_file_uri = feature_retrieval_job.get_output_file_uri() "gs://some-bucket/output/ """ - feature_tables = self._get_feature_tables_from_feature_refs( - feature_refs, project - ) - output_location = os.path.join( - self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION), - str(uuid.uuid4()), - ) - output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT) - if isinstance(entity_source, pd.DataFrame): staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION) entity_staging_uri = urlparse( @@ -922,13 +925,29 @@ def get_historical_features( entity_staging_uri.geturl(), ) - return start_historical_feature_retrieval_job( - self, - entity_source, - feature_tables, - output_format, - os.path.join(output_location, str(uuid.uuid4())), - ) + if destination_path is None: + destination_path = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION) + destination_path = os.path.join(destination_path, str(uuid.uuid4())) + + if not self._job_service: + feature_tables = self._get_feature_tables_from_feature_refs( + feature_refs, project + ) + output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT) + + + return start_historical_feature_retrieval_job( + self, entity_source, feature_tables, output_format, destination_path + ) + else: + request = GetHistoricalFeaturesRequest( + feature_refs=feature_refs, + entities_source=entity_source.to_proto(), + project=project, + destination_path=destination_path, + ) + response = self._job_service.GetHistoricalFeatures(request) + return response.id def get_historical_features_df( self, @@ -993,7 +1012,7 @@ def _get_feature_tables_from_feature_refs( def start_offline_to_online_ingestion( self, feature_table: FeatureTable, start: datetime, end: datetime, - ) -> SparkJob: + ) -> Union[SparkJob, str]: """ Launch Ingestion Job from Batch Source to Online Store for given featureTable @@ -1001,14 +1020,35 @@ def start_offline_to_online_ingestion( :param feature_table: FeatureTable which will be ingested :param start: lower datetime boundary :param end: upper datetime boundary - :return: Spark Job Proxy object + :return: Spark Job Proxy object if jobs are launched locally, + or Spark Job ID if jobs are launched through Feast Job Service """ - return start_offline_to_online_ingestion(feature_table, start, end, self) + if not self._job_service: + return start_offline_to_online_ingestion(feature_table, start, end, self) + else: + request = StartOfflineToOnlineIngestionJobRequest( + project=self.project, + table_name=feature_table.name, + ) + request.start_date.FromDatetime(start) + request.end_date.FromDatetime(end) + response = self._job_service.StartOfflineToOnlineIngestionJob(request) + return response.id def start_stream_to_online_ingestion( self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, - ) -> SparkJob: - return start_stream_to_online_ingestion(feature_table, extra_jars or [], self) + ) -> Union[SparkJob, str]: + if not self._job_service: + return start_stream_to_online_ingestion( + feature_table, extra_jars or [], self + ) + else: + request = StartStreamToOnlineIngestionJobRequest( + project=self.project, + table_name=feature_table.name, + ) + response = self._job_service.StartStreamToOnlineIngestionJob(request) + return response.id def stage_dataframe( self, diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index c62ca88d77..ae3e49e83e 100644 --- 
a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -52,6 +52,7 @@ class AuthProvider(Enum): CONFIG_JOB_SERVICE_URL_KEY = "job_service_url" CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl" CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert" +CONFIG_JOB_SERVICE_ENABLED = "job_service_enabled" CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply" CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( @@ -115,7 +116,15 @@ class AuthProvider(Enum): CONFIG_SERVING_ENABLE_SSL_KEY: "False", # Path to certificate(s) to secure connection to Feast Serving CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", - # Default connection timeout to Feast Serving and Feast Core (in seconds) + # Default Feast Job Service URL + CONFIG_JOB_SERVICE_URL_KEY: "localhost:6568", + # Enable or disable TLS/SSL to Feast Job Service + CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", + # Path to certificate(s) to secure connection to Feast Job Service + CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", + # Enable or disable Feast Job Service + CONFIG_JOB_SERVICE_ENABLED: "False", + # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3", # Default gRPC connection timeout when sending an ApplyFeatureSet command to # Feast Core (in seconds) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 53acb63066..c0f56489b9 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -3,7 +3,27 @@ import grpc import feast +from feast.constants import ( + CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT, + CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION, +) from feast.core import JobService_pb2_grpc +from feast.core.JobService_pb2 import ( + GetHistoricalFeaturesResponse, + GetJobResponse, + ListJobsResponse, + StartOfflineToOnlineIngestionJobResponse, + StartStreamToOnlineIngestionJobResponse, + StopJobResponse, +) +from feast.data_source import DataSource +from feast.pyspark.launcher import ( + stage_dataframe, + start_historical_feature_retrieval_job, + start_historical_feature_retrieval_spark_session, + start_offline_to_online_ingestion, + start_stream_to_online_ingestion, +) from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc from feast.third_party.grpc.health.v1.HealthService_pb2 import ( HealthCheckResponse, @@ -17,21 +37,42 @@ def __init__(self): def StartOfflineToOnlineIngestionJob(self, request, context): """Start job to ingest data from offline store into online store""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + feature_table = self.client.get_feature_table( + request.table_name, request.project + ) + job = start_offline_to_online_ingestion( + feature_table, + request.start_date.ToDatetime(), + request.end_date.ToDatetime(), + self.client, + ) + return StartOfflineToOnlineIngestionJobResponse(id=job.get_id()) def GetHistoricalFeatures(self, request, context): """Produce a training dataset, return a job id that will provide a file reference""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + feature_tables = self.client._get_feature_tables_from_feature_refs( + request.feature_refs, request.project + ) + output_format = 
self.client._config.get( + CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT + ) + + job = start_historical_feature_retrieval_job( + self.client, + DataSource.from_proto(request.entities_source), + feature_tables, + output_format, + request.destination_path, + ) + return GetHistoricalFeaturesResponse(id=job.get_id()) def StartStreamToOnlineIngestionJob(self, request, context): """Start job to ingest data from stream into online store""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + feature_table = self.client.get_feature_table( + request.table_name, request.project + ) + job = start_stream_to_online_ingestion(feature_table, [], self.client) + return StartStreamToOnlineIngestionJobResponse(id=job.get_id()) def ListJobs(self, request, context): """List all types of jobs""" From 2c055f71ee94e56e29ea41de93e5756debb2a751 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 29 Oct 2020 23:34:16 +0400 Subject: [PATCH 02/10] Python lint Signed-off-by: Tsotne Tabidze --- sdk/python/feast/client.py | 32 ++++++++++++++++---------------- sdk/python/feast/job_service.py | 10 +--------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 7867331dc3..8cedf667e5 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -67,12 +67,12 @@ ListProjectsResponse, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub -from feast.core.JobService_pb2_grpc import JobServiceStub from feast.core.JobService_pb2 import ( GetHistoricalFeaturesRequest, StartOfflineToOnlineIngestionJobRequest, StartStreamToOnlineIngestionJobRequest, ) +from feast.core.JobService_pb2_grpc import JobServiceStub from feast.data_format import ParquetFormat from feast.data_source import BigQuerySource, FileSource from feast.entity import Entity @@ -865,7 +865,7 @@ def get_historical_features( entity_source: Union[pd.DataFrame, FileSource, BigQuerySource], project: Optional[str] = None, destination_path: Optional[str] = None, - ) -> Union[RetrievalJob, str]: + ) -> RetrievalJob: """ Launch a historical feature retrieval job. @@ -887,9 +887,9 @@ def get_historical_features( destination_path: Specifies the path in a bucket to write the exported feature data files Returns: - If jobs are launched locally, returns a retrieval job object that can be used to monitor retrieval - progress asynchronously, and can be used to materialize the results. - Otherwise, if jobs are launched through Feast Job Service, returns a job id. + Returns a retrieval job object that can be used to monitor retrieval + progress asynchronously, and can be used to materialize the + results. 
Examples: >>> from feast import Client @@ -926,15 +926,18 @@ def get_historical_features( ) if destination_path is None: - destination_path = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION) + destination_path = self._config.get( + CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION + ) destination_path = os.path.join(destination_path, str(uuid.uuid4())) if not self._job_service: feature_tables = self._get_feature_tables_from_feature_refs( feature_refs, project ) - output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT) - + output_format = self._config.get( + CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT + ) return start_historical_feature_retrieval_job( self, entity_source, feature_tables, output_format, destination_path @@ -1012,7 +1015,7 @@ def _get_feature_tables_from_feature_refs( def start_offline_to_online_ingestion( self, feature_table: FeatureTable, start: datetime, end: datetime, - ) -> Union[SparkJob, str]: + ) -> SparkJob: """ Launch Ingestion Job from Batch Source to Online Store for given featureTable @@ -1020,15 +1023,13 @@ def start_offline_to_online_ingestion( :param feature_table: FeatureTable which will be ingested :param start: lower datetime boundary :param end: upper datetime boundary - :return: Spark Job Proxy object if jobs are launched locally, - or Spark Job ID if jobs are launched through Feast Job Service + :return: Spark Job Proxy object """ if not self._job_service: return start_offline_to_online_ingestion(feature_table, start, end, self) else: request = StartOfflineToOnlineIngestionJobRequest( - project=self.project, - table_name=feature_table.name, + project=self.project, table_name=feature_table.name, ) request.start_date.FromDatetime(start) request.end_date.FromDatetime(end) @@ -1037,15 +1038,14 @@ def start_offline_to_online_ingestion( def start_stream_to_online_ingestion( self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, - ) -> Union[SparkJob, str]: + ) -> SparkJob: if not self._job_service: return start_stream_to_online_ingestion( feature_table, extra_jars or [], self ) else: request = StartStreamToOnlineIngestionJobRequest( - project=self.project, - table_name=feature_table.name, + project=self.project, table_name=feature_table.name, ) response = self._job_service.StartStreamToOnlineIngestionJob(request) return response.id diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index c0f56489b9..22f58d4c61 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -3,24 +3,16 @@ import grpc import feast -from feast.constants import ( - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT, - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION, -) +from feast.constants import CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT from feast.core import JobService_pb2_grpc from feast.core.JobService_pb2 import ( GetHistoricalFeaturesResponse, - GetJobResponse, - ListJobsResponse, StartOfflineToOnlineIngestionJobResponse, StartStreamToOnlineIngestionJobResponse, - StopJobResponse, ) from feast.data_source import DataSource from feast.pyspark.launcher import ( - stage_dataframe, start_historical_feature_retrieval_job, - start_historical_feature_retrieval_spark_session, start_offline_to_online_ingestion, start_stream_to_online_ingestion, ) From 7e68055ed7752b2a42dfe39f7bfdcb0fb3b0d5ce Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Fri, 30 Oct 2020 04:59:49 +0400 Subject: [PATCH 03/10] Correct implementation of all jobservice functions tested on standalone mode * 
New API calls (start_offline_to_online_ingestion, start_stream_to_online_ingestion) now return Remote Jobs instead of job ids * Implement list_jobs & get_job for standalone mode (looks like Spark is running in local mode and we can't get job statuses so we have to keep cache in memory) * Wire up list_jobs & get_job on client side with job service * Tested locally on Feast 101 notebook, everything works Signed-off-by: Tsotne Tabidze --- sdk/python/feast/client.py | 43 +++++++++++++++---- sdk/python/feast/constants.py | 6 --- sdk/python/feast/job_service.py | 10 ++--- .../pyspark/launchers/standalone/local.py | 20 ++++++--- sdk/python/feast/remote_job.py | 22 +++++++++- 5 files changed, 76 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 94e02b692b..4b700e447f 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -33,7 +33,6 @@ CONFIG_ENABLE_AUTH_KEY, CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, CONFIG_JOB_SERVICE_ENABLE_SSL_KEY, - CONFIG_JOB_SERVICE_ENABLED, CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY, CONFIG_JOB_SERVICE_URL_KEY, CONFIG_PROJECT_KEY, @@ -69,6 +68,8 @@ from feast.core.CoreService_pb2_grpc import CoreServiceStub from feast.core.JobService_pb2 import ( GetHistoricalFeaturesRequest, + GetJobRequest, + ListJobsRequest, StartOfflineToOnlineIngestionJobRequest, StartStreamToOnlineIngestionJobRequest, ) @@ -100,7 +101,12 @@ start_offline_to_online_ingestion, start_stream_to_online_ingestion, ) -from feast.remote_job import RemoteRetrievalJob +from feast.remote_job import ( + RemoteBatchIngestionJob, + RemoteRetrievalJob, + RemoteStreamIngestionJob, + get_remote_job_from_proto, +) from feast.serving.ServingService_pb2 import ( GetFeastServingInfoRequest, GetOnlineFeaturesRequestV2, @@ -203,8 +209,8 @@ def _job_service(self): Returns: JobServiceStub """ - # Don't initialize job service stub if the job service is disabled - if self._config.get(CONFIG_JOB_SERVICE_ENABLED) == "False": + # Don't try to initialize job service stub if the job service is disabled + if not self._use_job_service: return None if not self._job_service_stub: @@ -1055,7 +1061,9 @@ def start_offline_to_online_ingestion( request.start_date.FromDatetime(start) request.end_date.FromDatetime(end) response = self._job_service.StartOfflineToOnlineIngestionJob(request) - return response.id + return RemoteBatchIngestionJob( + self._job_service, self._extra_grpc_params, response.id, + ) def start_stream_to_online_ingestion( self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, @@ -1069,13 +1077,32 @@ def start_stream_to_online_ingestion( project=self.project, table_name=feature_table.name, ) response = self._job_service.StartStreamToOnlineIngestionJob(request) - return response.id + return RemoteStreamIngestionJob( + self._job_service, self._extra_grpc_params, response.id, + ) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: - return list_jobs(include_terminated, self) + if not self._use_job_service: + return list_jobs(include_terminated, self) + else: + request = ListJobsRequest(include_terminated=include_terminated) + response = self._job_service.ListJobs(request) + return [ + get_remote_job_from_proto( + self._job_service, self._extra_grpc_params, job + ) + for job in response.jobs + ] def get_job_by_id(self, job_id: str) -> SparkJob: - return get_job_by_id(job_id, self) + if not self._use_job_service: + return get_job_by_id(job_id, self) + else: + request = GetJobRequest(job_id=job_id) + response = 
self._job_service.GetJob(request) + return get_remote_job_from_proto( + self._job_service, self._extra_grpc_params, response.job + ) def stage_dataframe( self, df: pd.DataFrame, event_timestamp_column: str, diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 60a9b463a7..9588032099 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -52,7 +52,6 @@ class AuthProvider(Enum): CONFIG_JOB_SERVICE_URL_KEY = "job_service_url" CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl" CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert" -CONFIG_JOB_SERVICE_ENABLED = "job_service_enabled" CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply" CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( @@ -116,15 +115,10 @@ class AuthProvider(Enum): CONFIG_SERVING_ENABLE_SSL_KEY: "False", # Path to certificate(s) to secure connection to Feast Serving CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", - # Default Feast Job Service URL - CONFIG_JOB_SERVICE_URL_KEY: "localhost:6568", # Enable or disable TLS/SSL to Feast Job Service CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", # Path to certificate(s) to secure connection to Feast Job Service CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", - # Enable or disable Feast Job Service - # TODO: is this necessary? - CONFIG_JOB_SERVICE_ENABLED: "False", # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10", # Default gRPC connection timeout when sending an ApplyFeatureSet command to diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 5cfb7855a9..af6dd4d857 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -4,19 +4,19 @@ import grpc import feast -from feast.constants import CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT from feast.core import JobService_pb2_grpc from feast.core.JobService_pb2 import ( - GetHistoricalFeaturesResponse, - StartOfflineToOnlineIngestionJobResponse, - StartStreamToOnlineIngestionJobResponse, CancelJobResponse, GetHistoricalFeaturesResponse, GetJobResponse, +) +from feast.core.JobService_pb2 import Job as JobProto +from feast.core.JobService_pb2 import ( JobStatus, JobType, ListJobsResponse, - Job as JobProto, + StartOfflineToOnlineIngestionJobResponse, + StartStreamToOnlineIngestionJobResponse, ) from feast.data_source import DataSource from feast.pyspark.abc import ( diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index d7bda7a59c..df97271099 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -22,6 +22,10 @@ StreamIngestionJobParameters, ) +# In-memory cache of Spark jobs +# This is necessary since we can't query Spark jobs in local mode +JOB_CACHE = {} + def _find_free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: @@ -196,42 +200,48 @@ def historical_feature_retrieval( self, job_params: RetrievalJobParameters ) -> RetrievalJob: job_id = str(uuid.uuid4()) - return StandaloneClusterRetrievalJob( + job = StandaloneClusterRetrievalJob( job_id, job_params.get_name(), self.spark_submit(job_params), job_params.get_destination_path(), ) + JOB_CACHE[job_id] = job # type: ignore + return job def offline_to_online_ingestion( self, 
ingestion_job_params: BatchIngestionJobParameters ) -> BatchIngestionJob: job_id = str(uuid.uuid4()) ui_port = _find_free_port() - return StandaloneClusterBatchIngestionJob( + job = StandaloneClusterBatchIngestionJob( job_id, ingestion_job_params.get_name(), self.spark_submit(ingestion_job_params, ui_port), ui_port, ) + JOB_CACHE[job_id] = job # type: ignore + return job def start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters ) -> StreamIngestionJob: job_id = str(uuid.uuid4()) ui_port = _find_free_port() - return StandaloneClusterStreamingIngestionJob( + job = StandaloneClusterStreamingIngestionJob( job_id, ingestion_job_params.get_name(), self.spark_submit(ingestion_job_params, ui_port), ui_port, ) + JOB_CACHE[job_id] = job # type: ignore + return job def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError def get_job_by_id(self, job_id: str) -> SparkJob: - raise NotImplementedError + return JOB_CACHE[job_id] def list_jobs(self, include_terminated: bool) -> List[SparkJob]: - raise NotImplementedError + return list(JOB_CACHE.values()) diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py index 69792a1e88..4c1565cd8f 100644 --- a/sdk/python/feast/remote_job.py +++ b/sdk/python/feast/remote_job.py @@ -1,7 +1,9 @@ import time from typing import Any, Callable, Dict, List -from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest, JobStatus +from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest +from feast.core.JobService_pb2 import Job as JobProto +from feast.core.JobService_pb2 import JobStatus, JobType from feast.core.JobService_pb2_grpc import JobServiceStub from feast.pyspark.abc import ( BatchIngestionJob, @@ -128,3 +130,21 @@ def __init__( job_id: str, ): super().__init__(service, grpc_extra_param_provider, job_id) + + +def get_remote_job_from_proto( + service: JobServiceStub, + grpc_extra_param_provider: GrpcExtraParamProvider, + job: JobProto, +): + """ + Get the remote job python object from Job proto. 
+ """ + if job.type == JobType.RETRIEVAL_JOB: + return RemoteRetrievalJob( + service, grpc_extra_param_provider, job.id, job.retrieval.output_location + ) + elif job.type == JobType.BATCH_INGESTION_JOB: + return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id) + elif job.type == JobType.STREAM_INGESTION_JOB: + return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id) From f59dc02b50cf0f84673a42df0e12852da619bb53 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Fri, 30 Oct 2020 05:30:54 +0400 Subject: [PATCH 04/10] Fix list_jobs when include_terminated=False Signed-off-by: Tsotne Tabidze --- sdk/python/feast/pyspark/launchers/standalone/local.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index df97271099..a27dd494d3 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -244,4 +244,12 @@ def get_job_by_id(self, job_id: str) -> SparkJob: return JOB_CACHE[job_id] def list_jobs(self, include_terminated: bool) -> List[SparkJob]: - return list(JOB_CACHE.values()) + if include_terminated is True: + return list(JOB_CACHE.values()) + else: + return [ + job + for job in JOB_CACHE.values() + if job.get_status() + in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) + ] From e5506e7d44215cc608d647c20a5db6d28dd12907 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Mon, 2 Nov 2020 10:29:59 +0400 Subject: [PATCH 05/10] e2e tests Signed-off-by: Tsotne Tabidze --- tests/e2e/fixtures/client.py | 10 +++++- tests/e2e/fixtures/feast_services.py | 53 +++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py index 366b0aa711..27ef9a03be 100644 --- a/tests/e2e/fixtures/client.py +++ b/tests/e2e/fixtures/client.py @@ -1,7 +1,7 @@ import os import tempfile import uuid -from typing import Tuple +from typing import Tuple, Optional import pyspark import pytest @@ -18,7 +18,13 @@ def feast_client( feast_core: Tuple[str, int], feast_serving: Tuple[str, int], local_staging_path, + feast_jobservice: Optional[Tuple[str, int]] ): + if feast_jobservice is None: + job_service_env = dict() + else: + job_service_env = dict(job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}") + if pytestconfig.getoption("env") == "local": return Client( core_url=f"{feast_core[0]}:{feast_core[1]}", @@ -33,6 +39,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), + **job_service_env ) if pytestconfig.getoption("env") == "gcloud": @@ -50,6 +57,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), + **job_service_env ) diff --git a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py index ce7f854691..fd3348354f 100644 --- a/tests/e2e/fixtures/feast_services.py +++ b/tests/e2e/fixtures/feast_services.py @@ -4,7 +4,7 @@ import subprocess import tempfile import time -from typing import Any, Dict +from typing import Any, Dict, Tuple import pytest import yaml @@ -15,6 +15,7 @@ "feast_core", "feast_serving", "enable_auth", + "feast_jobservice", ) @@ -138,3 +139,53 @@ def feast_serving( _wait_port_open(6566) yield "localhost", 6566 process.terminate() + + +@pytest.fixture(params=["jobservice_disabled", "jobservice_enabled"]) +def 
feast_jobservice( + request, + pytestconfig, + ingestion_job_jar, + redis_server: RedisExecutor, + feast_core: Tuple[str, int], + feast_serving: Tuple[str, int], + local_staging_path, +): + if request.param == "jobservice_disabled": + yield None + else: + env = os.environ.copy() + + if pytestconfig.getoption("env") == "local": + env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" + env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" + env["FEAST_SPARK_LAUNCHER"] = "standalone" + env["FEAST_SPARK_STANDALONE_MASTER"] = "local" + env["FEAST_SPARK_HOME"] = os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__) + env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar + env["FEAST_REDIS_HOST"] = redis_server.host + env["FEAST_REDIS_PORT"] = str(redis_server.port) + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(local_staging_path, "spark") + env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( + local_staging_path, "historical_output" + ) + + if pytestconfig.getoption("env") == "gcloud": + env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" + env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" + env["FEAST_SPARK_LAUNCHER"] = "dataproc" + env["FEAST_DATAPROC_CLUSTER_NAME"] = pytestconfig.getoption("dataproc_cluster_name") + env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project") + env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region") + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(local_staging_path, "dataproc") + env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar + env["FEAST_REDIS_HOST"] = pytestconfig.getoption("redis_url").split(":")[0] + env["FEAST_REDIS_PORT"] = pytestconfig.getoption("redis_url").split(":")[1] + env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( + local_staging_path, "historical_output" + ) + + process = subprocess.Popen(["feast", "server"], env=env) + _wait_port_open(6568) + yield "localhost", 6568 + process.terminate() From afe6e0077f38564d4498c01394e6ad178258f290 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Mon, 2 Nov 2020 10:38:47 +0400 Subject: [PATCH 06/10] Format python Signed-off-by: Tsotne Tabidze --- tests/e2e/fixtures/client.py | 10 ++++++---- tests/e2e/fixtures/feast_services.py | 25 +++++++++++++++++-------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py index 847d67b73c..5b9c821206 100644 --- a/tests/e2e/fixtures/client.py +++ b/tests/e2e/fixtures/client.py @@ -1,7 +1,7 @@ import os import tempfile import uuid -from typing import Tuple, Optional +from typing import Optional, Tuple import pyspark import pytest @@ -24,7 +24,9 @@ def feast_client( if feast_jobservice is None: job_service_env = dict() else: - job_service_env = dict(job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}") + job_service_env = dict( + job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}" + ) if pytestconfig.getoption("env") == "local": return Client( @@ -40,7 +42,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), - **job_service_env + **job_service_env, ) if pytestconfig.getoption("env") == "gcloud": @@ -58,7 +60,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), - **job_service_env + **job_service_env, ) diff --git a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py index 7ca144d89e..805934b5f4 100644 --- 
a/tests/e2e/fixtures/feast_services.py +++ b/tests/e2e/fixtures/feast_services.py @@ -6,6 +6,7 @@ import time from typing import Any, Dict, Tuple +import pyspark import pytest import yaml from pytest_postgresql.executor import PostgreSQLExecutor @@ -163,29 +164,37 @@ def feast_jobservice( env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" env["FEAST_SPARK_LAUNCHER"] = "standalone" env["FEAST_SPARK_STANDALONE_MASTER"] = "local" - env["FEAST_SPARK_HOME"] = os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__) + env["FEAST_SPARK_HOME"] = os.getenv("SPARK_HOME") or os.path.dirname( + pyspark.__file__ + ) env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar env["FEAST_REDIS_HOST"] = redis_server.host env["FEAST_REDIS_PORT"] = str(redis_server.port) - env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(local_staging_path, "spark") + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( + local_staging_path, "spark" + ) env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( - local_staging_path, "historical_output" - ) + local_staging_path, "historical_output" + ) if pytestconfig.getoption("env") == "gcloud": env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" env["FEAST_SPARK_LAUNCHER"] = "dataproc" - env["FEAST_DATAPROC_CLUSTER_NAME"] = pytestconfig.getoption("dataproc_cluster_name") + env["FEAST_DATAPROC_CLUSTER_NAME"] = pytestconfig.getoption( + "dataproc_cluster_name" + ) env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project") env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region") - env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(local_staging_path, "dataproc") + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( + local_staging_path, "dataproc" + ) env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar env["FEAST_REDIS_HOST"] = pytestconfig.getoption("redis_url").split(":")[0] env["FEAST_REDIS_PORT"] = pytestconfig.getoption("redis_url").split(":")[1] env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( - local_staging_path, "historical_output" - ) + local_staging_path, "historical_output" + ) process = subprocess.Popen(["feast", "server"], env=env) _wait_port_open(6568) From 7e43530427c0e146389c7d879d1fa497fbcce039 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Tue, 3 Nov 2020 14:10:54 +0400 Subject: [PATCH 07/10] Add docstring Signed-off-by: Tsotne Tabidze --- sdk/python/feast/remote_job.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py index 4c1565cd8f..029b86e55f 100644 --- a/sdk/python/feast/remote_job.py +++ b/sdk/python/feast/remote_job.py @@ -11,6 +11,7 @@ SparkJobFailure, SparkJobStatus, StreamIngestionJob, + SparkJob, ) GrpcExtraParamProvider = Callable[[], Dict[str, Any]] @@ -136,9 +137,16 @@ def get_remote_job_from_proto( service: JobServiceStub, grpc_extra_param_provider: GrpcExtraParamProvider, job: JobProto, -): - """ - Get the remote job python object from Job proto. +) -> SparkJob: + """Get the remote job python object from Job proto. 
+ + Args: + service (JobServiceStub): Reference to Job Service + grpc_extra_param_provider (GrpcExtraParamProvider): Callable for providing extra parameters to grpc requests + job (JobProto): Proto object describing the Job + + Returns: + (SparkJob): A remote job object for the given job """ if job.type == JobType.RETRIEVAL_JOB: return RemoteRetrievalJob( From 3335ae424701ecf8e9778a371d594e7508c6789e Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Tue, 3 Nov 2020 14:43:59 +0400 Subject: [PATCH 08/10] Remove spark extra configs and hardcode spark jars/conf in standalone mode Signed-off-by: Tsotne Tabidze --- infra/docker/jobservice/Dockerfile | 2 -- infra/docker/jupyter/Dockerfile | 2 -- sdk/python/feast/constants.py | 3 --- sdk/python/feast/pyspark/abc.py | 24 ------------------- sdk/python/feast/pyspark/launcher.py | 4 ---- .../pyspark/launchers/standalone/local.py | 21 +++++++++------- sdk/python/feast/remote_job.py | 7 +++++- 7 files changed, 19 insertions(+), 44 deletions(-) diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile index 4eb000441b..885b6c52e0 100644 --- a/infra/docker/jobservice/Dockerfile +++ b/infra/docker/jobservice/Dockerfile @@ -30,7 +30,5 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa -O /usr/bin/grpc-health-probe && \ chmod +x /usr/bin/grpc-health-probe ENV FEAST_SPARK_LAUNCHER standalone ENV FEAST_SPARK_STANDALONE_MASTER "local[*]" ENV FEAST_SPARK_HOME $SPARK_HOME -ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \ ---conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" CMD ["feast", "server"] diff --git a/infra/docker/jupyter/Dockerfile b/infra/docker/jupyter/Dockerfile index 6396784a38..69aa3622ca 100644 --- a/infra/docker/jupyter/Dockerfile +++ b/infra/docker/jupyter/Dockerfile @@ -29,7 +29,5 @@ COPY examples . ENV FEAST_SPARK_LAUNCHER standalone ENV FEAST_SPARK_STANDALONE_MASTER "local[*]" ENV FEAST_SPARK_HOME $SPARK_HOME -ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \ ---conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" CMD ["start-notebook.sh", "--NotebookApp.token=''"] \ No newline at end of file diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 9588032099..ddb1cea7de 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -93,8 +93,6 @@ class AuthProvider(Enum): CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path" CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location" -CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options" - # Configuration option default values FEAST_DEFAULT_OPTIONS = { # Default Feast project to use @@ -136,5 +134,4 @@ class AuthProvider(Enum): CONFIG_REDIS_PORT: "6379", CONFIG_REDIS_SSL: "False", CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet", - CONFIG_SPARK_EXTRA_OPTIONS: "", } diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index d350d764c9..127112a06f 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]: """ raise NotImplementedError - @abc.abstractmethod - def get_extra_options(self) -> str: - """ - Spark job dependencies (expected to resolved from maven) - Returns: - str: Spark job dependencies.
- """ - raise NotImplementedError - class RetrievalJobParameters(SparkJobParameters): def __init__( @@ -130,7 +121,6 @@ def __init__( feature_tables_sources: List[Dict], entity_source: Dict, destination: Dict, - extra_options: str = "", ): """ Args: @@ -242,7 +232,6 @@ def __init__( self._feature_tables_sources = feature_tables_sources self._entity_source = entity_source self._destination = destination - self._extra_options = extra_options def get_name(self) -> str: all_feature_tables_names = [ft["name"] for ft in self._feature_tables] @@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]: def get_destination_path(self) -> str: return self._destination["path"] - def get_extra_options(self) -> str: - return self._extra_options - class RetrievalJob(SparkJob): """ @@ -315,7 +301,6 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, - extra_options: str = "", ): self._feature_table = feature_table self._source = source @@ -325,7 +310,6 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl - self._extra_options = extra_options def get_name(self) -> str: return ( @@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]: json.dumps(self._get_redis_config()), ] - def get_extra_options(self) -> str: - return self._extra_options - class StreamIngestionJobParameters(SparkJobParameters): def __init__( @@ -378,7 +359,6 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, - extra_options="", ): self._feature_table = feature_table self._source = source @@ -387,7 +367,6 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl - self._extra_options = extra_options def get_name(self) -> str: return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" @@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]: json.dumps(self._get_redis_config()), ] - def get_extra_options(self) -> str: - return self._extra_options - class BatchIngestionJob(SparkJob): """ diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 8260383fbc..08b5ca5b21 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -16,7 +16,6 @@ CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH, CONFIG_SPARK_EMR_LOG_LOCATION, CONFIG_SPARK_EMR_REGION, - CONFIG_SPARK_EXTRA_OPTIONS, CONFIG_SPARK_HOME, CONFIG_SPARK_INGESTION_JOB_JAR, CONFIG_SPARK_LAUNCHER, @@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job( for feature_table in feature_tables ], destination={"format": output_format, "path": output_path}, - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) @@ -256,7 +254,6 @@ def start_offline_to_online_ingestion( redis_host=client._config.get(CONFIG_REDIS_HOST), redis_port=client._config.getint(CONFIG_REDIS_PORT), redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) @@ -277,7 +274,6 @@ def start_stream_to_online_ingestion( redis_host=client._config.get(CONFIG_REDIS_HOST), redis_port=client._config.getint(CONFIG_REDIS_PORT), redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index 995b702073..821a962ed3 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -3,7 +3,7 @@ 
import subprocess import uuid from contextlib import closing -from typing import List +from typing import Dict, List import requests from requests.exceptions import RequestException @@ -24,7 +24,7 @@ # In-memory cache of Spark jobs # This is necessary since we can't query Spark jobs in local mode -JOB_CACHE = {} +JOB_CACHE: Dict[str, SparkJob] = {} def _find_free_port(): @@ -204,12 +204,17 @@ def spark_submit( "spark.sql.session.timeZone=UTC", # ignore local timezone "--packages", f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}", + "--jars", + "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar," + "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar," + "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar", + "--conf", + "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem", + "--conf", + "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", ] ) - if job_params.get_extra_options(): - submission_cmd.extend(job_params.get_extra_options().split(" ")) - submission_cmd.append(job_params.get_main_file_path()) submission_cmd.extend(job_params.get_arguments()) @@ -225,7 +230,7 @@ def historical_feature_retrieval( self.spark_submit(job_params), job_params.get_destination_path(), ) - JOB_CACHE[job_id] = job # type: ignore + JOB_CACHE[job_id] = job return job def offline_to_online_ingestion( @@ -239,7 +244,7 @@ def offline_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE[job_id] = job # type: ignore + JOB_CACHE[job_id] = job return job def start_stream_to_online_ingestion( @@ -253,7 +258,7 @@ def start_stream_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE[job_id] = job # type: ignore + JOB_CACHE[job_id] = job return job def stage_dataframe(self, df, event_timestamp_column: str): diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py index 029b86e55f..0a766cf796 100644 --- a/sdk/python/feast/remote_job.py +++ b/sdk/python/feast/remote_job.py @@ -8,10 +8,10 @@ from feast.pyspark.abc import ( BatchIngestionJob, RetrievalJob, + SparkJob, SparkJobFailure, SparkJobStatus, StreamIngestionJob, - SparkJob, ) GrpcExtraParamProvider = Callable[[], Dict[str, Any]] @@ -156,3 +156,8 @@ def get_remote_job_from_proto( return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id) elif job.type == JobType.STREAM_INGESTION_JOB: return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id) + else: + raise ValueError( + f"Invalid Job Type {job.type}, has to be one of " + f"{(JobType.RETRIEVAL_JOB, JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB)}" + ) From ca438009a63a4bfaf318c1bb8d9a842c27445a92 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Tue, 3 Nov 2020 19:35:55 +0400 Subject: [PATCH 09/10] Remove extra spark params from dockerfile Signed-off-by: Tsotne Tabidze --- infra/docker/jobservice/Dockerfile | 4 ---- sdk/python/feast/constants.py | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile index 885b6c52e0..dd4eb35961 100644 --- a/infra/docker/jobservice/Dockerfile +++ b/infra/docker/jobservice/Dockerfile @@ -27,8 +27,4 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa -O /usr/bin/grpc-health-probe && \ chmod +x /usr/bin/grpc-health-probe -ENV FEAST_SPARK_LAUNCHER 
standalone -ENV FEAST_SPARK_STANDALONE_MASTER "local[*]" -ENV FEAST_SPARK_HOME $SPARK_HOME - CMD ["feast", "server"] diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index ddb1cea7de..c7e22ff4fe 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -130,6 +130,7 @@ class AuthProvider(Enum): CONFIG_AUTH_PROVIDER: "google", CONFIG_SPARK_LAUNCHER: "dataproc", CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar", + CONFIG_SPARK_STANDALONE_MASTER: "local[*]", CONFIG_REDIS_HOST: "localhost", CONFIG_REDIS_PORT: "6379", CONFIG_REDIS_SSL: "False", From dc92728a2b76b70c7fc9164c74ed44528eb4d2eb Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Wed, 4 Nov 2020 07:50:00 +0400 Subject: [PATCH 10/10] Use start_stream_to_online_ingestion from launcher in job service Signed-off-by: Tsotne Tabidze --- sdk/python/feast/job_service.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index aed2dbf51e..dd9bea00c8 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -27,7 +27,10 @@ SparkJobStatus, StreamIngestionJob, ) -from feast.pyspark.launcher import start_historical_feature_retrieval_job +from feast.pyspark.launcher import ( + start_historical_feature_retrieval_job, + start_stream_to_online_ingestion, +) from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc from feast.third_party.grpc.health.v1.HealthService_pb2 import ( HealthCheckResponse, @@ -103,7 +106,7 @@ def StartStreamToOnlineIngestionJob(self, request, context): request.table_name, request.project ) # TODO: add extra_jars to request - job = self.client.start_stream_to_online_ingestion(feature_table, []) + job = start_stream_to_online_ingestion(feature_table, [], self.client) return StartStreamToOnlineIngestionJobResponse(id=job.get_id()) def ListJobs(self, request, context):