Skip to content

Commit

Permalink
Import pull request assignments into triagebot
Browse files Browse the repository at this point in the history
General overview at: rust-lang#1753

- Added a new DB table with the fields to track how many PRs are
  assigned to a contributor
- Initial DB table population with a one-off job, manually run.
  • Loading branch information
apiraino committed Feb 22, 2024
1 parent 2dd37fa commit 24b9b6a
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 5 deletions.
26 changes: 26 additions & 0 deletions github-graphql/PullRequestsOpen.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
query PullRequestsOpen ($repo_owner: String!, $repo_name: String!, $after: String) {
repository(owner: $repo_owner, name: $repo_name) {
pullRequests(first: 100, after: $after, states:OPEN) {
pageInfo {
hasNextPage
endCursor
}
nodes {
number
updatedAt
createdAt
assignees(first: 10) {
nodes {
login
databaseId
}
}
labels(first:5, orderBy:{field:NAME, direction:DESC}) {
nodes {
name
}
}
}
}
}
}
31 changes: 31 additions & 0 deletions github-graphql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# How to use GraphQL with Rust

# GUI Clients (Electron apps)

Use a client to experiment and build your GraphQL query/mutation.

https://insomnia.rest/download

https://docs.usebruno.com

Once you're happy with the result, save your query in a `<query>.gql` file in this directory. It will serve as
documentation on how to reproduce the Rust boilerplate.

# Cynic CLI

Introspect a schema and save it locally:

```sh
cynic introspect \
-H "User-Agent: cynic/3.4.3" \
-H "Authorization: Bearer [GITHUB_TOKEN]" \
"https://api.github.com/graphql" \
-o schemas/github.graphql
```

Execute a GraphQL query/mutation and save locally the Rust boilerplate:

``` sh
cynic querygen --schema schemas/github.graphql --query query.gql
```

42 changes: 42 additions & 0 deletions github-graphql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub mod queries {
#[derive(cynic::QueryFragment, Debug)]
pub struct User {
pub login: String,
pub database_id: Option<i32>,
}

#[derive(cynic::QueryFragment, Debug)]
Expand Down Expand Up @@ -385,3 +386,44 @@ pub mod project_items {
pub date: Option<Date>,
}
}

/// Retrieve all pull requests waiting on review from T-compiler
/// GraphQL query: see file github-graphql/PullRequestsOpen.gql
pub mod pull_requests_open {
use crate::queries::{LabelConnection, PullRequestConnection, UserConnection};

use super::queries::DateTime;
use super::schema;

#[derive(cynic::QueryVariables, Clone, Debug)]
pub struct PullRequestsOpenVariables<'a> {
pub repo_owner: &'a str,
pub repo_name: &'a str,
pub after: Option<String>,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Query", variables = "PullRequestsOpenVariables")]
pub struct PullRequestsOpen {
#[arguments(owner: $repo_owner, name: $repo_name)]
pub repository: Option<Repository>,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(variables = "PullRequestsOpenVariables")]
pub struct Repository {
#[arguments(first: 100, after: $after, states: "OPEN")]
pub pull_requests: PullRequestConnection,
}

#[derive(cynic::QueryFragment, Debug)]
pub struct PullRequest {
pub number: i32,
pub updated_at: DateTime,
pub created_at: DateTime,
#[arguments(first: 10)]
pub assignees: UserConnection,
#[arguments(first: 5, orderBy: { direction: "DESC", field: "NAME" })]
pub labels: Option<LabelConnection>,
}
}
11 changes: 10 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,18 @@ CREATE TABLE jobs (
);
",
"
CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
ON jobs (
name, scheduled_at
);
",
"
CREATE table review_prefs (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
user_id BIGINT REFERENCES users(user_id),
assigned_prs INT[] NOT NULL DEFAULT array[]::INT[]
);",
"
CREATE UNIQUE INDEX review_prefs_user_id ON review_prefs(user_id);
",
];
3 changes: 2 additions & 1 deletion src/db/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub struct Notification {
pub team_name: Option<String>,
}

