Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hotstart_threshold to flame.pool #32

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ defmodule FLAME.Pool do
on_grow_start: nil,
on_grow_end: nil,
on_shrink: nil,
async_boot_timer: nil
async_boot_timer: nil,
hotstart_threshold: 1

def child_spec(opts) do
%{
Expand Down Expand Up @@ -128,6 +129,10 @@ defmodule FLAME.Pool do

* `:name` - The name of the pool
* `:count` - The number of runners the pool is attempting to shrink to

* `:hotstart_threshold` - The percentage of pool utilisation before a new node is preemptively
added to the pool.
Should be a number 0 < n <= 1
"""
def start_link(opts) do
Keyword.validate!(opts, [
Expand All @@ -149,7 +154,8 @@ defmodule FLAME.Pool do
:shutdown_timeout,
:on_grow_start,
:on_grow_end,
:on_shrink
:on_shrink,
:hotstart_threshold
])

GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
Expand Down Expand Up @@ -330,7 +336,8 @@ defmodule FLAME.Pool do
on_grow_start: opts[:on_grow_start],
on_grow_end: opts[:on_grow_end],
on_shrink: opts[:on_shrink],
runner_opts: runner_opts
runner_opts: runner_opts,
hotstart_threshold: Keyword.get(opts, :hotstart_threshold, 1)
}

{:ok, boot_runners(state)}
Expand Down Expand Up @@ -452,10 +459,13 @@ defmodule FLAME.Pool do
defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do
min_runner = min_runner(state)
runner_count = runner_count(state)
threshold = state.max_concurrency * state.hotstart_threshold

cond do
runner_count == 0 || !min_runner ||
(min_runner.count == state.max_concurrency && runner_count < state.max) ->
(min_runner.count == state.max_concurrency && runner_count < state.max) ||
(threshold > 1 && min_runner.count > threshold) ->
# if the threshold is less than 1 you would get a recursive creation of new nodes
if map_size(state.pending_runners) > 0 || state.async_boot_timer do
waiting_in(state, deadline, from)
else
Expand Down
26 changes: 26 additions & 0 deletions test/flame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,32 @@ defmodule FLAME.FLAMETest do
assert new_pool == Supervisor.which_children(runner_sup)
end

@tag runner: [min: 1, max: 2, max_concurrency: 8, hotstart_threshold: 0.5]
test "init boots min runners and grows on demand",
%{runner_sup: runner_sup} = config do
min_pool = Supervisor.which_children(runner_sup)
assert [{:undefined, _pid, :worker, [FLAME.Runner]}] = min_pool
# execute against single runner
assert FLAME.call(config.test, fn -> :works end) == :works

# dynamically grows to max
_task1 = sim_long_running(config.test)
assert FLAME.call(config.test, fn -> :works end) == :works
assert Supervisor.which_children(runner_sup) == min_pool
_task2 = sim_long_running(config.test)
assert FLAME.call(config.test, fn -> :works end) == :works
_task3 = sim_long_running(config.test)
_task4 = sim_long_running(config.test)
_task5 = sim_long_running(config.test)
assert FLAME.call(config.test, fn -> :works end) == :works
# concurrency above hotstart threshold boots new runner
new_pool = Supervisor.which_children(runner_sup)
refute new_pool == min_pool
assert length(new_pool) == 2
# caller is now queued while waiting for available runner
assert new_pool == Supervisor.which_children(runner_sup)
end

@tag runner: [min: 0, max: 1, max_concurrency: 2]
test "concurrent calls on fully pending runners",
%{runner_sup: runner_sup} = config do
Expand Down