The postqueue
gem implements a simple to use queue on top of postgresql.
Lets have a word about words first: while a queue like this is typically used for job queues, this document does not talk about jobs, it talks about queue items; we also do not schedule jobs, we enqueues items, and we don't executes a job, we processes queue items instead.
So, why building an additional queue implementation? Compared to the usual suspects this is what postqueue brings to the table:
-
The item structure is intentionally kept very simple: an item is described by an
op
field - a string - and anid
field, an integer. In a typical usecase a queue item would describe an operation on a specific entity, andop
would name both the operation and the entity type (say:"product/invalidate"
) and theid
field would hold the id of the product to invalidate. -
Such a simplistic item structure lends quite well to querying the queue using SQL. While this is great for monitoring purposes - it is quite easy to fetch metrics regarding upcoming items - it also allows special handling of idempotent operations and automatic batching.
-
Some tasks typically handled by queues are of idempotent nature. For example, reindexing a document into a NoSQL search index needs not be done twice in a row, since only the last change to the primary object should and ultimately will be stored in the search index. Such tasks can therefore be run only once. postqueue supports this feature by optionally skipping duplicates when enqueuing tasks.
-
Other tasks can be handled much more efficient when run in batches. Typical examples include processing a larger number of entities that need to be read from a database, but could be pulled much more efficient in a single query instead of in N queries. postqueue automatically batches such items.
-
Being based on Postgresql postqueue provides transactional semantics: an item written into the database in a transaction that fails afterwards is never processed.
-
automatic retries: like delayed job postqueue implements a rudimentary form of error processing. A failing item - this is an item which does not have a handler registered, or whose handler fails by raising an exception - is kept in the queue and reprocessed later. It is reprocessed up to N times (currently 5 times by default) until it is "doomed" ultimately. This is similar to delayed job's error handling, with some differences, however:
- no backtrace is kept in the database
- the waiting time doesn't ramp up as fast (postqueue does
1.5 ** <number of retries>
)
Please be aware that postqueue is using the SELECT .. FOR UPDATE SKIP LOCKED
Postgresql syntax, and therefore needs at least PostgresQL >= 9.5.
Postqueue is able to run queues that use separate tables as their backstore, or use a preexisting table. However, basic usage should cover most scenarios.
Hence we cover the basic scenario here: in that scenario a single table "postqueue" is used to store queue items, and the Postqueue
default queue holds all configuration. We also assume that you want to integrate Postqueue with a Rails application.
Add this line to your application's Gemfile:
gem 'postqueue'
And then execute:
$ bundle
The following migration creates a postqueue table with all necessary entries:
class AddPostqueue < ActiveRecord::Migration
def up
Postqueue.migrate!
end
def down
Postqueue.unmigrate!
end
end
Note: it should be safe to migrate! multiple times.
The postqueue configuration descrives all possible operations and their features:
- is it possible to batch these operationss? In that case multiple queue items will be combined and processed in one go. Set
batch_size:
to a sensible size. - is this an idempotent operation? Set
idempotent:
to true.
The configuration file should live in config/initializer/postqueue.rb
for Rails apps, and in config/postqueue.rb
in other Ruby applications.
# config/initializer/postqueue.rb
Postqueue.on "refresh", batch_size: 10, idempotent: true do |op, entity_ids|
Product.index_many(Product.where(id: entity_ids))
end
Note that you could define an operation without a handler callback:
Postqueue.on "foo"
In this case the foo
ops are just removed from the queue during procesing.
When run from inside a Rails application Postqueue will reuse the applications database connection. When run outside a Rails application postqueue will use a
config/database.yml
file to determine database connection settings. It will use the RAILS_ENV
environment value, defaulting to "development"
, to choose from entries in that file.
Enqueuing items can be done using code like this:
# enqueue a single op
Postqueue.enqueue op: "refresh", entity_id: 12
# enqueue multiple ops in one go.
Postqueue.enqueue op: "refresh", entity_id: [12,13]
Postqueue.enqueue op: "refresh", entity_id: [13,14]
Note that enqueueing is pretty fast. My developer machine is able to enqueue ~20000 items per second.
While we recommend to use the command line interface to process postqueue items you can certainly process these from within ruby code by using one of these methods:
# process the next batch of items
Postqueue.process
# process a single item, do not batch
Postqueue.process_one
# process batches of items until there are none left
Postqueue.process_until_empty
These calls will select a one or more queue items for processing (with the same op
attribute). The process_*
methods will then call the callback for that operation with the entity_ids
of all queue entries. After processing they will return the number of processed items, or 0 if no items could be found or be processed.
The process_*
methods also accept the following arguments:
op
: only process entries with thisop
value;batch_size
: maximum number of items to process in one go.
Example:
# process only `product` queue items
Postqueue.process(op: 'product', batch_size: 10)
bundle exec postqueue run
starts a single postqueue runner. Note that there is intentionally no option to daemonize this process or to run these in parallel.
The postqueue CLI has additional commands, see below.
postqueue comes with a command line interface:
~/postqueue[master] > ./bin/postqueue --help
This is postqueue 0.5.3. Usage examples:
postqueue [ stats ]
postqueue peek
postqueue enqueue op entity_id,entity_id,entity_id
postqueue run
postqueue help
postqueue process
You can use the postqueue CLI to
- enqueue an item, e.g.
bundle exec postqueue enqueue foo 1,2,3
- start a runner to process the queue:
bundle exec postqueue run
- process a single item off the queue:
bundle exec postqueue process
- get some stats for the queue:
bundle exec postqueue stats
- get a list of the next 100 queue items:
bundle exec postqueue peek
Postqueue implements the following concurrency guarantees:
- catastrophic DB failure and communication breakdown aside a queue item which is enqueued will eventually be processed successfully exactly once;
- multiple consumers can work in parallel.
Note that you should not share a Postqueue instance across threads - instead you should create process objects with the identical configuration.
If an operation was configured as idempotent (using the Postqueue.on "op", idempotent: true
configuration) duplicate idempotent operations are not enqueued. However, if multiple transactions are enqueueing items at the same time, or when an idempotent item is processing while another item is being enqueued an additional queue item will still be enqueued. Therefore we also remove duplicate items during processing.
To use a non-default table or a non-default database, change the item_class
attribute of the queue:
Postqueue.new do |queue|
queue.item_class = MyItemClass
end
MyItemClass
should inherit from Postqueue::Item and use the same or a compatible database structure.
Postqueue always registers the following operations:
"test"
will write an output to the Postqueue.logger. Use this to test your infrastructure."fail"
will always raise an exception. Use this to test your error handling integration.
You can define a handler to handle unknown operations like this:
on :missing_handler do |op, entity_ids|
raise MissingHandler, queue: self, op: op, entity_ids: entity_ids
end
You can define a handler to handle any exceptions. This is the integration point for your exception handling framework like rollbar.com or so.
The default exception handler is:
Postqueue.on_exception do |e, _, _|
e.send :raise
end
The following would report exceptions to STDOUT and to rollbar:
Postqueue.on_exception do |e, op, entity_ids|
msg = "Caught error #{e.to_s.inspect}"
msg += " on op: #{op.inspect} "
msg += " w/entity_ids: #{entity_ids.inspect}"
Rollbar.error(e)
end
After a batch of operations is processed, Postqueue calls the after_processing
callback. It receives the op
and entity_ids
from the current processing run, and a timing
object, which has these attributes:
- Timing#avg_queue_time: the average queueing time for all item in the batch;
- Timing#max_queue_time: the maximum queueing time of any item in the batch;
- Timing#processing_time: elapsed time for processing the batch.
The default after_processing
callback simply logs all information. You can
easily use your own:
Postqueue.after_processing do |op, entity_ids, timing|
processing_time = timing.processing_time
Postqueue.logger.info "#{op] processing #{entity_ids.length}: #{'%.3f secs' % processing_time}"
end
Postqueue works usually in an async mode: queue items that are enqueued are kept in a queue, and must be picked up later explicitely for processing (via one of the process
, process_one
or process_until_empty
methods).
During unit tests it is likely preferrable to process queue items synchronously - if you are interested in actual processing - or, at least, in a mode which validates that the op
value is actually configured in your application (i.e. that a handler is registered for that op). You can change the processing mode via
# can be :sync, :async, :verify
Postqueue.processing = :sync
After checking out the repo, run bin/setup
to install dependencies. Make sure you have a local postgresql implementation of at least version 9.5. Add a postqueue
user with a postqueue
password, and create a postqueue_test
database for it. The script ./scripts/prepare_pg
can be somewhat helpful in establishing that.
Then, run rake spec
to run the tests. You can also run bin/console
for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install
.
To release a new version, run ./scripts/release
, which will bump the version number, create a git tag for the version, push git commits and tags, and push the .gem
file to rubygems.org.
Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/postqueue.
The gem is available as open source under the terms of the MIT License.