
Bug: helper-side panic when repeating request for GC-eligible aggregation job #2810

Closed

inahga opened this issue Mar 13, 2024 · 3 comments

inahga (Contributor) commented Mar 13, 2024

thread 'aggregator::http_handlers::tests::aggregate_init_with_expired_reports' panicked at aggregator/src/aggregator.rs:1746:17:
found no existing aggregation job for task ID CbZK5CdAfS7_hzp2L_4Y3vqlTgV1xhzMmv2pQH3M1qo and aggregation job ID Wj3h93hr6mqFyq_fqtRz-w
    #[tokio::test]
    #[allow(clippy::unit_arg)]
    async fn aggregate_init_repeat_garbage_collected_job() {
        let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await;

        let report_expiry_age = Duration::from_hours(24).unwrap();
        let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
            .with_report_expiry_age(Some(report_expiry_age))
            .build();

        let helper_task = task.helper_view().unwrap();

        let vdaf = dummy::Vdaf::new(1);
        // let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap();
        // let hpke_key = helper_task.current_hpke_key();
        let measurement = 0;
        let prep_init_generator = PrepareInitGenerator::new(
            clock.clone(),
            helper_task.clone(),
            vdaf.clone(),
            dummy::AggregationParam(0),
        );

        // Create a happy path report.
        let (prepare_init_0, transcript_0) = prep_init_generator.next(&measurement);

        datastore
            .put_aggregator_task(&task.helper_view().unwrap())
            .await
            .unwrap();

        let aggregation_param = dummy::AggregationParam(0);
        let request = AggregationJobInitializeReq::new(
            aggregation_param.get_encoded().unwrap(),
            PartialBatchSelector::new_time_interval(),
            Vec::from([prepare_init_0.clone()]),
        );

        let aggregation_job_id: AggregationJobId = random();
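        // Send the same aggregation job initialization request twice; the
        // clock is advanced past the report expiry age at the end of each
        // iteration, so the second PUT targets an aggregation job that is
        // already GC-eligible.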
        for _ in 0..2 {
            let mut test_conn =
                put_aggregation_job(&task, &aggregation_job_id, &request, &handler).await;
            assert_eq!(test_conn.status(), Some(Status::Ok));
            assert_headers!(
                &test_conn,
                "content-type" => (AggregationJobResp::MEDIA_TYPE)
            );
            let aggregate_resp: AggregationJobResp =
                dbg!(decode_response_body(&mut test_conn).await);

            assert_eq!(aggregate_resp.prepare_resps().len(), 1);

            let prepare_step_0 = aggregate_resp.prepare_resps().first().unwrap();
            assert_eq!(
                prepare_step_0.report_id(),
                prepare_init_0.report_share().metadata().id()
            );
            assert_matches!(prepare_step_0.result(), PrepareStepResult::Continue { message } => {
                assert_eq!(message, &transcript_0.helper_prepare_transitions[0].message);
            });

            let (aggregation_jobs, batch_aggregations) = datastore
                .run_unnamed_tx(|tx| {
                    let task = task.clone();
                    let vdaf = vdaf.clone();
                    Box::pin(async move {
                        Ok((
                            tx.get_aggregation_jobs_for_task::<0, TimeInterval, dummy::Vdaf>(
                                task.id(),
                            )
                            .await?,
                            tx.get_batch_aggregations_for_task::<0, TimeInterval, _>(
                                &vdaf,
                                task.id(),
                            )
                            .await?,
                        ))
                    })
                })
                .await
                .unwrap();
            dbg!(aggregation_jobs);
            dbg!(batch_aggregations);

            // Advance the clock well past the report expiry age.
            clock.advance(
                &report_expiry_age
                    .add(&Duration::from_hours(1).unwrap())
                    .unwrap(),
            );
        }

        // assert_eq!(aggregation_jobs.len(), 1);
    }
inahga self-assigned this Mar 13, 2024
inahga (Contributor, Author) commented Sep 9, 2024

This is no longer a panic on the latest release, but a 500 error. We attempt to write a new aggregation job, but that operation fails because one is already in the database.

The bigger problem may be that the second request is accepted in the first place. Why would we attempt to write an aggregation job whose reports are all past the report expiry age anyway?

inahga (Contributor, Author) commented Sep 9, 2024

This bug reveals a couple of possible problems:

  • Report expiry age is not considered at all in helper-side aggregate init when processing the aggregation job, but it is considered for GC purposes and for get_*() calls. So we can write aggregation jobs and batches that are immediately GC'd (a minimal sketch of the missing check follows this list). Maybe not a big deal, but I imagine it could cause problems where the leader's report_expiry_age is greater than the helper's.
  • Our idempotency check respects GC, so a repeated request for an aggregation job that is GC'd but not actually deleted gets past the idempotency check (the existing job is invisible to the lookup).
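
For illustration, here is a minimal, self-contained sketch of the kind of expiry check the first bullet describes. It uses hypothetical types and std time primitives, not Janus's actual Clock, Task, or ReportShare APIs:

    // Sketch only: hypothetical types, not the real aggregator code path.
    use std::time::{Duration, SystemTime};

    struct ReportShare {
        client_timestamp: SystemTime,
    }

    /// Keeps only the report shares that are still within the report expiry
    /// age; anything older would produce an immediately GC-eligible
    /// aggregation job if the helper accepted it at aggregate init.
    fn reject_expired_reports(
        now: SystemTime,
        report_expiry_age: Duration,
        report_shares: Vec<ReportShare>,
    ) -> Vec<ReportShare> {
        // Reports with timestamps at or before this threshold are already
        // eligible for garbage collection.
        let gc_threshold = now - report_expiry_age;
        report_shares
            .into_iter()
            .filter(|share| share.client_timestamp > gc_threshold)
            .collect()
    }

    fn main() {
        let now = SystemTime::now();
        let report_expiry_age = Duration::from_secs(24 * 60 * 60);
        let shares = vec![
            // One fresh report and one that is two days old.
            ReportShare { client_timestamp: now },
            ReportShare {
                client_timestamp: now - Duration::from_secs(2 * 24 * 60 * 60),
            },
        ];
        // Only the fresh report survives the check.
        assert_eq!(reject_expired_reports(now, report_expiry_age, shares).len(), 1);
    }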

However, I will table this bug for now to work on other priorities. AFAIK it has never happened in production, and it only came up while I was searching for a different bug. As mentioned above, it doesn't trigger a panic anymore.

inahga removed their assignment Sep 9, 2024
inahga (Contributor, Author) commented Sep 18, 2024

Looked into this a bit more with @branlwyd and @divergentdave. The bug still applies.

The problem is here: https://github.com/divviup/janus/blob/main/aggregator_core/src/datastore.rs#L2580, where the GC-eligible job is excluded by the WHERE filter. Swapping the sign fixes this case, but breaks the case where an aggregation job ID collides with new report data and the first aggregation job is GC-eligible (i.e. the new report interval is more up to date).
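
To make the failure mode concrete, here is a small self-contained sketch. It substitutes an in-memory map and made-up types for the real PostgreSQL-backed datastore, but it mirrors the shape of the problem: the GC-respecting lookup (the WHERE filter above) cannot see the GC-eligible row, while the subsequent write still collides with it:

    // Sketch only: hypothetical types, not Janus's actual datastore API.
    use std::collections::HashMap;

    #[derive(Clone)]
    struct AggregationJob {
        id: u64,
        // Newest client timestamp among the job's reports (seconds).
        max_report_time: u64,
    }

    struct Datastore {
        jobs: HashMap<u64, AggregationJob>,
    }

    impl Datastore {
        /// GC-respecting lookup: a job whose reports are all older than the
        /// expiry threshold is invisible, even though the row still exists.
        /// (Flipping this comparison is the "swapping the sign" change
        /// discussed above.)
        fn get_job(&self, id: u64, expiry_threshold: u64) -> Option<&AggregationJob> {
            self.jobs
                .get(&id)
                .filter(|job| job.max_report_time >= expiry_threshold)
        }

        /// Unconditional insert: an existing row with the same ID makes the
        /// write fail, GC-eligible or not.
        fn put_job(&mut self, job: AggregationJob) -> Result<(), &'static str> {
            if self.jobs.contains_key(&job.id) {
                return Err("aggregation job already exists");
            }
            self.jobs.insert(job.id, job);
            Ok(())
        }
    }

    fn main() {
        let mut datastore = Datastore { jobs: HashMap::new() };
        let job = AggregationJob { id: 1, max_report_time: 100 };

        // First request: no existing job is found, so the write succeeds.
        assert!(datastore.get_job(1, 50).is_none());
        datastore.put_job(job.clone()).unwrap();

        // The clock advances past the report expiry age: the job is now
        // GC-eligible but has not actually been deleted.
        let expiry_threshold = 200;

        // Repeated request: the idempotency lookup misses the GC-eligible
        // job, so the handler treats the request as new...
        assert!(datastore.get_job(1, expiry_threshold).is_none());
        // ...and the duplicate write fails (the 500 described above; on
        // older releases this surfaced as the panic in the issue title).
        assert!(datastore.put_job(job).is_err());
    }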

We may be able to fix this by deleting the report aggregations for an aggregation job after we put the aggregation job, but this may have unforeseen consequences.

The edge case that triggers this bug is quite narrow and the fix may be complex, so we are not going to address it for now, unless it starts happening in production.

inahga closed this as not planned Sep 18, 2024