Batches
Batches are Sidekiq Pro's term for a collection of jobs which can be monitored as a group. You can create a set of jobs to execute in parallel and then execute a callback when all the jobs are finished.
Some businesses upload a lot of Excel spreadsheets to load data into their database. These spreadsheets might have hundreds of rows, each row requiring a few seconds of processing. I don't want to process the file synchronously (the web browser will time out after 60 seconds) and I don't want to spin off the upload as a single Sidekiq job (there's no performance benefit to serial execution). Instead I want to break up the Excel spreadsheet into one job per row and get the benefit of parallelism to massively speed up the data load time. But how do I know when the entire thing is done? How do I track the progress?
This is what batches allow you to do!
```ruby
batch = Sidekiq::Batch.new
batch.description = "Batch description (this is optional)"
batch.on(:success, MyCallback, :to => user.email)
batch.jobs do
  rows.each { |row| RowJob.perform_async(row) }
end
puts "Just started Batch #{batch.bid}"
```
Here we've created a new Batch, told it to fire a callback when all jobs are successful, and then filled it with jobs to perform. The `bid`, or Batch ID, is the unique identifier for a Batch.
You can dynamically add jobs to a batch from within an executing job:
```ruby
class SomeJob
  include Sidekiq::Job

  def perform(...)
    puts "Working within batch #{bid}"
    batch.jobs do
      # add more jobs
    end
  end
end
```
`bid` is a method on `Sidekiq::Job` which gives access to the ID of the Batch associated with this job; `batch` is a method which gives access to the Batch itself.
The `jobs` method is atomic: all jobs created in the block are pushed atomically to Redis at the end of the block. If an error is raised, none of the jobs will go to Redis.
Note that it is not safe to call `batch.jobs` more than once when defining the batch. Jobs are asynchronous, and once you've sent them to Redis they can complete at any moment; calling `batch.jobs` again can lead to errors because the batch may have already completed due to this race condition. It is safe to "reopen" the batch and add more jobs from within a job in that batch. See the section on Huge Batches for further discussion on safely loading batch jobs.
Bad, calling `batch.jobs` 10 times:
```ruby
batch = Sidekiq::Batch.new
10.times do
  batch.jobs do
    MyJob.perform_async
  end
end
```
Good, one call:
```ruby
batch = Sidekiq::Batch.new
batch.jobs do
  10.times do
    MyJob.perform_async
  end
end
```
`Batch#jobs` does not support Sidekiq's transactional push feature. Batch jobs always push at the end of the `jobs` block, even in a transaction.
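To illustrate the caveat, here's a minimal sketch; it assumes transactional push has been enabled (via `Sidekiq.transactional_push!` in recent Sidekiq versions) and an ActiveRecord setup. The batch jobs push when the `jobs` block ends, not when the surrounding transaction commits:

```ruby
# Illustrative sketch only; assumes Sidekiq.transactional_push! is
# enabled and RowJob is an ordinary Sidekiq::Job.
ActiveRecord::Base.transaction do
  batch.jobs do
    RowJob.perform_async(1)
  end # <- the job is pushed to Redis here...
end   # ...not here, even with transactional push enabled
```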
To fetch the status of a Batch programmatically, you use `Sidekiq::Batch::Status`:
```ruby
status = Sidekiq::Batch::Status.new(bid)
status.total        # jobs in the batch => 98
status.failures     # failed jobs so far => 5
status.pending      # jobs which have not succeeded yet => 17
status.created_at   # => 2012-09-04 21:15:05 -0700
status.complete?    # if all jobs have executed at least once => false
status.join         # blocks until the batch is considered complete, note that some jobs might have failed
status.failure_info # an array of failed jobs
status.data         # a hash of data about the batch which can easily be converted to JSON for javascript usage
```
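These fields are enough to drive a progress bar; a minimal sketch, assuming `bid` holds the Batch ID you saved when creating the batch:

```ruby
# Derive percent-complete from the Status fields above.
status = Sidekiq::Batch::Status.new(bid)
done = status.total - status.pending
percent = status.total.zero? ? 100 : (done * 100 / status.total)
puts "Batch #{status.bid}: #{percent}% complete, #{status.failures} failure(s)"
```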
Sidekiq can notify you when a Batch is complete or successful with `batch.on(event, klass, options={})`:
- `:complete` - when all jobs in the batch have run once, successful or not.
- `:success` - when all jobs in the batch have completed successfully.
- `:death` - the first time a batch job dies.
```ruby
class SomeClass
  def on_complete(status, options)
    puts "Uh oh, batch has failures" if status.failures != 0
  end

  def on_success(status, options)
    puts "#{options['uid']}'s batch succeeded. Kudos!"
  end
end
```
```ruby
batch = Sidekiq::Batch.new
# this will call "SomeClass.new.on_success"
batch.on(:success, SomeClass, 'uid' => current_user.id)
# You can also use Class#method notation which is like calling "AnotherClass.new.method"
batch.on(:complete, 'AnotherClass#method', 'uid' => current_user.id)
```
Callback methods receive two parameters: a Status object for the Batch and the set of options which you provided when declaring the callback. Note that options are marshalled through JSON so use only basic types.
Regarding success: if a job fails continually, it's possible the `:success` event will never fire. If a job exhausts all retries and dies, it will fire any `:death` callbacks. The `:death` callback is always fired for the first job in the batch that fails and will not retry; even if you configure jobs to disable retries or job death, the `:death` callback still fires.
`:death` and `:success` are not mutually exclusive, but the death callback firing means the batch will not fire success without manual intervention. If you deploy a fix and manually re-enqueue a dead batch job, the batch can still fire `:success`.
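A `:death` callback is a natural place to alert someone that a batch needs manual attention. A hedged sketch, following the `on_complete`/`on_success` naming pattern above; `notify_oncall` is a hypothetical helper standing in for whatever alerting you use:

```ruby
class BatchWatcher
  # invoked as BatchWatcher.new.on_death(status, options)
  def on_death(status, options)
    # status.failure_info lists the failed jobs
    notify_oncall("Batch #{status.bid} has a dead job; it won't fire :success without intervention")
  end
end

batch.on(:death, BatchWatcher)
```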
As of Sidekiq Pro 7.1, empty Batches are legal. An empty batch will automatically create a `Sidekiq::Batch::Empty` job which fires any callbacks.
Normally batches complete quickly. Upon success, Sidekiq will remove the batch from the Web UI listing but the batch data will linger in Redis for 24 hours so any status or polling API calls you make for that BID will still work during that window of time.
Pending batches will expire in Redis after 30 days. Callbacks won't trigger and you will have to perform any cleanup work manually.
Say you have a batch `b` with three jobs, `j1`, `j2` and `j3`, and suppose `b` has a success callback. `j1` and `j2` complete successfully but `j3` fails for some reason. If you delete the job `j3` manually, the batch callback will never fire automatically. (It is recommended that you allow jobs to cancel themselves instead.)
In this case, the batch data will remain in Redis until it expires. The Web UI will show the batch as waiting on a job that has now been deleted. This is another reason to avoid directly deleting jobs -- cancellation does not suffer from this problem.
Batch callbacks run in their own job; they are enqueued in the queue used by the final job in the batch. If a batch callback raises an error, it will retry like any other job. You can specify a different queue for the callback jobs so they have a higher priority:
```ruby
batch = Sidekiq::Batch.new
batch.callback_queue = 'critical'
batch.on(:success, ...)
batch.jobs ...
```
Sidekiq Pro contains extensions for the Sidekiq Web UI, including an overview for Batches which shows the current status of all Batches along with a Batch details page listing any errors associated with jobs in the Batch. Require the Pro extension where you require the standard Web UI:
```ruby
require 'sidekiq/pro/web'
mount Sidekiq::Web => '/sidekiq'
```
Note that the UI shows all in-progress batches. Successful batches are removed so as to not fill up the UI.
You can poll for the status of a batch (perhaps to show a progress bar as the batch is processed) using the built-in Rack endpoint. Add it to your application's `config.ru`:
```ruby
require 'sidekiq/pro/batch_status'
use Sidekiq::Pro::BatchStatus
run Myapp::Application
```
Then you can query the server to get a JSON blob of data about a batch by passing the BID. For example:
```
http://localhost:3000/batch_status/bc7f822afbb40747.json

{"complete":true,"bid":"bc7f822afbb40747","total":10,"pending":0,"description":null,"failures":0,"created_at":1367700200.1111438,"fail_info":[]}
```
NB: This feature is deprecated and will be removed in Sidekiq Pro 8.0. The full source code for the feature can be found here for those who wish to continue using it.
Warning: this functionality is only available if you use the `Sidekiq::Job` API; it does not work with ActiveJob.
Batches can contain hundreds of thousands of jobs but loading all those jobs in serial can take a long time. There's no reason why you can't parallelize the loading of a batch by first creating a batch with an initial set of jobs whose only purpose is to load other jobs into the batch, perhaps 1000 each:
```ruby
b = Sidekiq::Batch.new
b.on(:success, ...)
b.jobs do
  200.times do |idx|
    # each loader job will push 1000 jobs of some other type
    Loader.perform_async(idx)
  end
end
```
```ruby
class Loader
  include Sidekiq::Job
  SIZE = 1000

  def perform(idx)
    # assume we want to create a job for each of 200,000 database records
    # query for our set of 1000 records
    results = SomeModel.limit(SIZE).offset(idx * SIZE).select(:id)
    # reopen our own batch to add more jobs to it
    batch.jobs do
      # push 1000 jobs in one network call to Redis, saves 999 round trips
      Sidekiq::Client.push_bulk('class' => SomeJob, 'args' => results.map { |x| [x.id] })
    end
  end
end
```
By parallelizing the load and using `push_bulk`, creating large batches should go from minutes to seconds!
If a batch of jobs is no longer valid, can you cancel them or remove them from Redis?
Sidekiq's internal data structures don't make it efficient to remove a job in Redis. Instead I recommend you have each job check if it is still valid when it executes. This way the jobs don't do any extra work and Redis is happy. The Batch API makes this pretty easy to do.
Step 1: Create the batch as normal:
```ruby
batch = Sidekiq::Batch.new
batch.jobs do
  # define your work
end
# save batch.bid somewhere
```
Step 2: Cancel the batch due to some user action:
```ruby
batch = Sidekiq::Batch.new(bid)
batch.invalidate_all
```
Step 3: Each job verifies its own validity:
```ruby
class MyJob
  include Sidekiq::Job

  def perform
    return unless valid_within_batch? # this method is on Sidekiq::Job
    # do actual work
  end
end
```
You can iterate through all known Batches, getting a `Sidekiq::Batch::Status` for each entry:
```ruby
bs = Sidekiq::BatchSet.new
bs.each do |status|
  puts status.bid
end
```
`Sidekiq::BatchSet` will contain only Batches with outstanding jobs.
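Since each entry is a `Sidekiq::Batch::Status`, you can filter on its fields. A minimal sketch that lists batches created more than a day ago:

```ruby
# Find long-lived batches using the Status API shown above.
bs = Sidekiq::BatchSet.new
bs.each do |status|
  age = Time.now - status.created_at
  puts "#{status.bid} has been pending for #{(age / 3600).round} hour(s)" if age > 86_400
end
```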
In some rare cases, you no longer need a batch in Redis. To remove batch data from Redis, use `delete`:
```ruby
bs = Sidekiq::BatchSet.new
bs.each do |batch|
  batch.delete
end

batch = Sidekiq::Batch::Status.new(bid) # bid is the batch ID
batch.delete
```
Deleting a batch will break Sidekiq if there are still jobs associated with that batch in Redis.
Learn how Batches can enable some complex workflows and really complex workflows.
Batches always require Redis to be running. Your test code can create a batch and execute its jobs (even with inline mode), but the batch will never run the `:complete` or `:success` callbacks until you add the middleware manually to the testing environment, like this:
```ruby
before :suite do
  Sidekiq::Testing.server_middleware do |chain|
    chain.add Sidekiq::Batch::Server
  end
end
```
Now your batch callbacks will run once all batch jobs have run.
```ruby
Sidekiq::Testing.inline!

b = Sidekiq::Batch.new
b.on(:success, "Something#done", "foo" => "bar")
b.jobs do
  SomeJob.perform_async
  SomeJob.perform_async
end
# Something#done should be called
```
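For reference, a minimal sketch of what the `Something` receiver might look like, following the `Class#method` notation described earlier (the class body here is an assumption for illustration):

```ruby
class Something
  # invoked as Something.new.done(status, options)
  def done(status, options)
    puts "Batch #{status.bid} succeeded, foo=#{options['foo']}"
  end
end
```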
If any job within a batch dies, the Batch will never run its success callback and will be considered "dead" also. As of Sidekiq Pro 4.0, you can enumerate all dead batches via API:
```ruby
bds = Sidekiq::Batch::DeadSet.new
bds.each do |status|
  status.dead_jids # => ["abcdef123456", ...]
  # ...recover somehow...
  status.delete # removes the batch from Redis
end
```
and perform any necessary manual repair. This API is still pretty rudimentary; suggestions for improvement are welcome.
As of Sidekiq Pro 5.2.1, the death of a job in a child batch will cause its parent batch's death callback to run.
Some customers have seen batches which get "stuck" with pending jobs which can't be found. There are generally a few possible reasons:
- A job was lost due to a process crash. If you have super_fetch enabled, this job should be eventually rescued and retried. If you don't have super_fetch activated, please activate it ASAP!
- A job was lost due to the process being killed during a deploy. This can be caused by long-running jobs which don't allow Sidekiq to shut down within 30 seconds. Use a gem like sidekiq-iteration to create well-behaved jobs which can shut down quickly and safely.
- A job was registered with a Batch but was never pushed to Redis. This can happen when you create a Batch but are using the transactional client feature, which delays the actual push to Redis until the current database transaction has committed. This delay creates a window of time where process death leads to a corrupt batch (fixed in Sidekiq 7.2.2).
Stuck batches are almost always due to forced process death. Remember to use the TSTP signal to give your processes as much time as possible to stop, and never use `kill -9` in production.
- Before Sidekiq Pro 7.1, a Batch with zero jobs was invalid and its behavior was undefined. As of 7.1, empty Batches will push a `Sidekiq::Batch::Empty` job which will execute any callbacks.
- If you find that batches are stuck with Pending jobs, especially right around a deployment, verify you are gracefully restarting Sidekiq as designed: send TSTP as early as possible, TERM as late as possible, and never use `kill -9`.
- You can dynamically add jobs to a Batch, but only from another job within that batch (or a child batch). It is not safe to modify a Batch outside of the batch or within its callbacks.
- Batches don't work well with ActiveJob because an ActiveJob retry looks like a success to Sidekiq. Please use native Sidekiq::Jobs.
- Don't ever disable retries in a batch job. If the job fails, it disappears and the batch will never succeed. You need retries to diagnose and fix the job so the batch succeeds.
- Be careful using job uniqueness with a batch. If an error is raised while defining the batch jobs, the lock will remain in Redis but the job will never be pushed to Redis. See sidekiq/sidekiq#3662 for details.
- Batches can contain scheduled jobs too, e.g. `perform_in(10.minutes)`. This will prevent the batch from finishing until that scheduled job runs.
- I've heard from multiple customers using batches with millions of jobs. It is not known how high Batches scale, but I recommend verifying your Redis instance sizing before pushing the envelope.
- Seeing "negative pending" batches? Make sure you've configured Redis
with
maxmemory-policy noeviction
. - Seeing "positive pending" batches but can't find those pending jobs? They may be in a super_fetch private queue. This can happen if your deploys are misconfigured and creating orphaned jobs. Check your
-t
shutdown timeout value (default: 25) and make sure your deploy tool is giving Sidekiq at least N+5 (i.e. 30) seconds before killing the process.