Skip to content

Commit

Permalink
Check if worker is initialized
Browse files Browse the repository at this point in the history
Fixes #605.
  • Loading branch information
nkaretnikov committed Dec 15, 2023
1 parent 5e4e2e5 commit a9a02bb
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""add worker
Revision ID: 0f7e23ff24ee
Revises: 771180018e1b
Create Date: 2023-12-13 21:01:45.546591
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0f7e23ff24ee"
down_revision = "771180018e1b"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"worker",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("initialized", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("initialized", name="_uc"),
)


def downgrade():
op.drop_table("worker")
17 changes: 17 additions & 0 deletions conda-store-server/conda_store_server/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand Down Expand Up @@ -41,6 +42,22 @@
ARN_ALLOWED_REGEX = re.compile(schema.ARN_ALLOWED)


class Worker(Base):
"""Used to communicate with the worker process"""

__tablename__ = "worker"

id = Column(Integer, primary_key=True)

# Used to check whether the worker is initialized
initialized = Column(Boolean, default=False)

__table_args__ = (
# Ensures no duplicates can be added with this combination of fields.
UniqueConstraint("initialized", name="_uc"),
)


class Namespace(Base):
"""Namespace for resources"""

Expand Down
52 changes: 52 additions & 0 deletions conda-store-server/conda_store_server/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
)
from traitlets.config import Application, catch_config_error

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)


class CondaStoreServer(Application):
aliases = {
Expand Down Expand Up @@ -340,16 +343,64 @@ async def favicon():
name="static-storage",
)

@app.on_event("startup")
async def startup_event():
import signal
import time

from conda_store_server import orm

# This adds a signal handler because uvicorn ignores all signals in
# the startup event. You wouldn't be able to terminate this loop via
# Ctrl-C otherwise
signal.signal(signal.SIGINT, sys.exit)

# Colors to make the output more visible
green = "\x1b[32m"
red = "\x1b[31m"
reset = "\x1b[0m"

# Waits in a loop for the worker to become ready, which is
# communicated via task_initialize_worker
while True:
with self.conda_store.session_factory() as db:
q = db.query(orm.Worker).first()
if q is not None and q.initialized:
logger.info(f"{green}" "Worker initialized" f"{reset}")
break

time.sleep(5)
logger.critical(
f"{red}"
"Waiting for worker... "
"Use --standalone if running outside of docker"
f"{reset}"
)

return app

def start(self):
fastapi_app = self.init_fastapi_app()

from conda_store_server import orm

with self.conda_store.session_factory() as db:
self.conda_store.ensure_settings(db)
self.conda_store.ensure_namespace(db)
self.conda_store.ensure_conda_channels(db)

# We need to ensure the database has no entries in the Worker table
# when the server is started. This can happen when the server was
# terminated while the Worker table was populated in the previous
# run. The check in startup_event expects the worker to populate the
# said table (via task_initialize_worker) only if things are working
# as expected. The deletion code needs to be run on the server side
# (here) and before the worker is started to avoid a race condition.
# We want to make sure that the worker is able to create a new entry
# in the database if the worker is actually running.
db.query(orm.Worker).delete()
db.commit()

# start worker if in standalone mode
if self.standalone:
import multiprocessing
Expand All @@ -376,6 +427,7 @@ def start(self):
else []
),
)

finally:
if self.standalone:
process.join()
14 changes: 14 additions & 0 deletions conda-store-server/conda_store_server/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection():
sender.app.send_task("task_initialize_worker")
sender.app.send_task("task_update_conda_channels")
sender.app.send_task("task_watch_paths")
sender.app.send_task("task_cleanup_builds")
Expand All @@ -46,6 +47,19 @@ def worker(self):
return self._worker


# Signals to the server that the worker is running, see startup_event in
# CondaStoreServer.init_fastapi_app
@shared_task(base=WorkerTask, name="task_initialize_worker", bind=True)
def task_initialize_worker(self):
from conda_store_server import orm

conda_store = self.worker.conda_store

with conda_store.session_factory() as db:
db.add(orm.Worker(initialized=True))
db.commit()


@shared_task(base=WorkerTask, name="task_watch_paths", bind=True)
def task_watch_paths(self):
conda_store = self.worker.conda_store
Expand Down

0 comments on commit a9a02bb

Please sign in to comment.