Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals committed Mar 10, 2022
1 parent b83d4b3 commit 39a66e3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
7 changes: 5 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ def apply(
service.name, project=self.project, commit=False
)

# If a go server is running, kill it so that it can be recreated in `update_infra` with
# the latest registry state.
self.kill_go_server()

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
Expand All @@ -750,8 +754,7 @@ def teardown(self):

entities = self.list_entities()

if self._go_server:
self._go_server.kill_go_server_explicitly()
self.kill_go_server()

self._get_provider().teardown_infra(self.project, tables, entities)
self._registry.teardown()
Expand Down
17 changes: 10 additions & 7 deletions sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,28 +246,28 @@ def __init__(

def run(self):
# Target function of the thread class
_logger.info(
_logger.debug(
"%s Started monitoring thread to keep go feature server alive", self.ident
)
try:
while not self._is_cancelled.is_set():

# If we fail to connect to grpc stub, terminate subprocess and repeat
_logger.info("%s Connecting to subprocess", self.ident)
_logger.debug("%s Connecting to subprocess", self.ident)
if not self._shared_connection.connect():
_logger.info(
_logger.debug(
"%s Failed to connect, killing and retrying", self.ident
)
self._shared_connection.kill_process()
continue
else:
_logger.info(
_logger.debug(
"%s Go feature server started, process: %s",
self.ident,
self._shared_connection._process.pid,
)
self._go_server_started.set()
_logger.info(
_logger.debug(
"%s is_cancelled status: %s", self.ident, self._is_cancelled
)
while not self._is_cancelled.is_set():
Expand All @@ -276,7 +276,7 @@ def run(self):
self._shared_connection.wait_for_process(3600)
except subprocess.TimeoutExpired:
pass
_logger.info(
_logger.debug(
"%s No longer waiting for process: %s, %s, %s",
self.ident,
self._shared_connection._process.pid,
Expand All @@ -290,6 +290,9 @@ def run(self):
self._shared_connection.kill_process()

def stop(self):
# _logger.info("%s Stopping monitoring thread and terminating go feature server", self.ident)
_logger.debug(
"%s Stopping monitoring thread and terminating go feature server",
self.ident,
)
self._is_cancelled.set()
self._shared_connection.kill_process()

0 comments on commit 39a66e3

Please sign in to comment.