Skip to content

Commit

Permalink
Merge pull request #705 from nkaretnikov/check-worker-605
Browse files Browse the repository at this point in the history
Check if worker is initialized
  • Loading branch information
Nikita Karetnikov authored Jan 27, 2024
2 parents 66083e6 + 1863ec0 commit 149b55c
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""add worker
Revision ID: 0f7e23ff24ee
Revises: 771180018e1b
Create Date: 2023-12-13 21:01:45.546591
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0f7e23ff24ee"
down_revision = "771180018e1b"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"worker",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("initialized", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)


def downgrade():
op.drop_table("worker")
12 changes: 12 additions & 0 deletions conda-store-server/conda_store_server/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand Down Expand Up @@ -41,6 +42,17 @@
ARN_ALLOWED_REGEX = re.compile(schema.ARN_ALLOWED)


class Worker(Base):
"""For communicating with the worker process"""

__tablename__ = "worker"

id = Column(Integer, primary_key=True)

# For checking whether the worker is initialized
initialized = Column(Boolean, default=False)


class Namespace(Base):
"""Namespace for resources"""

Expand Down
64 changes: 63 additions & 1 deletion conda-store-server/conda_store_server/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
import os
import posixpath
import sys
import time
from enum import Enum
from threading import Thread

import conda_store_server
import conda_store_server.dbutil as dbutil
import uvicorn
from conda_store_server import __version__, storage
from conda_store_server import __version__, orm, storage
from conda_store_server.app import CondaStore
from conda_store_server.server import auth, views
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.pool import QueuePool
from starlette.middleware.sessions import SessionMiddleware
from traitlets import (
Bool,
Expand All @@ -29,6 +33,12 @@
from traitlets.config import Application, catch_config_error


class _Color(str, Enum):
GREEN = "\x1b[32m"
RED = "\x1b[31m"
RESET = "\x1b[0m"


class CondaStoreServer(Application):
aliases = {
"config": "CondaStoreServer.config_file",
Expand Down Expand Up @@ -342,6 +352,33 @@ async def favicon():

return app

def _check_worker(self, delay=5):
# Creates a new DB connection since this will be run in a separate
# thread and connections cannot be shared between threads
session_factory = orm.new_session_factory(
url=self.conda_store.database_url,
poolclass=QueuePool,
)

# Waits in a loop for the worker to become ready, which is
# communicated by the worker via task_initialize_worker
while True:
with session_factory() as db:
q = db.query(orm.Worker).first()
if q is not None and q.initialized:
self.log.info(
f"{_Color.GREEN}" "Worker initialized" f"{_Color.RESET}"
)
break

time.sleep(delay)
self.log.warning(
f"{_Color.RED}"
"Waiting for worker... "
"Use --standalone if running outside of docker"
f"{_Color.RESET}"
)

def start(self):
fastapi_app = self.init_fastapi_app()

Expand All @@ -350,6 +387,30 @@ def start(self):
self.conda_store.ensure_namespace(db)
self.conda_store.ensure_conda_channels(db)

# This ensures the database has no Worker table entries when the
# server starts, which is necessary for the worker to signal that
# it's ready via task_initialize_worker. Old Worker entries could
# still be in the database on startup after they were added on the
# previous run and the server was terminated.
#
# Note that this cleanup is deliberately done on startup because the
# server could be terminated due to a power failure, which would
# leave no chance for cleanup actions to run on shutdown.
#
# The database is used for worker-server communication because it
# will work regardless of celery_broker_url used, which can be Redis
# or just point to a database connection.
db.query(orm.Worker).delete()
db.commit()

# We cannot check whether the worker is ready right away and block. When
# running via docker, the worker is started *after* the server is
# running because it relies on config files created by the server.
# So we just keep checking in a separate thread until the worker is
# ready.
worker_checker = Thread(target=self._check_worker)
worker_checker.start()

# start worker if in standalone mode
if self.standalone:
import multiprocessing
Expand Down Expand Up @@ -391,3 +452,4 @@ def start(self):
finally:
if self.standalone:
process.join()
worker_checker.join()
14 changes: 14 additions & 0 deletions conda-store-server/conda_store_server/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection():
sender.app.send_task("task_initialize_worker")
sender.app.send_task("task_update_conda_channels")
sender.app.send_task("task_watch_paths")
sender.app.send_task("task_cleanup_builds")
Expand Down Expand Up @@ -59,6 +60,19 @@ def _shutdown(*args, **kwargs):
return self._worker


# Signals to the server that the worker is running, see _check_worker in
# CondaStoreServer
@shared_task(base=WorkerTask, name="task_initialize_worker", bind=True)
def task_initialize_worker(self):
from conda_store_server import orm

conda_store = self.worker.conda_store

with conda_store.session_factory() as db:
db.add(orm.Worker(initialized=True))
db.commit()


@shared_task(base=WorkerTask, name="task_watch_paths", bind=True)
def task_watch_paths(self):
conda_store = self.worker.conda_store
Expand Down

0 comments on commit 149b55c

Please sign in to comment.