Skip to content

Commit

Permalink
Use proper fields for finding prunable jobs (#1145)
Browse files Browse the repository at this point in the history
Using `scheduled_at` is not correct in all situations. Depending on job state,  one of `cancelled_at`, `discarded_at`, or `scheduled_at` should be used.
  • Loading branch information
ademenev authored Aug 28, 2024
1 parent 6538277 commit 861dfa5
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 11 deletions.
5 changes: 3 additions & 2 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ defmodule Oban.Engines.Basic do
subquery =
queryable
|> select([:id, :queue, :state])
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.state == "completed" and j.scheduled_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> where([j], not is_nil(j.queue))
|> where([j], j.scheduled_at < ^time)
|> limit(^limit)

query =
Expand Down
5 changes: 3 additions & 2 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ defmodule Oban.Engines.Lite do
select_query =
queryable
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.scheduled_at < ^time)
|> where([j], j.state == "completed" and j.scheduled_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> limit(^limit)

pruned = Repo.all(conf, select_query)
Expand Down
8 changes: 7 additions & 1 deletion test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,13 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :prune_jobs]])

for state <- Job.states(), seconds <- 59..61 do
opts = [state: to_string(state), scheduled_at: seconds_ago(seconds)]
opts = [
state: to_string(state),
completed_at: seconds_ago(seconds),
discarded_at: seconds_ago(seconds),
cancelled_at: seconds_ago(seconds),
scheduled_at: seconds_ago(seconds)
]

# Insert one job at a time to avoid a "Cell-wise defaults" error in SQLite.
Oban.insert!(name, Worker.new(%{}, opts))
Expand Down
21 changes: 15 additions & 6 deletions test/oban/plugins/pruner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ defmodule Oban.Plugins.PrunerTest do
test "historic jobs are pruned when they are older than the configured age" do
TelemetryHandler.attach_events()

%Job{id: _id_} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "cancelled", scheduled_at: seconds_ago(61))
%Job{id: _id_} = insert!(%{}, state: "discarded", scheduled_at: seconds_ago(61))
%Job{id: id_1} = insert!(%{}, state: "completed", scheduled_at: seconds_ago(59))
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 61)
%Job{id: _id_} = insert_historical("cancelled", :cancelled_at, 61, 59)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 61)
%Job{id: _id_} = insert_historical("discarded", :discarded_at, 61, 59)
%Job{id: _id_} = insert_historical("completed", :completed_at, 61, 61)
%Job{id: _id_} = insert_historical("completed", :completed_at, 59, 61)

%Job{id: id_1} = insert_historical("cancelled", :cancelled_at, 59, 61)
%Job{id: id_2} = insert_historical("cancelled", :cancelled_at, 59, 59)
%Job{id: id_3} = insert_historical("discarded", :discarded_at, 59, 61)
%Job{id: id_4} = insert_historical("discarded", :discarded_at, 59, 59)
%Job{id: id_5} = insert_historical("completed", :completed_at, 59, 59)
%Job{id: id_6} = insert_historical("completed", :completed_at, 61, 59)

start_supervised_oban!(plugins: [{Pruner, interval: 10, max_age: 60}])

assert_receive {:event, :stop, _, %{plugin: Pruner} = meta}
assert %{pruned_count: 3, pruned_jobs: [_ | _]} = meta
assert %{pruned_count: 6, pruned_jobs: [_ | _]} = meta

assert [id_1] ==
assert [id_1, id_2, id_3, id_4, id_5, id_6] ==
Job
|> select([j], j.id)
|> order_by(asc: :id)
Expand Down
13 changes: 13 additions & 0 deletions test/support/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ defmodule Oban.Case do
Oban.insert!(oban, changeset)
end

# Pruning test helpers

def insert_historical(state, timestamp_field, timestamp_age, scheduled_age) do
opts =
Keyword.put(
[state: state, scheduled_at: seconds_ago(scheduled_age)],
timestamp_field,
seconds_ago(timestamp_age)
)

insert!(%{}, opts)
end

# Time

def seconds_from_now(seconds) do
Expand Down

0 comments on commit 861dfa5

Please sign in to comment.