Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if worker is initialized #705

Merged
merged 11 commits into from
Jan 27, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""add worker

Revision ID: 0f7e23ff24ee
Revises: 771180018e1b
Create Date: 2023-12-13 21:01:45.546591

"""
import sqlalchemy as sa
from alembic import op

# Revision identifiers, used by Alembic to locate this migration and
# order it after its parent revision.
revision = "0f7e23ff24ee"
down_revision = "771180018e1b"
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``worker`` table.

    The table holds a single flag row used by the worker process to signal
    to the server that it has finished initializing.
    """
    schema_items = (
        # Surrogate primary key.
        sa.Column("id", sa.Integer(), nullable=False),
        # Set to True by the worker once it is ready to accept tasks.
        sa.Column("initialized", sa.Boolean(), nullable=True),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table("worker", *schema_items)


def downgrade():
    """Drop the ``worker`` table, reversing :func:`upgrade`."""
    op.drop_table("worker")
12 changes: 12 additions & 0 deletions conda-store-server/conda_store_server/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand Down Expand Up @@ -41,6 +42,17 @@
ARN_ALLOWED_REGEX = re.compile(schema.ARN_ALLOWED)


class Worker(Base):
    """For communicating with the worker process.

    The worker inserts a row with ``initialized=True`` when it starts
    (see ``task_initialize_worker``); the server polls this table to
    detect worker readiness.
    """

    __tablename__ = "worker"

    # Surrogate primary key for the readiness-flag row.
    id = Column(Integer, primary_key=True)

    # For checking whether the worker is initialized
    initialized = Column(Boolean, default=False)


class Namespace(Base):
"""Namespace for resources"""

Expand Down
64 changes: 63 additions & 1 deletion conda-store-server/conda_store_server/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
import os
import posixpath
import sys
import time
from enum import Enum
from threading import Thread

import conda_store_server
import conda_store_server.dbutil as dbutil
import uvicorn
from conda_store_server import __version__, storage
from conda_store_server import __version__, orm, storage
from conda_store_server.app import CondaStore
from conda_store_server.server import auth, views
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.pool import QueuePool
from starlette.middleware.sessions import SessionMiddleware
from traitlets import (
Bool,
Expand All @@ -29,6 +33,12 @@
from traitlets.config import Application, catch_config_error


class _Color(str, Enum):
    """ANSI terminal escape sequences used to colorize log output."""

    GREEN = "\x1b[32m"
    RED = "\x1b[31m"
    RESET = "\x1b[0m"


class CondaStoreServer(Application):
aliases = {
"config": "CondaStoreServer.config_file",
Expand Down Expand Up @@ -342,6 +352,33 @@ async def favicon():

return app

def _check_worker(self, delay=5):
    """Block until the worker process reports that it is initialized.

    Polls the ``worker`` table every *delay* seconds; the worker signals
    readiness by inserting an initialized row via ``task_initialize_worker``.
    Intended to run in a dedicated thread.
    """
    # A dedicated session factory is required here: this method runs in a
    # separate thread and DB connections cannot be shared between threads.
    make_session = orm.new_session_factory(
        url=self.conda_store.database_url,
        poolclass=QueuePool,
    )

    while True:
        # Read the flag inside the session so the ORM row is still attached.
        with make_session() as db:
            row = db.query(orm.Worker).first()
            ready = row is not None and row.initialized

        if ready:
            self.log.info(f"{_Color.GREEN}Worker initialized{_Color.RESET}")
            return

        time.sleep(delay)
        self.log.warning(
            f"{_Color.RED}"
            "Waiting for worker... "
            "Use --standalone if running outside of docker"
            f"{_Color.RESET}"
        )

def start(self):
fastapi_app = self.init_fastapi_app()

Expand All @@ -350,6 +387,30 @@ def start(self):
self.conda_store.ensure_namespace(db)
self.conda_store.ensure_conda_channels(db)

# This ensures the database has no Worker table entries when the
# server starts, which is necessary for the worker to signal that
# it's ready via task_initialize_worker. Old Worker entries could
# still be in the database on startup after they were added on the
# previous run and the server was terminated.
#
# Note that this cleanup is deliberately done on startup because the
# server could be terminated due to a power failure, which would
# leave no chance for cleanup actions to run on shutdown.
#
# The database is used for worker-server communication because it
# will work regardless of celery_broker_url used, which can be Redis
# or just point to a database connection.
db.query(orm.Worker).delete()
db.commit()
nkaretnikov marked this conversation as resolved.
Show resolved Hide resolved

# We cannot check whether the worker is ready right away and block. When
# running via docker, the worker is started *after* the server is
# running because it relies on config files created by the server.
# So we just keep checking in a separate thread until the worker is
# ready.
worker_checker = Thread(target=self._check_worker)
worker_checker.start()

# start worker if in standalone mode
if self.standalone:
import multiprocessing
Expand Down Expand Up @@ -391,3 +452,4 @@ def start(self):
finally:
if self.standalone:
process.join()
worker_checker.join()
14 changes: 14 additions & 0 deletions conda-store-server/conda_store_server/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection():
sender.app.send_task("task_initialize_worker")
sender.app.send_task("task_update_conda_channels")
sender.app.send_task("task_watch_paths")
sender.app.send_task("task_cleanup_builds")
Expand Down Expand Up @@ -59,6 +60,19 @@ def _shutdown(*args, **kwargs):
return self._worker


# Signals to the server that the worker is running, see _check_worker in
# CondaStoreServer
@shared_task(base=WorkerTask, name="task_initialize_worker", bind=True)
def task_initialize_worker(self):
    """Insert a ``Worker`` row with ``initialized=True``.

    The server's ``_check_worker`` poll loop treats the presence of such a
    row as the readiness signal from this worker process.
    """
    from conda_store_server import orm

    with self.worker.conda_store.session_factory() as session:
        session.add(orm.Worker(initialized=True))
        session.commit()


@shared_task(base=WorkerTask, name="task_watch_paths", bind=True)
def task_watch_paths(self):
conda_store = self.worker.conda_store
Expand Down
Loading