diff --git a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml index 5a3429c5f1..9ffd402363 100644 --- a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml +++ b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml @@ -57,6 +57,11 @@ spec: {{- end }} env: + - name: FEAST_CORE_URL + value: "{{ .Release.Name }}-feast-core:6565" + - name: FEAST_HISTORICAL_SERVING_URL + value: "{{ .Release.Name }}-feast-batch-serving:6566" + {{- if .Values.gcpServiceAccount.enabled }} - name: GOOGLE_APPLICATION_CREDENTIALS value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }} diff --git a/infra/charts/feast/values.yaml b/infra/charts/feast/values.yaml index c3658f88a2..c06f44c713 100644 --- a/infra/charts/feast/values.yaml +++ b/infra/charts/feast/values.yaml @@ -13,6 +13,10 @@ feast-jupyter: # feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK enabled: true +feast-jobservice: + # feast-jobservice.enabled -- Flag to install Feast Job Service + enabled: true + postgresql: # postgresql.enabled -- Flag to install Postgresql enabled: true diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile index 29a3d97f90..dd4eb35961 100644 --- a/infra/docker/jobservice/Dockerfile +++ b/infra/docker/jobservice/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.7-slim-buster +FROM jupyter/pyspark-notebook:ae5f7e104dd5 USER root WORKDIR /feast diff --git a/infra/docker/jupyter/Dockerfile b/infra/docker/jupyter/Dockerfile index 6396784a38..69aa3622ca 100644 --- a/infra/docker/jupyter/Dockerfile +++ b/infra/docker/jupyter/Dockerfile @@ -29,7 +29,5 @@ COPY examples . ENV FEAST_SPARK_LAUNCHER standalone ENV FEAST_SPARK_STANDALONE_MASTER "local[*]" ENV FEAST_SPARK_HOME $SPARK_HOME -ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \ ---conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" CMD ["start-notebook.sh", "--NotebookApp.token=''"] \ No newline at end of file diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto index 13e37b7ffe..d3924ecc71 100644 --- a/protos/feast/core/JobService.proto +++ b/protos/feast/core/JobService.proto @@ -110,7 +110,7 @@ message StartOfflineToOnlineIngestionJobResponse { } message GetHistoricalFeaturesRequest { - // List of features that are being retrieved + // List of feature references that are being retrieved repeated string feature_refs = 1; // Batch DataSource that can be used to obtain entity values for historical retrieval. 
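With `feast-jobservice` enabled in the umbrella chart, the deployment template above wires the service to Core and batch Serving through `FEAST_CORE_URL` and `FEAST_HISTORICAL_SERVING_URL`. A minimal sketch of how an SDK client would then be pointed at the new service is shown below; the release and host names are illustrative, and `job_service_url` corresponds to the `CONFIG_JOB_SERVICE_URL_KEY` option referenced in `client.py` further down.

```python
# Illustrative only: connect the Python SDK to the chart-deployed job service.
# Host names assume a Helm release called "myrelease"; the job service port
# (6568) matches the one used by the e2e fixtures below.
from feast import Client

client = Client(
    core_url="myrelease-feast-core:6565",
    job_service_url="myrelease-feast-jobservice:6568",
)
```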
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 5c15e6e022..bf71babae3 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -65,7 +65,13 @@ ListProjectsResponse, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub -from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest +from feast.core.JobService_pb2 import ( + GetHistoricalFeaturesRequest, + GetJobRequest, + ListJobsRequest, + StartOfflineToOnlineIngestionJobRequest, + StartStreamToOnlineIngestionJobRequest, +) from feast.core.JobService_pb2_grpc import JobServiceStub from feast.data_format import ParquetFormat from feast.data_source import BigQuerySource, FileSource @@ -94,7 +100,12 @@ start_offline_to_online_ingestion, start_stream_to_online_ingestion, ) -from feast.remote_job import RemoteRetrievalJob +from feast.remote_job import ( + RemoteBatchIngestionJob, + RemoteRetrievalJob, + RemoteStreamIngestionJob, + get_remote_job_from_proto, +) from feast.serving.ServingService_pb2 import ( GetFeastServingInfoRequest, GetOnlineFeaturesRequestV2, @@ -201,6 +212,10 @@ def _job_service(self): Returns: JobServiceStub """ + # Don't try to initialize job service stub if the job service is disabled + if not self._use_job_service: + return None + if not self._job_service_stub: channel = create_grpc_channel( url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY), @@ -891,8 +906,8 @@ def get_historical_features( self, feature_refs: List[str], entity_source: Union[pd.DataFrame, FileSource, BigQuerySource], - project: str = None, - output_location: str = None, + project: Optional[str] = None, + output_location: Optional[str] = None, ) -> RetrievalJob: """ Launch a historical feature retrieval job. @@ -915,6 +930,7 @@ def get_historical_features( retrieval job. project: Specifies the project that contains the feature tables which the requested features belong to. 
+ destination_path: Specifies the path in a bucket to write the exported feature data files Returns: Returns a retrieval job object that can be used to monitor retrieval @@ -1062,18 +1078,57 @@ def start_offline_to_online_ingestion( :param end: upper datetime boundary :return: Spark Job Proxy object """ - return start_offline_to_online_ingestion(feature_table, start, end, self) + if not self._use_job_service: + return start_offline_to_online_ingestion(feature_table, start, end, self) + else: + request = StartOfflineToOnlineIngestionJobRequest( + project=self.project, table_name=feature_table.name, + ) + request.start_date.FromDatetime(start) + request.end_date.FromDatetime(end) + response = self._job_service.StartOfflineToOnlineIngestionJob(request) + return RemoteBatchIngestionJob( + self._job_service, self._extra_grpc_params, response.id, + ) def start_stream_to_online_ingestion( self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, ) -> SparkJob: - return start_stream_to_online_ingestion(feature_table, extra_jars or [], self) + if not self._use_job_service: + return start_stream_to_online_ingestion( + feature_table, extra_jars or [], self + ) + else: + request = StartStreamToOnlineIngestionJobRequest( + project=self.project, table_name=feature_table.name, + ) + response = self._job_service.StartStreamToOnlineIngestionJob(request) + return RemoteStreamIngestionJob( + self._job_service, self._extra_grpc_params, response.id, + ) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: - return list_jobs(include_terminated, self) + if not self._use_job_service: + return list_jobs(include_terminated, self) + else: + request = ListJobsRequest(include_terminated=include_terminated) + response = self._job_service.ListJobs(request) + return [ + get_remote_job_from_proto( + self._job_service, self._extra_grpc_params, job + ) + for job in response.jobs + ] def get_job_by_id(self, job_id: str) -> SparkJob: - return get_job_by_id(job_id, self) + if not self._use_job_service: + return get_job_by_id(job_id, self) + else: + request = GetJobRequest(job_id=job_id) + response = self._job_service.GetJob(request) + return get_remote_job_from_proto( + self._job_service, self._extra_grpc_params, response.job + ) def stage_dataframe( self, df: pd.DataFrame, event_timestamp_column: str, diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 746201445f..c7e22ff4fe 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -93,8 +93,6 @@ class AuthProvider(Enum): CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path" CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location" -CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options" - # Configuration option default values FEAST_DEFAULT_OPTIONS = { # Default Feast project to use @@ -115,7 +113,11 @@ class AuthProvider(Enum): CONFIG_SERVING_ENABLE_SSL_KEY: "False", # Path to certificate(s) to secure connection to Feast Serving CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", - # Default connection timeout to Feast Serving and Feast Core (in seconds) + # Enable or disable TLS/SSL to Feast Job Service + CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", + # Path to certificate(s) to secure connection to Feast Job Service + CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", + # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10", # Default gRPC connection timeout when sending an ApplyFeatureSet command to # Feast Core (in 
seconds) @@ -128,13 +130,9 @@ class AuthProvider(Enum): CONFIG_AUTH_PROVIDER: "google", CONFIG_SPARK_LAUNCHER: "dataproc", CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar", + CONFIG_SPARK_STANDALONE_MASTER: "local[*]", CONFIG_REDIS_HOST: "localhost", CONFIG_REDIS_PORT: "6379", CONFIG_REDIS_SSL: "False", CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet", - CONFIG_SPARK_EXTRA_OPTIONS: "", - # Enable or disable TLS/SSL to Feast Service - CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", - # Path to certificate(s) to secure connection to Feast Job Service - CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", } diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 1587dc5cb6..dd9bea00c8 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -12,7 +12,13 @@ GetJobResponse, ) from feast.core.JobService_pb2 import Job as JobProto -from feast.core.JobService_pb2 import JobStatus, JobType, ListJobsResponse +from feast.core.JobService_pb2 import ( + JobStatus, + JobType, + ListJobsResponse, + StartOfflineToOnlineIngestionJobResponse, + StartStreamToOnlineIngestionJobResponse, +) from feast.data_source import DataSource from feast.pyspark.abc import ( BatchIngestionJob, @@ -21,7 +27,10 @@ SparkJobStatus, StreamIngestionJob, ) -from feast.pyspark.launcher import start_historical_feature_retrieval_job +from feast.pyspark.launcher import ( + start_historical_feature_retrieval_job, + start_stream_to_online_ingestion, +) from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc from feast.third_party.grpc.health.v1.HealthService_pb2 import ( HealthCheckResponse, @@ -62,9 +71,15 @@ def _job_to_proto(self, spark_job: SparkJob) -> JobProto: def StartOfflineToOnlineIngestionJob(self, request, context): """Start job to ingest data from offline store into online store""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + feature_table = self.client.get_feature_table( + request.table_name, request.project + ) + job = self.client.start_offline_to_online_ingestion( + feature_table, + request.start_date.ToDatetime(), + request.end_date.ToDatetime(), + ) + return StartOfflineToOnlineIngestionJobResponse(id=job.get_id()) def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context): """Produce a training dataset, return a job id that will provide a file reference""" @@ -86,9 +101,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context): def StartStreamToOnlineIngestionJob(self, request, context): """Start job to ingest data from stream into online store""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + + feature_table = self.client.get_feature_table( + request.table_name, request.project + ) + # TODO: add extra_jars to request + job = start_stream_to_online_ingestion(feature_table, [], self.client) + return StartStreamToOnlineIngestionJobResponse(id=job.get_id()) def ListJobs(self, request, context): """List all types of jobs""" diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index d350d764c9..127112a06f 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]: """ raise NotImplementedError - @abc.abstractmethod - def 
get_extra_options(self) -> str: - """ - Spark job dependencies (expected to resolved from maven) - Returns: - str: Spark job dependencies. - """ - raise NotImplementedError - class RetrievalJobParameters(SparkJobParameters): def __init__( @@ -130,7 +121,6 @@ def __init__( feature_tables_sources: List[Dict], entity_source: Dict, destination: Dict, - extra_options: str = "", ): """ Args: @@ -242,7 +232,6 @@ def __init__( self._feature_tables_sources = feature_tables_sources self._entity_source = entity_source self._destination = destination - self._extra_options = extra_options def get_name(self) -> str: all_feature_tables_names = [ft["name"] for ft in self._feature_tables] @@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]: def get_destination_path(self) -> str: return self._destination["path"] - def get_extra_options(self) -> str: - return self._extra_options - class RetrievalJob(SparkJob): """ @@ -315,7 +301,6 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, - extra_options: str = "", ): self._feature_table = feature_table self._source = source @@ -325,7 +310,6 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl - self._extra_options = extra_options def get_name(self) -> str: return ( @@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]: json.dumps(self._get_redis_config()), ] - def get_extra_options(self) -> str: - return self._extra_options - class StreamIngestionJobParameters(SparkJobParameters): def __init__( @@ -378,7 +359,6 @@ def __init__( redis_host: str, redis_port: int, redis_ssl: bool, - extra_options="", ): self._feature_table = feature_table self._source = source @@ -387,7 +367,6 @@ def __init__( self._redis_host = redis_host self._redis_port = redis_port self._redis_ssl = redis_ssl - self._extra_options = extra_options def get_name(self) -> str: return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}" @@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]: json.dumps(self._get_redis_config()), ] - def get_extra_options(self) -> str: - return self._extra_options - class BatchIngestionJob(SparkJob): """ diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 8260383fbc..08b5ca5b21 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -16,7 +16,6 @@ CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH, CONFIG_SPARK_EMR_LOG_LOCATION, CONFIG_SPARK_EMR_REGION, - CONFIG_SPARK_EXTRA_OPTIONS, CONFIG_SPARK_HOME, CONFIG_SPARK_INGESTION_JOB_JAR, CONFIG_SPARK_LAUNCHER, @@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job( for feature_table in feature_tables ], destination={"format": output_format, "path": output_path}, - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) @@ -256,7 +254,6 @@ def start_offline_to_online_ingestion( redis_host=client._config.get(CONFIG_REDIS_HOST), redis_port=client._config.getint(CONFIG_REDIS_PORT), redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) @@ -277,7 +274,6 @@ def start_stream_to_online_ingestion( redis_host=client._config.get(CONFIG_REDIS_HOST), redis_port=client._config.getint(CONFIG_REDIS_PORT), redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS), ) ) diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index f3106c6851..821a962ed3 100644 --- 
a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -3,7 +3,7 @@ import subprocess import uuid from contextlib import closing -from typing import List +from typing import Dict, List import requests from requests.exceptions import RequestException @@ -22,6 +22,10 @@ StreamIngestionJobParameters, ) +# In-memory cache of Spark jobs +# This is necessary since we can't query Spark jobs in local mode +JOB_CACHE: Dict[str, SparkJob] = {} + def _find_free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: @@ -200,12 +204,17 @@ def spark_submit( "spark.sql.session.timeZone=UTC", # ignore local timezone "--packages", f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}", + "--jars", + "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar," + "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar," + "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar", + "--conf", + "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem", + "--conf", + "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", ] ) - if job_params.get_extra_options(): - submission_cmd.extend(job_params.get_extra_options().split(" ")) - submission_cmd.append(job_params.get_main_file_path()) submission_cmd.extend(job_params.get_arguments()) @@ -215,42 +224,56 @@ def historical_feature_retrieval( self, job_params: RetrievalJobParameters ) -> RetrievalJob: job_id = str(uuid.uuid4()) - return StandaloneClusterRetrievalJob( + job = StandaloneClusterRetrievalJob( job_id, job_params.get_name(), self.spark_submit(job_params), job_params.get_destination_path(), ) + JOB_CACHE[job_id] = job + return job def offline_to_online_ingestion( self, ingestion_job_params: BatchIngestionJobParameters ) -> BatchIngestionJob: job_id = str(uuid.uuid4()) ui_port = _find_free_port() - return StandaloneClusterBatchIngestionJob( + job = StandaloneClusterBatchIngestionJob( job_id, ingestion_job_params.get_name(), self.spark_submit(ingestion_job_params, ui_port), ui_port, ) + JOB_CACHE[job_id] = job + return job def start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters ) -> StreamIngestionJob: job_id = str(uuid.uuid4()) ui_port = _find_free_port() - return StandaloneClusterStreamingIngestionJob( + job = StandaloneClusterStreamingIngestionJob( job_id, ingestion_job_params.get_name(), self.spark_submit(ingestion_job_params, ui_port), ui_port, ) + JOB_CACHE[job_id] = job + return job def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError def get_job_by_id(self, job_id: str) -> SparkJob: - raise NotImplementedError + return JOB_CACHE[job_id] def list_jobs(self, include_terminated: bool) -> List[SparkJob]: - raise NotImplementedError + if include_terminated is True: + return list(JOB_CACHE.values()) + else: + return [ + job + for job in JOB_CACHE.values() + if job.get_status() + in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) + ] diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py index 69792a1e88..0a766cf796 100644 --- a/sdk/python/feast/remote_job.py +++ b/sdk/python/feast/remote_job.py @@ -1,11 +1,14 @@ import time from typing import Any, Callable, Dict, List -from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest, JobStatus +from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest +from 
feast.core.JobService_pb2 import Job as JobProto +from feast.core.JobService_pb2 import JobStatus, JobType from feast.core.JobService_pb2_grpc import JobServiceStub from feast.pyspark.abc import ( BatchIngestionJob, RetrievalJob, + SparkJob, SparkJobFailure, SparkJobStatus, StreamIngestionJob, @@ -128,3 +131,33 @@ def __init__( job_id: str, ): super().__init__(service, grpc_extra_param_provider, job_id) + + +def get_remote_job_from_proto( + service: JobServiceStub, + grpc_extra_param_provider: GrpcExtraParamProvider, + job: JobProto, +) -> SparkJob: + """Get the remote job python object from Job proto. + + Args: + service (JobServiceStub): Reference to Job Service + grpc_extra_param_provider (GrpcExtraParamProvider): Callable for providing extra parameters to grpc requests + job (JobProto): Proto object describing the Job + + Returns: + (SparkJob): A remote job object for the given job + """ + if job.type == JobType.RETRIEVAL_JOB: + return RemoteRetrievalJob( + service, grpc_extra_param_provider, job.id, job.retrieval.output_location + ) + elif job.type == JobType.BATCH_INGESTION_JOB: + return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id) + elif job.type == JobType.STREAM_INGESTION_JOB: + return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id) + else: + raise ValueError( + f"Invalid Job Type {job.type}, has to be one of " + f"{(JobType.RETRIEVAL_JOB, JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB)}" + ) diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py index a58fb7b2d5..c5716922d9 100644 --- a/tests/e2e/fixtures/client.py +++ b/tests/e2e/fixtures/client.py @@ -1,7 +1,7 @@ import os import tempfile import uuid -from typing import Tuple +from typing import Optional, Tuple import pyspark import pytest @@ -18,8 +18,16 @@ def feast_client( feast_core: Tuple[str, int], feast_serving: Tuple[str, int], local_staging_path, + feast_jobservice: Optional[Tuple[str, int]], enable_auth, ): + if feast_jobservice is None: + job_service_env = dict() + else: + job_service_env = dict( + job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}" + ) + if pytestconfig.getoption("env") == "local": c = Client( core_url=f"{feast_core[0]}:{feast_core[1]}", @@ -34,6 +42,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), + **job_service_env, ) elif pytestconfig.getoption("env") == "gcloud": @@ -51,6 +60,7 @@ def feast_client( historical_feature_output_location=os.path.join( local_staging_path, "historical_output" ), + **job_service_env, ) else: raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}") diff --git a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py index 5e9aed124c..805934b5f4 100644 --- a/tests/e2e/fixtures/feast_services.py +++ b/tests/e2e/fixtures/feast_services.py @@ -4,8 +4,9 @@ import subprocess import tempfile import time -from typing import Any, Dict +from typing import Any, Dict, Tuple +import pyspark import pytest import yaml from pytest_postgresql.executor import PostgreSQLExecutor @@ -15,6 +16,7 @@ "feast_core", "feast_serving", "enable_auth", + "feast_jobservice", ) @@ -140,3 +142,61 @@ def feast_serving( _wait_port_open(6566) yield "localhost", 6566 process.terminate() + + +@pytest.fixture(params=["jobservice_disabled", "jobservice_enabled"]) +def feast_jobservice( + request, + pytestconfig, + ingestion_job_jar, + redis_server: RedisExecutor, + feast_core: Tuple[str, int], + feast_serving: Tuple[str, int], + 
local_staging_path, +): + if request.param == "jobservice_disabled": + yield None + else: + env = os.environ.copy() + + if pytestconfig.getoption("env") == "local": + env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" + env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" + env["FEAST_SPARK_LAUNCHER"] = "standalone" + env["FEAST_SPARK_STANDALONE_MASTER"] = "local" + env["FEAST_SPARK_HOME"] = os.getenv("SPARK_HOME") or os.path.dirname( + pyspark.__file__ + ) + env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar + env["FEAST_REDIS_HOST"] = redis_server.host + env["FEAST_REDIS_PORT"] = str(redis_server.port) + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( + local_staging_path, "spark" + ) + env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( + local_staging_path, "historical_output" + ) + + if pytestconfig.getoption("env") == "gcloud": + env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" + env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" + env["FEAST_SPARK_LAUNCHER"] = "dataproc" + env["FEAST_DATAPROC_CLUSTER_NAME"] = pytestconfig.getoption( + "dataproc_cluster_name" + ) + env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project") + env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region") + env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( + local_staging_path, "dataproc" + ) + env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar + env["FEAST_REDIS_HOST"] = pytestconfig.getoption("redis_url").split(":")[0] + env["FEAST_REDIS_PORT"] = pytestconfig.getoption("redis_url").split(":")[1] + env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( + local_staging_path, "historical_output" + ) + + process = subprocess.Popen(["feast", "server"], env=env) + _wait_port_open(6568) + yield "localhost", 6568 + process.terminate()
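
The `feast_jobservice` fixture above starts the job service with `feast server` and configures it entirely through environment variables. Outside of pytest the same thing can be done by hand; the sketch below mirrors the fixture's `local` branch with placeholder values and assumes Core, Serving, and Redis are already running.

```python
# Rough standalone equivalent of the fixture's "local" branch (illustrative).
# For real ingestion jobs, FEAST_SPARK_INGESTION_JAR, staging locations, etc.
# would also need to be set, as the fixture does above.
import os
import subprocess

env = os.environ.copy()
env.update(
    {
        "FEAST_CORE_URL": "localhost:6565",
        "FEAST_SERVING_URL": "localhost:6566",
        "FEAST_SPARK_LAUNCHER": "standalone",
        "FEAST_SPARK_STANDALONE_MASTER": "local[*]",
        "FEAST_REDIS_HOST": "localhost",
        "FEAST_REDIS_PORT": "6379",
    }
)

# The job service listens on 6568, which is what _wait_port_open checks for.
job_service = subprocess.Popen(["feast", "server"], env=env)
```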
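Once a job service is reachable, the new code paths in `client.py` delegate job management over gRPC instead of launching Spark in-process. A rough end-to-end sketch of the new methods, with placeholder feature table name, dates, and URLs:

```python
# Hedged usage sketch of the delegation added in client.py. With job_service_url
# set, each call goes through the job service and returns a Remote*Job proxy;
# without it, the client falls back to the existing in-process launchers.
from datetime import datetime, timedelta

from feast import Client

client = Client(core_url="localhost:6565", job_service_url="localhost:6568")
driver_statistics = client.get_feature_table("driver_statistics")

# Offline -> online ingestion (RemoteBatchIngestionJob when delegated)
batch_job = client.start_offline_to_online_ingestion(
    driver_statistics,
    start=datetime.utcnow() - timedelta(days=1),
    end=datetime.utcnow(),
)
print(batch_job.get_id(), batch_job.get_status())

# Stream -> online ingestion (RemoteStreamIngestionJob when delegated)
stream_job = client.start_stream_to_online_ingestion(driver_statistics)

# Jobs can be listed and looked up by id through the same service; on the
# standalone launcher this is backed by the new in-memory JOB_CACHE.
active_jobs = client.list_jobs(include_terminated=False)
same_job = client.get_job_by_id(stream_job.get_id())
```

When the job service is disabled (the `jobservice_disabled` fixture parameter, where the fixture yields `None`), the same client calls run through the pre-existing launcher code paths.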