pub async fn record_username(db: &DbClient, user_id: i64, username: String) -> anyhow::Result<()> {
/// Add a new user (if not existing)
pub async fn record_username(db: &DbClient, user_id: i64, username: &str) -> anyhow::Result<()> {
db.execute(
"INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
&[&user_id, &username],
Expand Down
84 changes: 84 additions & 0 deletions src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,90 @@ async fn project_items_by_status(
Ok(all_items)
}

/// Retrieve all pull requests in status OPEN that are not drafts
pub async fn retrieve_pull_requests(
repo: &Repository,
client: &GithubClient,
) -> anyhow::Result<Vec<(User, i32)>> {
use cynic::QueryBuilder;
use github_graphql::pull_requests_open::{PullRequestsOpen, PullRequestsOpenVariables};

let repo_owner = repo.owner();
let repo_name = repo.name();

let mut prs = vec![];

let mut vars = PullRequestsOpenVariables {
repo_owner,
repo_name,
after: None,
};
loop {
let query = PullRequestsOpen::build(vars.clone());
let req = client.post(&client.graphql_url);
let req = req.json(&query);

let data: cynic::GraphQlResponse<PullRequestsOpen> = client.json(req).await?;
if let Some(errors) = data.errors {
anyhow::bail!("There were graphql errors. {:?}", errors);
}
let repository = data
.data
.ok_or_else(|| anyhow::anyhow!("No data returned."))?
.repository
.ok_or_else(|| anyhow::anyhow!("No repository."))?;
prs.extend(repository.pull_requests.nodes);

let page_info = repository.pull_requests.page_info;
if !page_info.has_next_page || page_info.end_cursor.is_none() {
break;
}
vars.after = page_info.end_cursor;
}

let mut prs_processed: Vec<_> = vec![];
let _: Vec<_> = prs
.into_iter()
.filter_map(|pr| {
if pr.is_draft {
return None;
}

// exclude rollup PRs
let labels = pr
.labels
.map(|l| l.nodes)
.unwrap_or_default()
.into_iter()
.map(|node| node.name)
.collect::<Vec<_>>();
if labels.iter().any(|label| label == "rollup") {
return None;
}

let _: Vec<_> = pr
.assignees
.nodes
.iter()
.map(|user| {
let user_id = user.database_id.expect("checked") as i64;
prs_processed.push((
User {
login: user.login.clone(),
id: Some(user_id),
},
pr.number,
));
})
.collect();
Some(true)
})
.collect();
prs_processed.sort_by(|a, b| a.0.id.cmp(&b.0.id));

Ok(prs_processed)
}

pub enum DesignMeetingStatus {
Proposed,
Scheduled,
Expand Down
1 change: 1 addition & 0 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod notification;
mod notify_zulip;
mod ping;
mod prioritize;
pub mod pull_requests_assignment_update;
mod relabel;
mod review_requested;
mod review_submitted;
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn handle(ctx: &Context, event: &Event) -> anyhow::Result<()> {
continue;
}

if let Err(err) = notifications::record_username(&client, user.id.unwrap(), user.login)
if let Err(err) = notifications::record_username(&client, user.id.unwrap(), &user.login)
.await
.context("failed to record username")
{
Expand Down
72 changes: 72 additions & 0 deletions src/handlers/pull_requests_assignment_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::collections::HashMap;

use crate::db::notifications::record_username;
use crate::github::retrieve_pull_requests;
use crate::jobs::Job;
use anyhow::Context as _;
use async_trait::async_trait;
use tokio_postgres::Client as DbClient;

pub struct PullRequestAssignmentUpdate;

#[async_trait]
impl Job for PullRequestAssignmentUpdate {
fn name(&self) -> &'static str {
"pull_request_assignment_update"
}

async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
let db = ctx.db.get().await;
let gh = &ctx.github;

tracing::trace!("starting pull_request_assignment_update");

let rust_repo = gh.repository("rust-lang/rust").await?;
let prs = retrieve_pull_requests(&rust_repo, &gh).await?;

// delete all PR assignments before populating
init_table(&db).await?;

// aggregate by user first
let aggregated = prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
let (_, prs) = acc
.entry(user.id.unwrap())
.or_insert_with(|| (user, Vec::new()));
prs.push(pr);
acc
});

// populate the table
for (_user_id, (assignee, prs)) in &aggregated {
let assignee_id = assignee.id.expect("checked");
let _ = record_username(&db, assignee_id, &assignee.login).await;
create_team_member_workqueue(&db, assignee_id, &prs).await?;
}

Ok(())
}
}

/// Truncate the review prefs table
async fn init_table(db: &DbClient) -> anyhow::Result<u64> {
let res = db
.execute("UPDATE review_prefs SET assigned_prs='{}';", &[])
.await?;
Ok(res)
}

/// Create a team member work queue
async fn create_team_member_workqueue(
db: &DbClient,
user_id: i64,
prs: &Vec<i32>,
) -> anyhow::Result<u64, anyhow::Error> {
let q = "
INSERT INTO review_prefs (user_id, assigned_prs) VALUES ($1, $2)
ON CONFLICT (user_id)
DO UPDATE SET assigned_prs = $2
WHERE review_prefs.user_id=$1";
db.execute(q, &[&user_id, prs])
.await
.context("Insert DB error")
}
2 changes: 1 addition & 1 deletion src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn jobs_defined() {
unique_all_job_names.dedup();
assert_eq!(all_job_names, unique_all_job_names);

// Also ensure that our defalt jobs are release jobs
// Also ensure that our default jobs are release jobs
let default_jobs = default_jobs();
default_jobs
.iter()
Expand Down
41 changes: 40 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tokio::{task, time};
use tower::{Service, ServiceExt};
use tracing as log;
use tracing::Instrument;
use triagebot::handlers::pull_requests_assignment_update::PullRequestAssignmentUpdate;
use triagebot::jobs::{
default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
default_jobs, Job, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
};
use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};

Expand Down Expand Up @@ -261,6 +262,12 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
octocrab: oc,
});

