Skip to content

Commit

Permalink
Allow custom enumerator_builder in Tasks
Browse files Browse the repository at this point in the history
Co-authored-by: Sam Bostock <[email protected]>
Co-authored-by: Gannon McGibbon <[email protected]>
  • Loading branch information
3 people committed Jan 22, 2024
1 parent 8ee3dcd commit c0f8220
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 39 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,35 @@ module Maintenance
end
```

### Tasks with Custom Enumerators

If you have a special use case requiring iteration over an unsupported
collection type, such as external resources fetched from some API, you can
implement the `enumerator_builder(cursor:)` method in your task.

This method should return an `Enumerator`, yielding pairs of
`[item, cursor]`. Maintenance Tasks takes care of persisting the current
cursor position and will provide it as the `cursor` argument if your task is
interrupted or resumed. The `cursor` is stored as a `String`, so your custom
enumerator should handle serializing/deserializing the value if required.

```ruby
# app/tasks/maintenance/custom_enumerator_task.rb

module Maintenance
class CustomEnumeratorTask < MaintenanceTasks::Task
def enumerator_builder(cursor:)
after_id = cursor&.to_i
PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum
end

def process(post)
Post.create!(post)
end
end
end
```

### Throttling

Maintenance tasks often modify a lot of data and can be taxing on your database.
Expand Down
40 changes: 1 addition & 39 deletions app/jobs/concerns/maintenance_tasks/task_job_concern.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,45 +32,7 @@ def retry_on(*, **)

def build_enumerator(_run, cursor:)
cursor ||= @run.cursor
collection = @task.collection
@enumerator = nil

@collection_enum = case collection
when :no_collection
enumerator_builder.build_once_enumerator(cursor: nil)
when ActiveRecord::Relation
enumerator_builder.active_record_on_records(collection, cursor: cursor)
when ActiveRecord::Batches::BatchEnumerator
if collection.start || collection.finish
raise ArgumentError, <<~MSG.squish
#{@task.class.name}#collection cannot support
a batch enumerator with the "start" or "finish" options.
MSG
end

# For now, only support automatic count based on the enumerator for
# batches
enumerator_builder.active_record_on_batch_relations(
collection.relation,
cursor: cursor,
batch_size: collection.batch_size,
)
when Array
enumerator_builder.build_array_enumerator(collection, cursor: cursor&.to_i)
when BatchCsvCollectionBuilder::BatchCsv
JobIteration::CsvEnumerator.new(collection.csv).batches(
batch_size: collection.batch_size,
cursor: cursor&.to_i,
)
when CSV
JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor&.to_i)
else
raise ArgumentError, <<~MSG.squish
#{@task.class.name}#collection must be either an
Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
Array, or CSV.
MSG
end
@collection_enum = @task.enumerator_builder(cursor: cursor)
throttle_enumerator(@collection_enum)
end

Expand Down
50 changes: 50 additions & 0 deletions app/models/maintenance_tasks/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,5 +247,55 @@ def process(_item)
def count
self.class.collection_builder_strategy.count(self)
end

# Default enumeration builder. You may override this method to return any
# Enumerator yielding pairs of `[item, item_cursor]`.
#
# @param cursor [String, nil] cursor position to resume from, or nil on
# initial call.
#
# @return [Enumerator]
def enumerator_builder(cursor:)
collection = self.collection

job_iteration_builder = JobIteration::EnumeratorBuilder.new(nil)

case collection
when :no_collection
job_iteration_builder.build_once_enumerator(cursor: nil)
when ActiveRecord::Relation
job_iteration_builder.active_record_on_records(collection, cursor: cursor)
when ActiveRecord::Batches::BatchEnumerator
if collection.start || collection.finish
raise ArgumentError, <<~MSG.squish
#{self.class.name}#collection cannot support
a batch enumerator with the "start" or "finish" options.
MSG
end

# For now, only support automatic count based on the enumerator for
# batches
job_iteration_builder.active_record_on_batch_relations(
collection.relation,
cursor: cursor,
batch_size: collection.batch_size,
)
when Array
job_iteration_builder.build_array_enumerator(collection, cursor: cursor&.to_i)
when BatchCsvCollectionBuilder::BatchCsv
JobIteration::CsvEnumerator.new(collection.csv).batches(
batch_size: collection.batch_size,
cursor: cursor&.to_i,
)
when CSV
JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor&.to_i)
else
raise ArgumentError, <<~MSG.squish
#{self.class.name}#collection must be either an
Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
Array, or CSV.
MSG
end
end
end
end
18 changes: 18 additions & 0 deletions test/dummy/app/tasks/maintenance/custom_enumerating_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Maintenance
class CustomEnumeratingTask < MaintenanceTasks::Task
def enumerator_builder(cursor:)
drop = cursor.nil? ? 0 : cursor.to_i + 1

[:a, :b, :c].lazy.with_index.drop(drop)
end

def count
3
end

def process(_)
end
end
end
31 changes: 31 additions & 0 deletions test/jobs/maintenance_tasks/task_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -617,5 +617,36 @@ class << self

assert_equal 2, @run.reload.tick_total
end

test ".perform_now accepts custom enumerated tasks" do
run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")

[:a, :b, :c].each do |item|
Maintenance::CustomEnumeratingTask.any_instance
.expects(:process).with(item).once
end

TaskJob.perform_now(run)
end

test ".perform_now handles cursors provided by custom enumerated tasks" do
run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")

TaskJob.perform_now(run)

assert_equal "2", run.reload.cursor
end

test ".perform_now starts custom enumerated tasks from cursor position when job resumes" do
run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")
run.update!(cursor: "0")

[:b, :c].each do |item|
Maintenance::CustomEnumeratingTask.any_instance
.expects(:process).with(item).once
end

TaskJob.perform_now(run)
end
end
end
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_data_index_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class TaskDataIndexTest < ActiveSupport::TestCase
"Maintenance::BatchImportPostsTask",
"Maintenance::CallbackTestTask",
"Maintenance::CancelledEnqueueTask",
"Maintenance::CustomEnumeratingTask",
"Maintenance::EnqueueErrorTask",
"Maintenance::ErrorTask",
"Maintenance::ImportPostsTask",
Expand Down
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class TaskTest < ActiveSupport::TestCase
"Maintenance::BatchImportPostsTask",
"Maintenance::CallbackTestTask",
"Maintenance::CancelledEnqueueTask",
"Maintenance::CustomEnumeratingTask",
"Maintenance::EnqueueErrorTask",
"Maintenance::ErrorTask",
"Maintenance::ImportPostsTask",
Expand Down
1 change: 1 addition & 0 deletions test/system/maintenance_tasks/tasks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class TasksTest < ApplicationSystemTestCase
"Maintenance::BatchImportPostsTask\nNew",
"Maintenance::CallbackTestTask\nNew",
"Maintenance::CancelledEnqueueTask\nNew",
"Maintenance::CustomEnumeratingTask\nNew",
"Maintenance::EnqueueErrorTask\nNew",
"Maintenance::ErrorTask\nNew",
"Maintenance::Nested::NestedMore::NestedMoreTask\nNew",
Expand Down

0 comments on commit c0f8220

Please sign in to comment.