Skip to content

Commit

Permalink
Enhance job api to return associated feature table and start time (#1259
Browse files Browse the repository at this point in the history
)

* Enhance job api to return associated feature table and start time

Signed-off-by: Khor Shu Heng <[email protected]>

* Fix typo for datetime conversion method

Signed-off-by: Khor Shu Heng <[email protected]>

* Fix return type and wrong argument

Signed-off-by: Khor Shu Heng <[email protected]>

* Provide job start time to get historical feature response

Signed-off-by: Khor Shu Heng <[email protected]>

* Use utc timestamp for start time

Signed-off-by: Khor Shu Heng <[email protected]>

* Rename fields

Signed-off-by: Khor Shu Heng <[email protected]>

* Add log uri as a field for retrieved job

Signed-off-by: Khor Shu Heng <[email protected]>

* Cast optional str to str type

Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Jan 12, 2021
1 parent 286941d commit f4f345e
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 37 deletions.
58 changes: 46 additions & 12 deletions protos/feast/core/JobService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,31 @@ message Job {
// Current job status
JobStatus status = 3;
// Deterministic hash of the Job
string hash = 8;
string hash = 4;
// Start time of the Job
google.protobuf.Timestamp start_time = 5;

// Metadata specific to historical feature retrieval jobs.
message RetrievalJobMeta {
  // File URI of the retrieval job output (join result).
  string output_location = 1;
}

// Metadata specific to batch (offline-to-online) ingestion jobs.
message OfflineToOnlineMeta {
// Name of the feature table being ingested into the online store.
string table_name = 1;
}

// Metadata specific to stream (stream-to-online) ingestion jobs.
message StreamToOnlineMeta {
// Name of the feature table being ingested into the online store.
string table_name = 1;
}

// JobType specific metadata on the job
oneof meta {
RetrievalJobMeta retrieval = 5;
OfflineToOnlineMeta batch_ingestion = 6;
StreamToOnlineMeta stream_ingestion = 7;
RetrievalJobMeta retrieval = 6;
OfflineToOnlineMeta batch_ingestion = 7;
StreamToOnlineMeta stream_ingestion = 8;
}

// Path to Spark job logs, if available
string log_uri = 9;
}

// Ingest data from offline store into online store
Expand All @@ -107,8 +114,17 @@ message StartOfflineToOnlineIngestionJobRequest {
}

message StartOfflineToOnlineIngestionJobResponse {
  // Job ID assigned by Feast
  string id = 1;

  // Job start time
  google.protobuf.Timestamp job_start_time = 2;

  // Feature table associated with the job
  string table_name = 3;

  // Path to Spark job logs, if available
  string log_uri = 4;
}

