
Add bulk enqueue functionality #790

Merged (31 commits) on Jan 31, 2023

Conversation

@julik (Contributor) commented Jan 6, 2023

In a number of situations we've seen a single task generate a large number of jobs. The most typical example is sending notifications or emails to groups of users of an application. When hundreds or thousands of jobs need to be enqueued, doing one INSERT per job becomes expensive: if one INSERT takes 1 ms, 1,000 inserts add up to a full second. ActiveJob does not support bulk enqueueing by itself, but we can use thread locals to intercept the enqueued job records and delay persisting them until the end of the block.

Note that I've picked "bulk" as the name to avoid conflating this with "batches", which are in progress separately. Bulk serves a different purpose: rapidly inserting large numbers of jobs, and nothing else.

The usage is straightforward:

GoodJob.in_bulk do
  User.pending_renewal.find_each do |user_pending_renewal|
    RenewalMailer.renewal_notification(user_pending_renewal).deliver_later
  end
end
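
For reference, here is a minimal sketch of the thread-local interception described above; the module and method names are illustrative only, not the final API:

    # Sketch: buffer jobs in a thread-local Array instead of INSERTing them one
    # at a time, then persist the whole buffer at the end of the block.
    module BulkSketch
      THREAD_KEY = :good_job_bulk_buffer

      # Wraps a block; any job captured inside is persisted when the block ends.
      def self.in_bulk
        Thread.current[THREAD_KEY] = []
        yield
        buffered = Thread.current[THREAD_KEY]
        Thread.current[THREAD_KEY] = nil # deactivate before persisting
        # Stand-in for a single bulk INSERT (insert_all) of all buffered jobs:
        buffered.each(&:enqueue)
        buffered
      ensure
        Thread.current[THREAD_KEY] = nil
      end

      # The queue adapter's #enqueue would call this; returns true if the job
      # was buffered instead of being enqueued immediately.
      def self.capture(active_job)
        buffer = Thread.current[THREAD_KEY]
        return false unless buffer

        buffer << active_job
        true
      end
    end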

@julik marked this pull request as ready for review on January 6, 2023 at 15:02
@bensheldon (Owner) commented:

@julik thanks for opening this PR. You must have been reading my mind because I was just thinking: "hmm. my Batch PR has stalled a minute, maybe I should extract the Bulk API out of that". 🧠

Could you take a look at that implementation and see if it would meet your needs: https://github.com/bensheldon/good_job/pull/712/files#diff-660985340da42f7f83f1a5e03b05f511aa5a1a7221c07be3cbf05c150060dca5

Here's what I'm thinking:

  • I am trying to avoid hanging more methods directly off of GoodJob, hence making GoodJob::Bulk.enqueue the interface. I also wanted to allow for nested bulk blocks. If you're ok with my implementation, I'd suggest pulling that over here, though it might need a bit more work.
  • I skipped entirely over the insert-all part of it (spike!). There is a proposal in Rails to add Adapter#enqueue_all to ActiveJob. Could you implement that interface and use it for the insert-all? (A sketch of what that could look like follows this list.)
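
To make the second point concrete, here is a hedged sketch of how GoodJob's adapter could implement enqueue_all with a single insert_all; GoodJob::Execution.build_for_enqueue and the attribute handling are assumptions about GoodJob's internals rather than the shipped code:

    # Sketch of an Adapter#enqueue_all built on insert_all. Assumes a
    # build_for_enqueue helper that serializes an ActiveJob into an Execution.
    def enqueue_all(active_jobs)
      records = active_jobs.map { |active_job| GoodJob::Execution.build_for_enqueue(active_job) }

      now = Time.current
      rows = records.map do |record|
        record.id ||= SecureRandom.uuid # insert_all skips callbacks, so assign ids here
        record.attributes.merge("created_at" => now, "updated_at" => now)
      end

      # One INSERT for the whole set instead of one per job.
      results = GoodJob::Execution.insert_all(rows, returning: %w[id active_job_id])

      # Mirror ActiveJob's contract by marking the jobs as enqueued.
      ids = results.rows.to_h { |id, active_job_id| [active_job_id, id] }
      active_jobs.each { |active_job| active_job.provider_job_id = ids[active_job.job_id] }
      active_jobs
    end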

@julik (Contributor, Author) commented Jan 6, 2023

Hey @bensheldon, no worries - it is a spike after all. enqueue_all seems like the right entry point for this, but in my experience with Rails core changes there can be more debate about the implementation, and then some time will pass before the feature ends up in core proper and gets released. We could of course add enqueue_all preemptively.

The insert_all bit is crucial though - how would you like to handle it? I could try adding it to the batches PR, but that PR is already quite big. Maybe we could cherry-pick your Bulk stuff from there and fold it into this PR? What I do want is some kind of block-accepting method somewhere within GoodJob that people can call without waiting for the Rails core PR to be accepted.

@bensheldon (Owner) commented:

Yep! I was thinking you could copy-paste the GoodJob::Bulk implementation from the Batch PR into this PR.

I agree with you that the Rails interface is subject to change, but I think the spirit of it (hopefully) is pretty solid: GoodJob::Adapter.enqueue_all(active_jobs = [])

So the steps I'm imagining are:

  1. Copy-paste GoodJob::Bulk over here
  2. Implement GoodJob::Adapter.enqueue_all
  3. Update GoodJob::Bulk and replace the part where it does active_jobs.each(&:perform_later) with GoodJob::Adapter.enqueue_all(active_jobs) (sketched below)
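
Step 3 might end up looking roughly like this inside GoodJob::Bulk (a sketch; the respond_to? guard keeps adapters without enqueue_all working):

    # Use the bulk path when the adapter supports it; otherwise enqueue one by one.
    active_jobs_by_queue_adapter.each do |adapter, jobs|
      if adapter.respond_to?(:enqueue_all)
        adapter.enqueue_all(jobs)
      else
        jobs.each(&:enqueue)
      end
    end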

@julik (Contributor, Author) commented Jan 6, 2023

I'll try that once I have time. Meanwhile we'll field test the implementation that is already in this PR and see what the improvements will be ;-)

@bensheldon (Owner) commented:

@julik sounds good 👍

Do you mind if I push up some changes to this branch? I can also make my own branch that builds on this one for discussion. No worries either way. I really appreciate the collaboration!

@julik (Contributor, Author) commented Jan 6, 2023

Sure, go ahead. Do you want me to add you as a collaborator on my fork? It might save some rebasing later.

@julik (Contributor, Author) commented Jan 6, 2023

A couple of other things:

  • enqueue_all does not support a timestamp. It looks like ActiveJob::Base#scheduled_at is a more recent Rails feature and is the preferred way to carry it. I'll add something to extract it if it's available, but it is not in the method signature for enqueue_all (see the sketch after this list).
  • The created Execution may take an advisory lock immediately. With bulk insert we would need to support that too, which means doing some Arel work to create a big SELECT that takes the locks. If we do that, some of the locks might fail to be taken because the job could already have been picked up by a running worker. Would it be wise to just remove immediate locking when bulk enqueue is used? What would break if we did?
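
For the first point, the extraction could be as simple as the following sketch; the respond_to? guard covers jobs or Rails versions without scheduled_at, and treating the value as an epoch number matches the Rails of this era:

    # Sketch: derive the run-at time per job instead of threading it through
    # the enqueue_all signature. scheduled_at is set by `set(wait: ...)`.
    run_at =
      if active_job.respond_to?(:scheduled_at) && active_job.scheduled_at
        Time.zone.at(active_job.scheduled_at)
      end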

@julik (Contributor, Author) commented Jan 6, 2023

Ok - did some manipulations which should set us off in the right direction, and you are in the collaborator list 🚀

@bensheldon (Owner) commented:

@julik I just pushed up a bunch of changes; I think I may have removed some of the instrumentation you added this morning. That wasn't meant as a rejection, just expedient for getting in some of the more major changes I wanted to see.

I think we can get inline enqueueing working; it will just involve creating a transaction within which to take the advisory lock. I can work on that.
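
The rough shape of that could be the following sketch; the lock-key derivation is illustrative rather than GoodJob's actual scheme, job_attributes is a placeholder for the serialized job, and execution.perform stands in for however the inline adapter actually runs the job:

    # Sketch: a transaction-level advisory lock (released at COMMIT/ROLLBACK)
    # keeps another worker from picking the record up while it runs inline.
    ActiveRecord::Base.transaction do
      execution = GoodJob::Execution.create!(job_attributes)
      conn = ActiveRecord::Base.connection
      conn.execute("SELECT pg_advisory_xact_lock(hashtext(#{conn.quote(execution.id)}))")
      execution.perform
    end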

@julik (Contributor, Author) commented Jan 7, 2023

Lovely, thanks for the changes! My worry with the advisory lock was that if we INSERT all of the jobs atomically, we need to try to take the locks atomically as well - i.e. it was not clear to me what kind of semantics we want to have.

I'll stop tinkering for now, sorry - this was tasty work.

@julik (Contributor, Author) commented Jan 7, 2023

Give a shout when I can continue, and whether you want a review.

@bensheldon (Owner) commented:

@julik yes! This is fun. I'm still working on it. I think I feel ok with GoodJob::Bulk's interface if you wanted to take a look at that.

I still do need to work on the Adapter side:

  • Support Inline and Async execution
  • Handle NOTIFY. This will be interesting because I don't want to send hundreds of notifications, so I think I need to add some kind of count option to the notify payload (a sketch follows this list).
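
For the NOTIFY point, the idea would be roughly one notification per queue carrying a count, instead of one per job; the payload keys here are an assumption, not necessarily what GoodJob::Notifier expects:

    # Sketch: collapse per-job NOTIFYs into one per queue so listeners can
    # still decide how aggressively to wake up.
    active_jobs.group_by(&:queue_name).each do |queue_name, jobs|
      GoodJob::Notifier.notify(queue_name: queue_name, count: jobs.size)
    end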

This has definitely surfaced for me a lot of messiness in the Adapter and Execution boundaries. Stuff to work through for me :D

@julik (Contributor, Author) commented Jan 9, 2023

What do we want to do with enqueue concurrency? It seems we would need to do a SELECT for all the concurrency keys so that we don't INSERT jobs that would exceed their concurrency limits.
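
Something along these lines, perhaps; the good_job_concurrency_key/good_job_concurrency_config helpers and the enqueue_limit key are assumptions about the concurrency extension, and this check is racy without extra locking:

    # Sketch: count unfinished jobs per concurrency key in one query and drop
    # jobs that would exceed their limit before the bulk INSERT.
    keys = active_jobs.filter_map { |job| job.try(:good_job_concurrency_key) }
    existing = GoodJob::Execution
                 .where(concurrency_key: keys, finished_at: nil)
                 .group(:concurrency_key)
                 .count

    insertable = active_jobs.reject do |job|
      key = job.try(:good_job_concurrency_key)
      limit = job.class.try(:good_job_concurrency_config)&.dig(:enqueue_limit)
      key && limit && existing.fetch(key, 0) >= limit
    end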

thread_mattr_accessor :current_buffer

# @return [nil, Array<ActiveJob::Base>] The ActiveJob instances that have been buffered; nil if no active buffer
def self.capture(active_jobs = nil, queue_adapter: nil)
@julik (Contributor, Author) commented on the code above:

Under which conditions will active_jobs be nil? And under which conditions will queue_adapter be nil?

active_jobs_by_queue_adapter.each do |adapter, jobs|
  jobs = jobs.reject(&:provider_job_id) # Do not re-enqueue already enqueued jobs

  if adapter.respond_to?(:enqueue_all)
@julik (Contributor, Author) commented on the code above:

If the Bulk API is used, the adapter involved will always be GoodJob's - and if we have Bulk, then we support enqueue_all by definition. Is there a need to copy the check from ActiveJob here, which they designed so that multiple adapters can be used in the same app?

end
end

def active_jobs_by_queue_adapter
@julik (Contributor, Author) commented on the code above:

As I mentioned above - all adapters here will be GoodJob, just potentially with different settings/different DBs?

  buffer.enqueue
  buffer.active_jobs
elsif current_buffer.present?
  current_buffer.enqueue
@julik (Contributor, Author) commented on the code above:

#enqueue and #active_jobs seem to always be used together - maybe return them from enqueue and reset the buffer?

  @values = []
end

def add(active_jobs, queue_adapter: nil)
@julik (Contributor, Author) commented on the code above:

Does the queue_adapter need to be overridable via a parameter?

@bensheldon (Owner) commented Jan 16, 2023

I'm planning to work on the Bulk interface this week.

  • Add further documentation. From the feedback, I think the missing thing is documenting the explicit API that allows passing ActiveJob instances directly, separate from the implicit API that wraps .perform_later. The explicit API looks like this, and is why there are lots of checks on what kind of adapter is being used (and also because I personally have an easier time designing these components in isolation as much as possible):

    bulk = GoodJob::Buffer.new
    bulk.capture([MyJob.new, MyJob.new, OtherJob.new]) # these jobs could have any adapter
    bulk.enqueue
    puts "Enqueued #{bulk.active_jobs.count(&:provider_job_id)} / #{bulk.active_jobs.count} jobs"

    My approach with the README has been to document the general shape of the interface and hope that people will read the documented code if they want all the gritty details. I should probably include this in the README, though (and given the spike, there's a lot of code-level documentation that's missing).

  • Handle jobs that have concurrency controls enabled (complexity!). I need to get into the weeds on it, but I think I want Adapter#enqueue_all to raise an exception/reject them, and have Bulk#enqueue try to handle them gracefully, delegating them to Adapter#enqueue (sketched below).
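
For that last point, one possible division of labor looks like the sketch below; the good_job_concurrency_key check is an assumed way to detect concurrency-controlled jobs:

    # Sketch: the bulk path takes only plain jobs; concurrency-controlled jobs
    # go through the ordinary per-job enqueue, which runs the usual checks.
    controlled, plain = active_jobs.partition do |job|
      job.respond_to?(:good_job_concurrency_key) && job.good_job_concurrency_key
    end

    adapter.enqueue_all(plain) if plain.any?
    controlled.each { |job| adapter.enqueue(job) }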

@bensheldon (Owner) commented Jan 24, 2023

Also, I think this needs to cover the inline situation when a job enqueues another job to ensure the second enqueue doesn't get inadvertently added to the batch.

Edit: oops, I already did this: https://github.com/bensheldon/good_job/pull/790/files#diff-660985340da42f7f83f1a5e03b05f511aa5a1a7221c07be3cbf05c150060dca5R75

@bensheldon changed the title from "[spike] Add bulk enqueue" to "Add bulk enqueue functionality" on Jan 30, 2023
@bensheldon added the "enhancement" (New feature or request) label on Jan 30, 2023
@bensheldon (Owner) left a review comment:

@julik thank you for getting this started and working through it with me! 🚀

@bensheldon merged commit 2b007ff into bensheldon:main on Jan 31, 2023
@julik (Contributor, Author) commented Feb 6, 2023

Glad to see it got released! Sorry I didn't stick around till the end - some other stuff took priority.

@bensheldon mentioned this pull request on Nov 27, 2023