From fa600fe3c4b1d5fdd383a9367511ac5616ee7a32 Mon Sep 17 00:00:00 2001 From: snowron <35639836+snowron@users.noreply.github.com> Date: Tue, 12 Sep 2023 22:13:30 +0300 Subject: [PATCH] feat: Add materialize and materialize-incremental rest endpoints (#3761) * resolve #3760 Signed-off-by: snowron * format feature_server.py Signed-off-by: snowron --------- Signed-off-by: snowron --- sdk/python/feast/feature_server.py | 43 +++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 3abca1d6e8..7c638dd248 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,9 +1,11 @@ import json import traceback import warnings +from typing import List, Optional import gunicorn.app.base import pandas as pd +from dateutil import parser from fastapi import FastAPI, HTTPException, Request, Response, status from fastapi.logger import logger from fastapi.params import Depends @@ -11,7 +13,7 @@ from pydantic import BaseModel import feast -from feast import proto_json +from feast import proto_json, utils from feast.data_source import PushMode from feast.errors import PushSourceNotFoundException from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest @@ -31,6 +33,17 @@ class PushFeaturesRequest(BaseModel): to: str = "online" +class MaterializeRequest(BaseModel): + start_ts: str + end_ts: str + feature_views: Optional[List[str]] = None + + +class MaterializeIncrementalRequest(BaseModel): + end_ts: str + feature_views: Optional[List[str]] = None + + def get_app(store: "feast.FeatureStore"): proto_json.patch() @@ -134,6 +147,34 @@ def write_to_online_store(body=Depends(get_body)): def health(): return Response(status_code=status.HTTP_200_OK) + @app.post("/materialize") + def materialize(body=Depends(get_body)): + try: + request = MaterializeRequest(**json.loads(body)) + store.materialize( + utils.make_tzaware(parser.parse(request.start_ts)), + utils.make_tzaware(parser.parse(request.end_ts)), + request.feature_views, + ) + except Exception as e: + # Print the original exception on the server side + logger.exception(traceback.format_exc()) + # Raise HTTPException to return the error message to the client + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/materialize-incremental") + def materialize_incremental(body=Depends(get_body)): + try: + request = MaterializeIncrementalRequest(**json.loads(body)) + store.materialize_incremental( + utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views + ) + except Exception as e: + # Print the original exception on the server side + logger.exception(traceback.format_exc()) + # Raise HTTPException to return the error message to the client + raise HTTPException(status_code=500, detail=str(e)) + return app