message GetHistoricalFeaturesRequest {
Expand Down Expand Up @@ -136,9 +152,18 @@ message GetHistoricalFeaturesRequest {
}

message GetHistoricalFeaturesResponse {
  // Export Job with ID assigned by Feast
  string id = 1;

  // Uri to the join result output file
  string output_file_uri = 2;

  // Job start time
  google.protobuf.Timestamp job_start_time = 3;

  // Path to Spark job logs, if available
  string log_uri = 4;
}

message StartStreamToOnlineIngestionJobRequest {
Expand All @@ -148,8 +173,17 @@ message StartStreamToOnlineIngestionJobRequest {
}

message StartStreamToOnlineIngestionJobResponse {
  // Job ID assigned by Feast
  string id = 1;

  // Job start time
  google.protobuf.Timestamp job_start_time = 2;

  // Feature table associated with the job
  string table_name = 3;

  // Path to Spark job logs, if available
  string log_uri = 4;
}

message ListJobsRequest {
Expand Down
16 changes: 14 additions & 2 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,8 @@ def get_historical_features(
self._extra_grpc_params,
response.id,
output_file_uri=response.output_file_uri,
start_time=response.job_start_time.ToDatetime(),
log_uri=response.log_uri,
)
else:
return start_historical_feature_retrieval_job(
Expand Down Expand Up @@ -1174,7 +1176,12 @@ def start_offline_to_online_ingestion(
request.end_date.FromDatetime(end)
response = self._job_service.StartOfflineToOnlineIngestionJob(request)
return RemoteBatchIngestionJob(
self._job_service, self._extra_grpc_params, response.id,
self._job_service,
self._extra_grpc_params,
response.id,
feature_table.name,
response.job_start_time.ToDatetime(),
response.log_uri,
)

def start_stream_to_online_ingestion(
Expand All @@ -1196,7 +1203,12 @@ def start_stream_to_online_ingestion(
)
response = self._job_service.StartStreamToOnlineIngestionJob(request)
return RemoteStreamIngestionJob(
self._job_service, self._extra_grpc_params, response.id
self._job_service,
self._extra_grpc_params,
response.id,
feature_table.name,
response.job_start_time,
response.log_uri,
)

def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
Expand Down
45 changes: 40 additions & 5 deletions sdk/python/feast/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, cast

import grpc
from google.protobuf.timestamp_pb2 import Timestamp

import feast
from feast.constants import ConfigOptions as opt
Expand Down Expand Up @@ -54,6 +55,7 @@
def _job_to_proto(spark_job: SparkJob) -> JobProto:
job = JobProto()
job.id = spark_job.get_id()
job.log_uri = cast(str, spark_job.get_log_uri() or "")
status = spark_job.get_status()
if status == SparkJobStatus.COMPLETED:
job.status = JobStatus.JOB_STATUS_DONE
Expand All @@ -71,11 +73,15 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
elif isinstance(spark_job, BatchIngestionJob):
job.type = JobType.BATCH_INGESTION_JOB
job.batch_ingestion.table_name = spark_job.get_feature_table()
elif isinstance(spark_job, StreamIngestionJob):
job.type = JobType.STREAM_INGESTION_JOB
job.stream_ingestion.table_name = spark_job.get_feature_table()
else:
raise ValueError(f"Invalid job type {job}")

job.start_time.FromDatetime(spark_job.get_start_time())

return job


Expand All @@ -97,7 +103,16 @@ def StartOfflineToOnlineIngestionJob(
start=request.start_date.ToDatetime(),
end=request.end_date.ToDatetime(),
)
return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())

return StartOfflineToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
table_name=request.table_name,
log_uri=job.get_log_uri(),
)

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
Expand All @@ -114,8 +129,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):

output_file_uri = job.get_output_file_uri(block=False)

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())

return GetHistoricalFeaturesResponse(
id=job.get_id(), output_file_uri=output_file_uri
id=job.get_id(),
output_file_uri=output_file_uri,
job_start_time=job_start_timestamp,
)

def StartStreamToOnlineIngestionJob(
Expand All @@ -135,7 +155,14 @@ def StartStreamToOnlineIngestionJob(
job_hash = params.get_job_hash()
for job in list_jobs(include_terminated=True, client=self.client):
if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())
return StartStreamToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
table_name=job.get_feature_table(),
log_uri=job.get_log_uri(),
)
raise RuntimeError(
"Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable"
)
Expand All @@ -147,7 +174,15 @@ def StartStreamToOnlineIngestionJob(
feature_table=feature_table,
extra_jars=[],
)
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())
return StartStreamToOnlineIngestionJobResponse(
id=job.get_id(),
job_start_time=job_start_timestamp,
table_name=request.table_name,
log_uri=job.get_log_uri(),
)

def ListJobs(self, request, context):
"""List all types of jobs"""
Expand Down
36 changes: 36 additions & 0 deletions sdk/python/feast/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ def cancel(self):
"""
raise NotImplementedError

@abc.abstractmethod
def get_start_time(self) -> datetime:
    """
    Get the job start time.

    Returns:
        datetime: Time at which the Spark job was started.
    """
    # Consistent with the other abstract methods in this module
    # (e.g. cancel, get_hash), which raise NotImplementedError
    # rather than silently returning None.
    raise NotImplementedError

def get_log_uri(self) -> Optional[str]:
    """
    Return the path to this Spark job's logs, or None when the
    launcher does not expose a log location.
    """
    # Default implementation: no log location available. Concrete
    # launchers may override this to point at their log storage.
    return None


class SparkJobParameters(abc.ABC):
@abc.abstractmethod
Expand Down Expand Up @@ -496,6 +508,18 @@ class BatchIngestionJob(SparkJob):
Container for the ingestion job result
"""

@abc.abstractmethod
def get_feature_table(self) -> str:
    """
    Get the feature table name associated with this job. Return an empty
    string if the feature table cannot be determined, such as when the
    job was created by an earlier version of Feast.

    Returns:
        str: Feature table name, or "" if unknown.
    """
    raise NotImplementedError


class StreamIngestionJob(SparkJob):
"""
Expand All @@ -513,6 +537,18 @@ def get_hash(self) -> str:
"""
raise NotImplementedError

@abc.abstractmethod
def get_feature_table(self) -> str:
    """
    Get the feature table name associated with this job. Return an empty
    string if the feature table cannot be determined, such as when the
    job was created by an earlier version of Feast. (The return type is
    str, so implementations must not return None.)

    Returns:
        str: Feature table name, or "" if unknown.
    """
    raise NotImplementedError


class JobLauncher(abc.ABC):
"""
Expand Down
Loading

0 comments on commit f4f345e

Please sign in to comment.