Skip to content

Commit

Permalink
Share code between Fly and K8s runtimes (#2788)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko authored Sep 18, 2024
1 parent 2e45f8a commit b0ab056
Show file tree
Hide file tree
Showing 12 changed files with 586 additions and 701 deletions.
14 changes: 7 additions & 7 deletions lib/livebook/k8s/pod.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Livebook.K8s.Pod do
@main_container_name "livebook-runtime"
@home_pvc_volume_name "livebook-home"
@pvc_name_volume_name "livebook-home"

@default_pod_template """
apiVersion: v1
Expand Down Expand Up @@ -35,23 +35,23 @@ defmodule Livebook.K8s.Pod do

@doc """
Adds "volume" and "volumeMount" configurations to `manifest` in order
to mount `home_pvc` under /home/livebook on the pod.
to mount `pvc_name` under /home/livebook on the pod.
"""
@spec set_home_pvc(map(), String.t()) :: map()
def set_home_pvc(manifest, home_pvc) do
@spec set_pvc_name(map(), String.t()) :: map()
def set_pvc_name(manifest, pvc_name) do
manifest
|> update_in(["spec", Access.key("volumes", [])], fn volumes ->
volume = %{
"name" => @home_pvc_volume_name,
"persistentVolumeClaim" => %{"claimName" => home_pvc}
"name" => @pvc_name_volume_name,
"persistentVolumeClaim" => %{"claimName" => pvc_name}
}

[volume | volumes]
end)
|> update_in(
["spec", "containers", access_main_container(), Access.key("volumeMounts", [])],
fn volume_mounts ->
[%{"name" => @home_pvc_volume_name, "mountPath" => "/home/livebook"} | volume_mounts]
[%{"name" => @pvc_name_volume_name, "mountPath" => "/home/livebook"} | volume_mounts]
end
)
end
Expand Down
83 changes: 10 additions & 73 deletions lib/livebook/runtime/fly.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule Livebook.Runtime.Fly do

use GenServer, restart: :temporary

require Logger
alias Livebook.Runtime.RemoteUtils

@type t :: %__MODULE__{
config: config(),
Expand Down Expand Up @@ -103,18 +103,10 @@ defmodule Livebook.Runtime.Fly do
@impl true
def handle_continue({:init, runtime, caller}, state) do
config = runtime.config
local_port = get_free_port!()
remote_port = 44444
local_port = RemoteUtils.get_free_port!()
node_base = "remote_runtime_#{local_port}"

runtime_data =
%{
node_base: node_base,
cookie: Node.get_cookie(),
dist_port: remote_port
}
|> :erlang.term_to_binary()
|> Base.encode64()
runtime_data = RemoteUtils.encode_runtime_data(node_base)

parent = self()

Expand All @@ -140,22 +132,22 @@ defmodule Livebook.Runtime.Fly do
child_node <- :"#{node_base}@#{machine_id}.vm.#{config.app_name}.internal",
{:ok, proxy_port} <-
with_log(caller, "start proxy", fn ->
start_fly_proxy(config.app_name, machine_ip, local_port, remote_port, config.token)
start_fly_proxy(config.app_name, machine_ip, local_port, config.token)
end),
:ok <-
with_log(caller, "machine starting", fn ->
await_machine_started(config, machine_id)
end),
:ok <-
with_log(caller, "connect to node", fn ->
connect_loop(child_node, 40, 250)
RemoteUtils.connect(child_node)
end),
{:ok, primary_pid} <- fetch_runtime_info(child_node) do
%{pid: primary_pid} <- RemoteUtils.fetch_runtime_info(child_node) do
primary_ref = Process.monitor(primary_pid)

server_pid =
with_log(caller, "initialize node", fn ->
initialize_node(child_node)
RemoteUtils.initialize_node(child_node)
end)

send(primary_pid, :node_initialized)
Expand Down Expand Up @@ -274,29 +266,9 @@ defmodule Livebook.Runtime.Fly do
end
end

# Tries to connect to `node`, retrying up to `attempts` times and
# sleeping `interval` milliseconds after every failed attempt.
#
# Returns `:ok` once the connection is established, or
# `{:error, message}` when the attempts run out.
defp connect_loop(_node, attempts, _interval) when attempts <= 0 do
  {:error, "could not establish connection with the node"}
end

defp connect_loop(node, attempts, interval) do
  # Node.connect/1 returns true, false or :ignored (the local node is
  # not alive). Only `true` means a connection exists; :ignored is
  # truthy, so a plain `if` would wrongly report success.
  case Node.connect(node) do
    true ->
      :ok

    _not_connected ->
      Process.sleep(interval)
      connect_loop(node, attempts - 1, interval)
  end
end

# Returns a TCP port number that was free at the time of the call.
#
# Listening on port 0 makes the OS pick an unused port; the number is
# read back and the temporary listening socket is closed right away.
# Raises (failed match) if the socket cannot be opened or inspected.
defp get_free_port!() do
  {:ok, listener} = :gen_tcp.listen(0, active: false, reuseaddr: true)
  {:ok, free_port} = :inet.port(listener)
  _ = :gen_tcp.close(listener)
  free_port
end

defp start_fly_proxy(app_name, host, local_port, remote_port, token) do
defp start_fly_proxy(app_name, host, local_port, token) do
with {:ok, flyctl_path} <- find_fly_executable() do
ports = "#{local_port}:#{remote_port}"
ports = "#{local_port}:#{RemoteUtils.remote_port()}"

# We want the proxy to accept the same protocol that we are
# going to use for distribution
Expand Down Expand Up @@ -380,44 +352,9 @@ defmodule Livebook.Runtime.Fly do
Enum.find(paths, fn path -> path && File.regular?(path) end)
end

# Reads the `:livebook_runtime_info` persistent term on `child_node`
# and returns `{:ok, pid}` with the pid stored under its `:pid` key.
defp fetch_runtime_info(child_node) do
  # The runtime node is started by Livebook itself, so it is known to
  # run a Livebook release of exactly the same version. The remote
  # node also already has every runtime module in its code path,
  # compiled for its own Elixir version, which is why no Elixir
  # version compatibility check is performed here.
  runtime_info = :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info])
  %{pid: primary_pid} = runtime_info

  {:ok, primary_pid}
end

# Initializes `child_node` through Livebook.Runtime.ErlDist.initialize/2,
# passing the local smart cell definitions as extra definitions in the
# runtime server options. Returns whatever that call returns.
defp initialize_node(child_node) do
  smart_cell_definitions = Livebook.Runtime.Definitions.smart_cell_definitions()
  runtime_server_opts = [extra_smart_cell_definitions: smart_cell_definitions]

  Livebook.Runtime.ErlDist.initialize(child_node, runtime_server_opts: runtime_server_opts)
end

# Notifies `caller` that the connection step `name` has started, then
# runs `fun` through RemoteUtils.with_log/2 under the "[fly runtime]"
# prefix and returns that call's result.
#
# NOTE(review): assumes RemoteUtils.with_log/2 times `fun`, logs the
# outcome and returns the result of `fun` - confirm against RemoteUtils.
defp with_log(caller, name, fun) do
  send(caller, {:runtime_connect_info, self(), name})

  # `fun` must run exactly once. Timing and logging it locally and then
  # delegating as well would execute every connection step twice.
  RemoteUtils.with_log("[fly runtime] #{name}", fun)
end
end

Expand Down
Loading

0 comments on commit b0ab056

Please sign in to comment.