// Run all jobs that don't have a schedule (one-off jobs)
if !is_scheduled_jobs_disabled() {
spawn_job_oneoffs(ctx.clone());
}

// Run all jobs that have a schedule (recurring jobs)
if !is_scheduled_jobs_disabled() {
spawn_job_scheduler();
spawn_job_runner(ctx.clone());
Expand Down Expand Up @@ -310,6 +317,38 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
Ok(())
}

/// Spawns a background tokio task which runs all jobs having no schedule
/// i.e. manually started
fn spawn_job_oneoffs(ctx: Arc<Context>) {
// TODO: Ideally JobSchedule.schedule should become an `Option<Schedule>`
// and here we run all those with schedule=None
let jobs: Vec<Box<dyn Job + Send + Sync>> = vec![Box::new(PullRequestAssignmentUpdate)];

for job in jobs {
let ctx = ctx.clone();
task::spawn(async move {
// Allow some spacing between starting jobs
let mut interval =
time::interval(time::Duration::from_secs(JOB_SCHEDULING_CADENCE_IN_SECS));
interval.tick().await;

log::debug!("Running job: {}", job.name());
match job.run(&ctx, &serde_json::Value::Null).await {
Ok(_) => {
log::trace!("job successfully executed (name={})", &job.name());
}
Err(e) => {
log::error!(
"job failed on execution (name={:?}, error={:?})",
job.name(),
e
);
}
}
});
}
}

/// Spawns a background tokio task which runs continuously to queue up jobs
/// to be run by the job runner.
///
Expand Down

0 comments on commit 24b9b6a

Please sign in to comment.