Skip to content

Commit

Permalink
feat: cyclotron fetch and janitor run migrations (#24559)
Browse files Browse the repository at this point in the history
Co-authored-by: Brett Hoerner <[email protected]>
  • Loading branch information
oliverb123 and bretthoerner authored Aug 23, 2024
1 parent f0abc88 commit 145cacf
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 102 deletions.
81 changes: 0 additions & 81 deletions .github/workflows/rust-cyclotron-migrator-docker.yml

This file was deleted.

3 changes: 0 additions & 3 deletions bin/start-cyclotron
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL

export DATABASE_URL=${DATABASE_URL:-postgres://posthog:posthog@localhost:5432/posthog}
export ALLOW_INTERNAL_IPS=${ALLOW_INTERNAL_IPS:-true}
cd cyclotron-core
cargo sqlx migrate run
cd ..

./target/debug/cyclotron-fetch &
./target/debug/cyclotron-janitor &
Expand Down
16 changes: 0 additions & 16 deletions rust/Dockerfile.migrate-cyclotron

This file was deleted.

3 changes: 3 additions & 0 deletions rust/cyclotron-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ mod config;
pub use config::ManagerConfig;
pub use config::PoolConfig;

// Meta
pub use ops::meta::run_migrations;

// Some data is shared between workers and janitors on a given shard, using
// the metadata table. These keys are reserved for that purpose

Expand Down
10 changes: 9 additions & 1 deletion rust/cyclotron-core/src/ops/meta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sqlx::postgres::PgQueryResult;
use sqlx::{postgres::PgQueryResult, PgPool};
use uuid::Uuid;

use crate::error::QueueError;
Expand All @@ -24,3 +24,11 @@ pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(),
Ok(())
}
}

/// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal.
pub async fn run_migrations(pool: &PgPool) {
sqlx::migrate!("./migrations")
.run(pool)
.await
.expect("Failed to run migrations");
}
10 changes: 9 additions & 1 deletion rust/cyclotron-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::sync::Mutex;
use uuid::Uuid;

use crate::{
ops::worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat},
ops::{
meta::run_migrations,
worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat},
},
Job, JobState, JobUpdate, PoolConfig, QueueError,
};

Expand Down Expand Up @@ -49,6 +52,11 @@ impl Worker {
}
}

/// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal.
pub async fn run_migrations(&self) {
run_migrations(&self.pool).await;
}

/// Dequeues jobs from the queue, and returns them. Job sorting happens at the queue level,
/// workers can't provide any filtering or sorting criteria - queue managers decide which jobs are run,
/// workers just run them.
Expand Down
2 changes: 2 additions & 0 deletions rust/cyclotron-fetch/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ async fn main() {
.await
.expect("failed to create app context");

context.worker.run_migrations().await;

let http_server = tokio::spawn(listen(app, bind));

let worker_loop = tokio::spawn(worker_loop(context));
Expand Down
8 changes: 8 additions & 0 deletions rust/cyclotron-fetch/tests/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ use utils::{

mod utils;

#[sqlx::test(migrations = "../cyclotron-core/migrations")]
pub async fn test_run_migrations(db: PgPool) {
// This is a no-op, since the db sqlx::test gives use already has the migrations run, but it asserts that the migrations
// being run repeatedly doesn't cause any issues, and that the migrations being run are the same as the ones in the core
let context = get_app_test_context(db).await;
context.worker.run_migrations().await;
}

#[sqlx::test(migrations = "../cyclotron-core/migrations")]
pub async fn test_completes_fetch(db: PgPool) {
let context = Arc::new(get_app_test_context(db.clone()).await);
Expand Down
4 changes: 4 additions & 0 deletions rust/cyclotron-janitor/src/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ impl Janitor {
}
}

pub async fn run_migrations(&self) {
cyclotron_core::run_migrations(&self.pool).await;
}

pub async fn run_once(&self) -> Result<CleanupResult, QueueError> {
info!("Running janitor loop");
let _loop_start = common_metrics::timing_guard(RUN_TIME, &self.metrics_labels);
Expand Down
2 changes: 2 additions & 0 deletions rust/cyclotron-janitor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ async fn main() {
.await
.expect("failed to create janitor");

janitor.run_migrations().await;

let janitor_liveness = liveness
.register(
"janitor".to_string(),
Expand Down

0 comments on commit 145cacf

Please sign in to comment.