Skip to content

Commit

Permalink
Merge pull request #199 from rkday/post_to_shutdown
Browse files Browse the repository at this point in the history
Improve behaviour when posting to a shutdown thread pool
  • Loading branch information
jdantonio committed Dec 7, 2014
2 parents df20680 + 7051d2f commit 8ecfa2b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 21 deletions.
11 changes: 10 additions & 1 deletion lib/concurrent/executor/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ module RubyExecutor
def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
mutex.synchronize do
return false unless running?
unless running?
# The executor is shut down - figure out how to reject this task
if self.respond_to?(:handle_overflow, true)
# Reject this task in the same way we'd reject an overflow
return handle_overflow(*args, &task)
else
# No handle_overflow method defined - just return false
return false
end
end
execute(*args, &task)
true
end
Expand Down
44 changes: 24 additions & 20 deletions spec/concurrent/executor/executor_service_shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,22 @@
latch = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(1) }
subject.shutdown
subject.post{ latch.count_down }
begin
subject.post{ latch.count_down }
rescue Concurrent::RejectedExecutionError
end
expect(latch.wait(0.1)).to be_falsey
end

it 'returns false while shutting down' do
subject.post{ sleep(1) }
subject.shutdown
expect(subject.post{ nil }).to be_falsey
end

it 'rejects the block once shutdown' do
subject.shutdown
latch = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(1) }
subject.post{ latch.count_down }
begin
subject.post{ latch.count_down }
rescue Concurrent::RejectedExecutionError
end
expect(latch.wait(0.1)).to be_falsey
end

it 'returns false once shutdown' do
subject.post{ nil }
subject.shutdown
sleep(0.1)
expect(subject.post{ nil }).to be_falsey
end
end

context '#running?' do
Expand Down Expand Up @@ -75,7 +67,10 @@
latch2 = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(0.2); latch1.count_down }
subject.shutdown
expect(subject.post{ latch2.count_down }).to be_falsey
begin
expect(subject.post{ latch2.count_down }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
expect(latch1.wait(1)).to be_truthy
expect(latch2.wait(0.2)).to be_falsey
end
Expand Down Expand Up @@ -121,7 +116,10 @@
subject.post{ sleep(0.1); expected.increment }
subject.post{ sleep(0.1); expected.increment }
subject.shutdown
subject.post{ expected.increment }
begin
subject.post{ expected.increment }
rescue Concurrent::RejectedExecutionError
end
subject.wait_for_termination(1)
expect(expected.value).to eq(2)
end
Expand All @@ -135,7 +133,10 @@
subject.post{ sleep(0.1); latch.count_down }
latch.wait(1)
subject.kill
expect(subject.post{ expected.make_true }).to be_falsey
begin
expect(subject.post{ expected.make_true }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
sleep(0.1)
expect(expected.value).to be_falsey
end
Expand All @@ -145,7 +146,10 @@
sleep(0.1)
subject.kill
sleep(0.1)
expect(subject.post{ nil }).to be_falsey
begin
expect(subject.post{ nil }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
end
end

Expand Down
49 changes: 49 additions & 0 deletions spec/concurrent/executor/ruby_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ module Concurrent
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify '#post raises an error when the executor is shutting down' do
expect {
subject.shutdown; subject.post{ sleep(1) }
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify '#<< raises an error when the executor is shutting down' do
expect {
subject.shutdown; subject << proc { sleep(1) }
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify 'a #post task is never executed when the queue is at capacity' do
executed = Concurrent::AtomicFixnum.new(0)
10.times do
Expand Down Expand Up @@ -134,6 +146,29 @@ module Concurrent
sleep(0.1)
expect(executed.value).to be < 1000
end

specify 'a #post task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
subject.post{ executed.increment }
sleep(0.1)
expect(executed.value).to be 0
end

specify 'a #<< task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
subject << proc { executed.increment }
sleep(0.1)
expect(executed.value).to be 0
end

specify '#post returns false when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
ret = subject.post{ executed.increment }
expect(ret).to be false
end
end

context ':caller_runs' do
Expand Down Expand Up @@ -165,6 +200,20 @@ module Concurrent
5.times{|i| subject.post{ latch.count_down } }
latch.wait(0.1)
end

specify '#post executes the task on the current thread when the executor is shutting down' do
latch = Concurrent::CountDownLatch.new(1)
subject.shutdown
subject.post{ latch.count_down }
latch.wait(0.1)
end

specify '#<< executes the task on the current thread when the executor is shutting down' do
latch = Concurrent::CountDownLatch.new(1)
subject.shutdown
subject << proc { latch.count_down }
latch.wait(0.1)
end
end
end
end
Expand Down

0 comments on commit 8ecfa2b

Please sign in to comment.