Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark RubySingleThreadExecutor as a SerialExecutorService #1070

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

meineerde
Copy link
Contributor

This resolves #1069.

However, I'm still not 100% sure if the RubySingleThreadExecutor is actually serialized. From my understanding of the code, it is, but there may be edge-cases I'm missing (such as when the thread dies).

As such, I'd still appreciate if someone who knows the (intended) RubyThreadPoolExecutor semantics to review this.

@bensheldon
Copy link
Contributor

The old implementation doesn't look substantially different than the new one does. Both call Queue#pop and rescue from Exception, albeit the old implementation would swallow the exception whereas the new one will create a new worker thread; that doesn't change the behavior in my eyes.

Old implementation:

def work
loop do
task = @queue.pop
break if task == :stop
begin
task.last.call(*task.first)
rescue => ex
# let it fail
log DEBUG, ex
end
end
stopped_event.set
end

Current implementation:

def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
catch(:stop) do
loop do
case message = my_queue.pop
when :stop
my_pool.remove_busy_worker(self)
throw :stop
else
task, args = message
run_task my_pool, task, args
my_pool.ready_worker(self, Concurrent.monotonic_time)
end
end
end
end
end

I'll leave this PR open a little longer to see if anyone else has thoughts, otherwise I can accept it.

@bensheldon
Copy link
Contributor

bensheldon commented Nov 1, 2024

I should also note I didn't see any test assertions of this behavior. The only place I saw #serialized? meaningfully change test behavior is right here, though I honestly think that the test is wrong and should actually be omitting on immediacy not serialized.

it 'returns false when shutdown fails to complete before timeout' do
unless subject.serialized?
latch = Concurrent::CountDownLatch.new 1
100.times{ subject.post{ latch.wait } }
sleep(0.1)
subject.shutdown
expect(subject.wait_for_termination(0.01)).to be_falsey
latch.count_down
end
end

@meineerde
Copy link
Contributor Author

That's what I saw too when checking if/how to adapt tests. On the other hand, the serialized? method also doesn't seem to be used that much anywhere else either.

To be honest, my original intend for my question in #1069 was to ensure that the semantics of a RubySingleThreadExecutor were in fact to serialize tasks as I want to reply on that. The correct return value of the serialized? method is then a nice bonus on top :)

Still, if this holds, it may be worthwhile to update the RubySingleThreadExecutor itself to implement serialized? as something like

def serialized?
  @max_length <= 1
end

I believe the same should be valid for the JavaThreadPoolExecutor, but I'm less familiar with the exact semantics of those. The documentation however seems to imply this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Should RubySingleThreadExecutor be marked to be a SerialExecutorService again?
2 participants