diff --git a/lib/livebook/k8s/pod.ex b/lib/livebook/k8s/pod.ex index 0337416e6ca..74d28e31159 100644 --- a/lib/livebook/k8s/pod.ex +++ b/lib/livebook/k8s/pod.ex @@ -1,6 +1,6 @@ defmodule Livebook.K8s.Pod do @main_container_name "livebook-runtime" - @home_pvc_volume_name "livebook-home" + @pvc_name_volume_name "livebook-home" @default_pod_template """ apiVersion: v1 @@ -35,15 +35,15 @@ defmodule Livebook.K8s.Pod do @doc """ Adds "volume" and "volumeMount" configurations to `manifest` in order - to mount `home_pvc` under /home/livebook on the pod. + to mount `pvc_name` under /home/livebook on the pod. """ - @spec set_home_pvc(map(), String.t()) :: map() - def set_home_pvc(manifest, home_pvc) do + @spec set_pvc_name(map(), String.t()) :: map() + def set_pvc_name(manifest, pvc_name) do manifest |> update_in(["spec", Access.key("volumes", [])], fn volumes -> volume = %{ - "name" => @home_pvc_volume_name, - "persistentVolumeClaim" => %{"claimName" => home_pvc} + "name" => @pvc_name_volume_name, + "persistentVolumeClaim" => %{"claimName" => pvc_name} } [volume | volumes] @@ -51,7 +51,7 @@ defmodule Livebook.K8s.Pod do |> update_in( ["spec", "containers", access_main_container(), Access.key("volumeMounts", [])], fn volume_mounts -> - [%{"name" => @home_pvc_volume_name, "mountPath" => "/home/livebook"} | volume_mounts] + [%{"name" => @pvc_name_volume_name, "mountPath" => "/home/livebook"} | volume_mounts] end ) end diff --git a/lib/livebook/runtime/fly.ex b/lib/livebook/runtime/fly.ex index 9ea467806bd..bbaeffc206a 100644 --- a/lib/livebook/runtime/fly.ex +++ b/lib/livebook/runtime/fly.ex @@ -51,7 +51,7 @@ defmodule Livebook.Runtime.Fly do use GenServer, restart: :temporary - require Logger + alias Livebook.Runtime.RemoteUtils @type t :: %__MODULE__{ config: config(), @@ -103,18 +103,10 @@ defmodule Livebook.Runtime.Fly do @impl true def handle_continue({:init, runtime, caller}, state) do config = runtime.config - local_port = get_free_port!() - remote_port = 44444 + local_port = RemoteUtils.get_free_port!() node_base = "remote_runtime_#{local_port}" - runtime_data = - %{ - node_base: node_base, - cookie: Node.get_cookie(), - dist_port: remote_port - } - |> :erlang.term_to_binary() - |> Base.encode64() + runtime_data = RemoteUtils.encode_runtime_data(node_base) parent = self() @@ -140,7 +132,7 @@ defmodule Livebook.Runtime.Fly do child_node <- :"#{node_base}@#{machine_id}.vm.#{config.app_name}.internal", {:ok, proxy_port} <- with_log(caller, "start proxy", fn -> - start_fly_proxy(config.app_name, machine_ip, local_port, remote_port, config.token) + start_fly_proxy(config.app_name, machine_ip, local_port, config.token) end), :ok <- with_log(caller, "machine starting", fn -> @@ -148,14 +140,14 @@ defmodule Livebook.Runtime.Fly do end), :ok <- with_log(caller, "connect to node", fn -> - connect_loop(child_node, 40, 250) + RemoteUtils.connect(child_node) end), - {:ok, primary_pid} <- fetch_runtime_info(child_node) do + %{pid: primary_pid} <- RemoteUtils.fetch_runtime_info(child_node) do primary_ref = Process.monitor(primary_pid) server_pid = with_log(caller, "initialize node", fn -> - initialize_node(child_node) + RemoteUtils.initialize_node(child_node) end) send(primary_pid, :node_initialized) @@ -274,29 +266,9 @@ defmodule Livebook.Runtime.Fly do end end - defp connect_loop(_node, 0, _interval) do - {:error, "could not establish connection with the node"} - end - - defp connect_loop(node, attempts, interval) do - if Node.connect(node) do - :ok - else - Process.sleep(interval) - connect_loop(node, attempts - 1, interval) - end - end - - defp get_free_port!() do - {:ok, socket} = :gen_tcp.listen(0, active: false, reuseaddr: true) - {:ok, port} = :inet.port(socket) - :gen_tcp.close(socket) - port - end - - defp start_fly_proxy(app_name, host, local_port, remote_port, token) do + defp start_fly_proxy(app_name, host, local_port, token) do with {:ok, flyctl_path} <- find_fly_executable() do - ports = "#{local_port}:#{remote_port}" + ports = "#{local_port}:#{RemoteUtils.remote_port()}" # We want the proxy to accept the same protocol that we are # going to use for distribution @@ -380,44 +352,9 @@ defmodule Livebook.Runtime.Fly do Enum.find(paths, fn path -> path && File.regular?(path) end) end - defp fetch_runtime_info(child_node) do - # Note: it is Livebook that starts the runtime node, so we know - # that the node runs Livebook release of the exact same version - # - # Also, the remote node already has all the runtime modules in - # the code path, compiled for its Elixir version, so we don't - # need to check for matching Elixir version. - - %{pid: pid} = :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info]) - - {:ok, pid} - end - - defp initialize_node(child_node) do - init_opts = [ - runtime_server_opts: [ - extra_smart_cell_definitions: Livebook.Runtime.Definitions.smart_cell_definitions() - ] - ] - - Livebook.Runtime.ErlDist.initialize(child_node, init_opts) - end - defp with_log(caller, name, fun) do send(caller, {:runtime_connect_info, self(), name}) - - {microseconds, result} = :timer.tc(fun) - milliseconds = div(microseconds, 1000) - - case result do - {:error, error} -> - Logger.debug("[fly runtime] #{name} FAILED in #{milliseconds}ms, error: #{error}") - - _ -> - Logger.debug("[fly runtime] #{name} finished in #{milliseconds}ms") - end - - result + RemoteUtils.with_log("[fly runtime] #{name}", fun) end end diff --git a/lib/livebook/runtime/k8s.ex b/lib/livebook/runtime/k8s.ex index 65765af64fe..bcacf7bd1f4 100644 --- a/lib/livebook/runtime/k8s.ex +++ b/lib/livebook/runtime/k8s.ex @@ -8,13 +8,12 @@ defmodule Livebook.Runtime.K8s do defstruct [:config, :node, :req, :server_pid, :lv_pid, :pod_name] - @type config :: %{ - context: String.t(), - namespace: String.t(), - home_pvc: String.t() | nil, - docker_tag: String.t(), - pod_template: String.t() - } + use GenServer, restart: :temporary + + require Logger + + alias Livebook.Runtime.RemoteUtils + alias Livebook.K8s.Pod @type t :: %__MODULE__{ node: node() | nil, @@ -24,18 +23,20 @@ defmodule Livebook.Runtime.K8s do pod_name: String.t() | nil } - use GenServer, restart: :temporary - - require Logger - - alias Livebook.K8s.Pod + @type config :: %{ + context: String.t(), + namespace: String.t(), + docker_tag: String.t(), + pod_template: String.t(), + pvc_name: String.t() | nil + } @doc """ Returns a new runtime instance. """ - @spec new(config :: map(), req :: Req.Request.t()) :: t() - def new(config, req) do - %__MODULE__{config: config, req: req, lv_pid: self()} + @spec new(map()) :: t() + def new(config) do + %__MODULE__{config: config, lv_pid: self()} end def __connect__(runtime) do @@ -60,25 +61,26 @@ defmodule Livebook.Runtime.K8s do def handle_continue({:init, runtime, caller}, state) do config = runtime.config %{namespace: namespace, context: context} = config - req = runtime.req - kubeconfig = - if System.get_env("KUBERNETES_SERVICE_HOST") do - nil + within_kubernetes? = System.get_env("KUBERNETES_SERVICE_HOST") != nil + + {node_base, local_port} = + if within_kubernetes? do + # When already running within Kubernetes we don't need the + # proxy, the node is reachable directly + {"k8s_runtime", nil} else - System.get_env("KUBECONFIG") || Path.join(System.user_home!(), ".kube/config") + local_port = RemoteUtils.get_free_port!() + {"remote_runtime_#{local_port}", local_port} end - cluster_data = get_cluster_data(kubeconfig) + req = + Kubereq.Kubeconfig.Default + |> Kubereq.Kubeconfig.load() + |> Kubereq.Kubeconfig.set_current_context(context) + |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name") - runtime_data = - %{ - node_base: cluster_data.node_base, - cookie: Node.get_cookie(), - dist_port: cluster_data.remote_port - } - |> :erlang.term_to_binary() - |> Base.encode64() + runtime_data = RemoteUtils.encode_runtime_data(node_base) parent = self() @@ -90,28 +92,32 @@ defmodule Livebook.Runtime.K8s do with {:ok, pod_name} <- with_log(caller, "create pod", fn -> - create_pod(req, config, runtime_data, cluster_data.remote_port) + create_pod(req, config, runtime_data) end), _ <- send(watcher_pid, {:pod_created, pod_name}), {:ok, pod_ip} <- with_pod_events(caller, "waiting for pod", req, namespace, pod_name, fn -> await_pod_ready(req, namespace, pod_name) end), - child_node <- :"#{cluster_data.node_base}@#{pod_ip}", + child_node <- :"#{node_base}@#{pod_ip}", :ok <- - with_log(caller, "start proxy", fn -> - k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace) - end), + (if within_kubernetes? do + :ok + else + with_log(caller, "start proxy", fn -> + k8s_forward_port(context, local_port, pod_name, namespace) + end) + end), :ok <- with_log(caller, "connect to node", fn -> - connect_loop(child_node, 40, 250) + RemoteUtils.connect(child_node) end), - {:ok, primary_pid} <- fetch_runtime_info(child_node) do + %{pid: primary_pid} <- RemoteUtils.fetch_runtime_info(child_node) do primary_ref = Process.monitor(primary_pid) server_pid = with_log(caller, "initialize node", fn -> - initialize_node(child_node) + RemoteUtils.initialize_node(child_node) end) send(primary_pid, :node_initialized) @@ -139,30 +145,6 @@ defmodule Livebook.Runtime.K8s do {:noreply, state} end - defp get_free_port!() do - {:ok, socket} = :gen_tcp.listen(0, active: false, reuseaddr: true) - {:ok, port} = :inet.port(socket) - :gen_tcp.close(socket) - port - end - - defp with_log(caller, name, fun) do - send(caller, {:runtime_connect_info, self(), name}) - - {microseconds, result} = :timer.tc(fun) - milliseconds = div(microseconds, 1000) - - case result do - {:error, error} -> - Logger.debug("[K8s runtime] #{name} FAILED in #{milliseconds}ms, error: #{error}") - - _ -> - Logger.debug("[K8s runtime] #{name} finished in #{milliseconds}ms") - end - - result - end - defp with_pod_events(caller, name, req, namespace, pod_name, fun) do with_log(caller, name, fn -> runtime_pid = self() @@ -186,8 +168,8 @@ defmodule Livebook.Runtime.K8s do {:ok, stream} -> Enum.each(stream, fn event -> message = Livebook.Utils.downcase_first(event["object"]["message"]) - Logger.debug(~s'[K8s runtime] Pod event: "#{message}"') send(caller, {:runtime_connect_info, runtime_pid, message}) + Logger.debug(~s/[k8s runtime] Pod event: "#{message}"/) end) _error -> @@ -224,11 +206,11 @@ defmodule Livebook.Runtime.K8s do end end - defp create_pod(req, config, runtime_data, remote_port) do + defp create_pod(req, config, runtime_data) do %{ pod_template: pod_template, docker_tag: docker_tag, - home_pvc: home_pvc, + pvc_name: pvc_name, namespace: namespace } = config @@ -254,11 +236,11 @@ defmodule Livebook.Runtime.K8s do ]) |> Pod.set_docker_tag(docker_tag) |> Pod.set_namespace(namespace) - |> Pod.add_container_port(remote_port) + |> Pod.add_container_port(RemoteUtils.remote_port()) manifest = - if home_pvc do - Pod.set_home_pvc(manifest, home_pvc) + if pvc_name do + Pod.set_pvc_name(manifest, pvc_name) else manifest end @@ -275,24 +257,9 @@ defmodule Livebook.Runtime.K8s do end end - defp get_cluster_data(_kubeconfig = nil) do - # When already running within Kubernetes we don't need the proxy, - # the node is reachable directly - %{node_base: "k8s_runtime", remote_port: 44444} - end - - defp get_cluster_data(_kubeconfig) do - local_port = get_free_port!() - %{node_base: "remote_runtime_#{local_port}", remote_port: 44444, local_port: local_port} - end - - defp k8s_forward_port(_kubeconfig = nil, _, _, _, _), do: :ok - - defp k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace) do - %{local_port: local_port, remote_port: remote_port} = cluster_data - + defp k8s_forward_port(context, local_port, pod_name, namespace) do with {:ok, kubectl_path} <- find_kubectl_executable() do - ports = "#{local_port}:#{remote_port}" + ports = "#{local_port}:#{RemoteUtils.remote_port()}" # We want the proxy to accept the same protocol that we are # going to use for distribution @@ -303,6 +270,8 @@ defmodule Livebook.Runtime.K8s do "127.0.0.1" end + kubeconfig = System.get_env("KUBECONFIG") || Path.join(System.user_home!(), ".kube/config") + args = [ "port-forward", @@ -351,7 +320,7 @@ defmodule Livebook.Runtime.K8s do if path = System.find_executable("kubectl") do {:ok, path} else - {:error, "no kubectl executable found in PATH."} + {:error, "no kubectl executable found in PATH"} end end @@ -363,7 +332,7 @@ defmodule Livebook.Runtime.K8s do pod_name, fn :deleted -> - {:error, "The Pod was deleted before it started running."} + {:error, "the Pod was deleted before it started running"} pod -> get_in(pod, [ @@ -380,50 +349,19 @@ defmodule Livebook.Runtime.K8s do {:ok, pod["status"]["podIP"]} else {:error, :watch_timeout} -> - {:error, "Timed out waiting for Pod to start up."} + {:error, "timed out waiting for Pod to start up"} {:error, error} -> {:error, error} _other -> - {:error, "Failed getting the Pod's IP address."} - end - end - - defp connect_loop(_node, 0, _interval) do - {:error, "could not establish connection with the node"} - end - - defp connect_loop(node, attempts, interval) do - if Node.connect(node) do - :ok - else - Process.sleep(interval) - connect_loop(node, attempts - 1, interval) + {:error, "tailed getting the Pod's IP address"} end end - defp fetch_runtime_info(child_node) do - # Note: it is Livebook that starts the runtime node, so we know - # that the node runs Livebook release of the exact same version - # - # Also, the remote node already has all the runtime modules in - # the code path, compiled for its Elixir version, so we don't - # need to check for matching Elixir version. - - %{pid: pid} = :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info]) - - {:ok, pid} - end - - defp initialize_node(child_node) do - init_opts = [ - runtime_server_opts: [ - extra_smart_cell_definitions: Livebook.Runtime.Definitions.smart_cell_definitions() - ] - ] - - Livebook.Runtime.ErlDist.initialize(child_node, init_opts) + defp with_log(caller, name, fun) do + send(caller, {:runtime_connect_info, self(), name}) + RemoteUtils.with_log("[k8s runtime] #{name}", fun) end end @@ -453,7 +391,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.K8s do end def duplicate(runtime) do - Livebook.Runtime.K8s.new(runtime.config, runtime.req) + Livebook.Runtime.K8s.new(runtime.config) end def evaluate_code(runtime, language, code, locator, parent_locators, opts \\ []) do diff --git a/lib/livebook/runtime/remote_utils.ex b/lib/livebook/runtime/remote_utils.ex new file mode 100644 index 00000000000..297bee84d21 --- /dev/null +++ b/lib/livebook/runtime/remote_utils.ex @@ -0,0 +1,113 @@ +defmodule Livebook.Runtime.RemoteUtils do + # Shared code for runtimes using a remote node. + + require Logger + + @doc """ + The port that the remote runtime node uses for distribution. + """ + @spec remote_port() :: pos_integer() + def remote_port(), do: 44444 + + @doc """ + Encodes information for the remote node. + + The returned value should be passed when starting the remote node + via the LIVEBOOK_RUNTIME environment variable. + """ + @spec encode_runtime_data(String.t()) :: String.t() + def encode_runtime_data(node_base) do + %{ + node_base: node_base, + cookie: Node.get_cookie(), + dist_port: remote_port() + } + |> :erlang.term_to_binary() + |> Base.encode64() + end + + @doc """ + Discovers a free TCP port. + """ + @spec get_free_port!() :: pos_integer() + def get_free_port!() do + {:ok, socket} = :gen_tcp.listen(0, active: false, reuseaddr: true) + {:ok, port} = :inet.port(socket) + :gen_tcp.close(socket) + port + end + + @doc """ + Fetches information from the remote runtime node. + """ + @spec fetch_runtime_info(node()) :: %{pid: pid()} + def fetch_runtime_info(child_node) do + # Note: it is Livebook that starts the runtime node, so we know + # that the node runs Livebook release of the exact same version + # + # Also, the remote node already has all the runtime modules in + # the code path, compiled for its Elixir version, so we don't + # need to check for matching Elixir version. + + :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info]) + end + + @doc """ + Attempts connecting to the given node. + + Makes several connect attempts over a few seconds. + """ + @spec connect(node()) :: :ok | {:error, String.t()} + def connect(node) do + connect_loop(node, 40, 250) + end + + defp connect_loop(_node, 0, _interval) do + {:error, "could not establish connection with the node"} + end + + defp connect_loop(node, attempts, interval) do + if Node.connect(node) do + :ok + else + Process.sleep(interval) + connect_loop(node, attempts - 1, interval) + end + end + + @doc """ + Starts a runtime server on the remote node. + """ + @spec initialize_node(node()) :: pid() + def initialize_node(child_node) do + init_opts = [ + runtime_server_opts: [ + extra_smart_cell_definitions: Livebook.Runtime.Definitions.smart_cell_definitions() + ] + ] + + Livebook.Runtime.ErlDist.initialize(child_node, init_opts) + end + + @doc """ + Wraps a potentially long operation. + + Logs operation duration after completion. On failure, also logs the + error. + """ + @spec with_log(String.t(), (-> term())) :: term() + def with_log(name, fun) do + {microseconds, result} = :timer.tc(fun) + milliseconds = div(microseconds, 1000) + + case result do + {:error, error} -> + Logger.debug("#{name} FAILED in #{milliseconds}ms, error: #{error}") + + _ -> + Logger.debug("#{name} finished in #{milliseconds}ms") + end + + result + end +end diff --git a/lib/livebook_web/components/core_components.ex b/lib/livebook_web/components/core_components.ex index d260e9b8b1b..b9957931efe 100644 --- a/lib/livebook_web/components/core_components.ex +++ b/lib/livebook_web/components/core_components.ex @@ -1017,6 +1017,34 @@ defmodule LivebookWeb.CoreComponents do """ end + @doc """ + Updates keys in a map assign. + """ + def assign_nested(socket, key, keyword) do + update(socket, key, fn map -> + Enum.reduce(keyword, map, fn {key, value}, map -> Map.replace!(map, key, value) end) + end) + end + + @doc """ + Sends an event to the given target. + + Given: + + * a LV pid, sends the event as a regular message to the process + + * a component `{module, id}` tuple, the event is sent as an update + with `:event` assign + + """ + def send_event(target, event) when is_pid(target) do + send(target, event) + end + + def send_event({module, id}, event) when is_atom(module) and is_binary(id) do + Phoenix.LiveView.send_update(module, id: id, event: event) + end + # JS commands @doc """ diff --git a/lib/livebook_web/live/file_select_component.ex b/lib/livebook_web/live/file_select_component.ex index 0d90b527c95..96191665ee4 100644 --- a/lib/livebook_web/live/file_select_component.ex +++ b/lib/livebook_web/live/file_select_component.ex @@ -490,7 +490,7 @@ defmodule LivebookWeb.FileSelectComponent do file = FileSystem.File.new(file_system) - send_event(socket, {:set_file, file, %{exists: true}}) + send_event(socket.assigns.target, {:set_file, file, %{exists: true}}) {:noreply, socket} end @@ -512,7 +512,7 @@ defmodule LivebookWeb.FileSelectComponent do _info -> %{exists: true} end - send_event(socket, {:set_file, file, info}) + send_event(socket.assigns.target, {:set_file, file, info}) {:noreply, socket} end @@ -759,14 +759,4 @@ defmodule LivebookWeb.FileSelectComponent do new_file = FileSystem.File.resolve(parent_dir, new_name) FileSystem.File.rename(file, new_file) end - - defp send_event(socket, event) do - case socket.assigns.target do - {module, id} -> - send_update(module, id: id, event: event) - - pid when is_pid(pid) -> - send(pid, event) - end - end end diff --git a/lib/livebook_web/live/session_live/fly_runtime_component.ex b/lib/livebook_web/live/session_live/fly_runtime_component.ex index 5693fa9878a..29844a15357 100644 --- a/lib/livebook_web/live/session_live/fly_runtime_component.ex +++ b/lib/livebook_web/live/session_live/fly_runtime_component.ex @@ -5,8 +5,6 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do alias Livebook.{Session, Runtime} - @config_secret_prefix "FLY_RUNTIME_" - @impl true def mount(socket) do unless Livebook.Config.runtime_enabled?(Livebook.Runtime.Fly) do @@ -26,11 +24,26 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do specs_changeset: specs_changeset(), volume_id: nil, volume_action: nil, - save_config: nil + save_config_payload: nil )} end @impl true + def update(%{event: :open_save_config}, socket) do + {:ok, assign(socket, save_config_payload: build_config(socket))} + end + + def update(%{event: :close_save_config}, socket) do + {:ok, assign(socket, save_config_payload: nil)} + end + + def update(%{event: {:load_config, config_defaults}}, socket) do + {:ok, + socket + |> assign(config_defaults: config_defaults) + |> load_config_defaults()} + end + def update(assigns, socket) do socket = assign(socket, assigns) @@ -67,11 +80,17 @@ defmodule LivebookWeb.SessionLive.FlyRuntimeComponent do The machine is automatically destroyed, once you disconnect the runtime.
- <.save_config_form :if={@save_config} save_config={@save_config} hub={@hub} myself={@myself} /> - -