Skip to content

Commit

Permalink
fix: Add async refresh to prevent synchronous refresh in main thread (#…
Browse files Browse the repository at this point in the history
…3812)

fix: Add async refresh to prevent synchronous refresh in main thread for http feature server

Signed-off-by: Hai Nguyen <[email protected]>
  • Loading branch information
sudohainguyen authored Oct 31, 2023
1 parent 01db8cc commit 9583ed6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
10 changes: 10 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,14 @@ def init_command(project_directory, minimal: bool, template: str):
show_default=True,
help="Timeout for keep alive",
)
@click.option(
"--registry_ttl_sec",
"-r",
help="Number of seconds after which the registry is refreshed",
type=click.INT,
default=5,
show_default=True,
)
@click.pass_context
def serve_command(
ctx: click.Context,
Expand All @@ -673,6 +681,7 @@ def serve_command(
no_feature_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int = 5,
):
"""Start a feature server locally on a given port."""
store = create_feature_store(ctx)
Expand All @@ -685,6 +694,7 @@ def serve_command(
no_feature_log=no_feature_log,
workers=workers,
keep_alive_timeout=keep_alive_timeout,
registry_ttl_sec=registry_ttl_sec,
)


Expand Down
33 changes: 31 additions & 2 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import threading
import traceback
import warnings
from typing import List, Optional
Expand Down Expand Up @@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel):
feature_views: Optional[List[str]] = None


def get_app(store: "feast.FeatureStore"):
def get_app(store: "feast.FeatureStore", registry_ttl_sec: int = 5):
proto_json.patch()

app = FastAPI()
# Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
registry_proto = None
shutting_down = False
active_timer: Optional[threading.Timer] = None

async def get_body(request: Request):
return await request.body()

def async_refresh():
store.refresh_registry()
nonlocal registry_proto
registry_proto = store.registry.proto()
if shutting_down:
return
nonlocal active_timer
active_timer = threading.Timer(registry_ttl_sec, async_refresh)
active_timer.start()

@app.on_event("shutdown")
def shutdown_event():
nonlocal shutting_down
shutting_down = True
if active_timer:
active_timer.cancel()

async_refresh()

@app.post("/get-online-features")
def get_online_features(body=Depends(get_body)):
try:
Expand Down Expand Up @@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)):

class FeastServeApplication(gunicorn.app.base.BaseApplication):
def __init__(self, store: "feast.FeatureStore", **options):
self._app = get_app(store=store)
self._app = get_app(
store=store,
registry_ttl_sec=options.get("registry_ttl_sec", 5),
)
self._options = options
super().__init__()

Expand All @@ -202,11 +229,13 @@ def start_server(
no_access_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int = 5,
):
FeastServeApplication(
store=store,
bind=f"{host}:{port}",
accesslog=None if no_access_log else "-",
workers=workers,
keepalive=keep_alive_timeout,
registry_ttl_sec=registry_ttl_sec,
).run()
2 changes: 2 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ def serve(
no_feature_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int,
) -> None:
"""Start the feature consumption server locally on a given port."""
type_ = type_.lower()
Expand All @@ -2243,6 +2244,7 @@ def serve(
no_access_log=no_access_log,
workers=workers,
keep_alive_timeout=keep_alive_timeout,
registry_ttl_sec=registry_ttl_sec,
)

@log_exceptions_and_usage
Expand Down

0 comments on commit 9583ed6

Please sign in to comment.