diff --git a/lib/oban/engines/basic.ex b/lib/oban/engines/basic.ex index 83a17e29..638e09dc 100644 --- a/lib/oban/engines/basic.ex +++ b/lib/oban/engines/basic.ex @@ -153,9 +153,10 @@ defmodule Oban.Engines.Basic do subquery = queryable |> select([:id, :queue, :state]) - |> where([j], j.state in ~w(completed cancelled discarded)) + |> where([j], j.state == "completed" and j.scheduled_at < ^time) + |> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time) + |> or_where([j], j.state == "discarded" and j.discarded_at < ^time) |> where([j], not is_nil(j.queue)) - |> where([j], j.scheduled_at < ^time) |> limit(^limit) query = diff --git a/lib/oban/engines/lite.ex b/lib/oban/engines/lite.ex index 5c62e1a6..23a08a77 100644 --- a/lib/oban/engines/lite.ex +++ b/lib/oban/engines/lite.ex @@ -146,8 +146,9 @@ defmodule Oban.Engines.Lite do select_query = queryable |> select([j], map(j, [:id, :queue, :state])) - |> where([j], j.state in ~w(completed cancelled discarded)) - |> where([j], j.scheduled_at < ^time) + |> where([j], j.state == "completed" and j.scheduled_at < ^time) + |> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time) + |> or_where([j], j.state == "discarded" and j.discarded_at < ^time) |> limit(^limit) pruned = Repo.all(conf, select_query) diff --git a/test/oban/engine_test.exs b/test/oban/engine_test.exs index 1ca091be..ffeb7bd0 100644 --- a/test/oban/engine_test.exs +++ b/test/oban/engine_test.exs @@ -445,7 +445,13 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do TelemetryHandler.attach_events(span_type: [:job, [:engine, :prune_jobs]]) for state <- Job.states(), seconds <- 59..61 do - opts = [state: to_string(state), scheduled_at: seconds_ago(seconds)] + opts = [ + state: to_string(state), + completed_at: seconds_ago(seconds), + discarded_at: seconds_ago(seconds), + cancelled_at: seconds_ago(seconds), + scheduled_at: seconds_ago(seconds) + ] # Insert one job at a time to avoid a "Cell-wise defaults" error in SQLite. Oban.insert!(name, Worker.new(%{}, opts)) diff --git a/test/oban/plugins/pruner_test.exs b/test/oban/plugins/pruner_test.exs index 60356626..783b5fcc 100644 --- a/test/oban/plugins/pruner_test.exs +++ b/test/oban/plugins/pruner_test.exs @@ -27,17 +27,26 @@ defmodule Oban.Plugins.PrunerTest do test "historic jobs are pruned when they are older than the configured age" do TelemetryHandler.attach_events() - %Job{id: _id_} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(61)) - %Job{id: _id_} = insert!(%{}, state: "cancelled", scheduled_at: seconds_ago(61)) - %Job{id: _id_} = insert!(%{}, state: "discarded", scheduled_at: seconds_ago(61)) - %Job{id: id_1} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(59)) + %Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 61) + %Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 59) + %Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 61) + %Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 59) + %Job{id: _id_} = insert_historical("completed", :completed_at, 61, 61) + %Job{id: _id_} = insert_historical("completed", :completed_at, 59, 61) + + %Job{id: id_1} = insert_historical("cancelled", :cancelled_at, 59, 61) + %Job{id: id_2} = insert_historical("cancelled", :cancelled_at, 59, 59) + %Job{id: id_3} = insert_historical("discarded", :discarded_at, 59, 61) + %Job{id: id_4} = insert_historical("discarded", :discarded_at, 59, 59) + %Job{id: id_5} = insert_historical("completed", :completed_at, 59, 59) + %Job{id: id_6} = insert_historical("completed", :completed_at, 61, 59) start_supervised_oban!(plugins: [{Pruner, interval: 10, max_age: 60}]) assert_receive {:event, :stop, _, %{plugin: Pruner} = meta} - assert %{pruned_count: 3, pruned_jobs: [_ | _]} = meta + assert %{pruned_count: 6, pruned_jobs: [_ | _]} = meta - assert [id_1] == + assert [id_1, id_2, id_3, id_4, id_5, id_6] == Job |> select([j], j.id) |> order_by(asc: :id) diff --git a/test/support/case.ex b/test/support/case.ex index 184e2333..36440056 100644 --- a/test/support/case.ex +++ b/test/support/case.ex @@ -102,6 +102,19 @@ defmodule Oban.Case do Oban.insert!(oban, changeset) end + # Pruning test helpers + + def insert_historical(state, timestamp_field, timestamp_age, scheduled_age) do + opts = + Keyword.put( + [state: state, scheduled_at: seconds_ago(scheduled_age)], + timestamp_field, + seconds_ago(timestamp_age) + ) + + insert!(%{}, opts) + end + # Time def seconds_from_now(seconds) do