diff --git a/config/config.exs b/config/config.exs
index 529ea8e65f2..bfcf6ebfabd 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -40,7 +40,8 @@ config :livebook,
teams_url: "https://teams.livebook.dev",
github_release_info: %{repo: "livebook-dev/livebook", version: Mix.Project.config()[:version]},
update_instructions_url: nil,
- within_iframe: false
+ within_iframe: false,
+ k8s_kubeconfig_pipeline: Kubereq.Kubeconfig.Default
config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 5_000
diff --git a/config/test.exs b/config/test.exs
index 3e9471b2d17..316b41d3ee0 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -24,6 +24,12 @@ end
config :livebook,
data_path: data_path,
- agent_name: "chonky-cat"
+ agent_name: "chonky-cat",
+ k8s_kubeconfig_pipeline:
+ {Kubereq.Kubeconfig.Stub,
+ plugs: %{
+ "default" => {Req.Test, :k8s_cluster},
+ "no-permission" => {Req.Test, :k8s_cluster}
+ }}
config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 0
diff --git a/lib/livebook.ex b/lib/livebook.ex
index 5455ce503d0..3cb1fe1fb9d 100644
--- a/lib/livebook.ex
+++ b/lib/livebook.ex
@@ -166,7 +166,8 @@ defmodule Livebook do
[
Livebook.Runtime.Standalone,
Livebook.Runtime.Attached,
- Livebook.Runtime.Fly
+ Livebook.Runtime.Fly,
+ Livebook.Runtime.K8s
]
if home = Livebook.Config.writable_dir!("LIVEBOOK_HOME") do
diff --git a/lib/livebook/k8s/auth.ex b/lib/livebook/k8s/auth.ex
new file mode 100644
index 00000000000..207fb0c6840
--- /dev/null
+++ b/lib/livebook/k8s/auth.ex
@@ -0,0 +1,65 @@
+defmodule Livebook.K8s.Auth do
+ # Implementation of Access Review checks for the authenticated user
+ # using the `SelfSubjectAccessReview` [1] resource.
+ #
+ # [1]: https://kubernetes.io/docs/reference/kubernetes-api/authorization-resources/self-subject-access-review-v1/#SelfSubjectAccessReviewSpec
+
+ @doc """
+ Concurrently reviews access according to a list of `resource_attributes`.
+
+ Expects `req` to be prepared for `SelfSubjectAccessReview`.
+ """
+ @spec batch_check(Req.Request.t(), [keyword()]) ::
+ [:ok | {:error, %Req.Response{}} | {:error, Exception.t()}]
+ def batch_check(req, resource_attribute_list) do
+ resource_attribute_list
+ |> Enum.map(&Task.async(fn -> can_i?(req, &1) end))
+ |> Task.await_many(:infinity)
+ end
+
+ @doc """
+ Reviews access according to `resource_attributes`.
+
+ Expects `req` to be prepared for `SelfSubjectAccessReview`.
+ """
+ @spec can_i?(Req.Request.t(), keyword()) ::
+ :ok | {:error, %Req.Response{}} | {:error, Exception.t()}
+ def can_i?(req, resource_attributes) do
+ resource_attributes =
+ resource_attributes
+ |> Keyword.validate!([
+ :name,
+ :namespace,
+ :path,
+ :resource,
+ :subresource,
+ :verb,
+ :version,
+ group: ""
+ ])
+ |> Enum.into(%{})
+
+ access_review = %{
+ "apiVersion" => "authorization.k8s.io/v1",
+ "kind" => "SelfSubjectAccessReview",
+ "spec" => %{
+ "resourceAttributes" => resource_attributes
+ }
+ }
+
+ create_self_subject_access_review(req, access_review)
+ end
+
+ defp create_self_subject_access_review(req, access_review) do
+ case Kubereq.create(req, access_review) do
+ {:ok, %Req.Response{status: 201, body: %{"status" => %{"allowed" => true}}}} ->
+ :ok
+
+ {:ok, %Req.Response{} = response} ->
+ {:error, response}
+
+ {:error, error} ->
+ {:error, error}
+ end
+ end
+end
diff --git a/lib/livebook/k8s/pod.ex b/lib/livebook/k8s/pod.ex
new file mode 100644
index 00000000000..0337416e6ca
--- /dev/null
+++ b/lib/livebook/k8s/pod.ex
@@ -0,0 +1,182 @@
+defmodule Livebook.K8s.Pod do
+ @main_container_name "livebook-runtime"
+ @home_pvc_volume_name "livebook-home"
+
+ @default_pod_template """
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ generateName: livebook-runtime-
+ spec:
+ containers:
+ - name: livebook-runtime
+ resources:
+ limits:
+ cpu: "1"
+ memory: 1Gi
+ requests:
+ cpu: "1"
+ memory: 1Gi\
+ """
+
+ @doc """
+ Returns the default pod template.
+ """
+ @spec default_pod_template() :: String.t()
+ def default_pod_template(), do: @default_pod_template
+
+ @doc """
+ Set the namespace on the given manifest.
+ """
+ @spec set_namespace(map(), String.t()) :: map()
+ def set_namespace(manifest, namespace) do
+ put_in(manifest, ["metadata", "namespace"], namespace)
+ end
+
+ @doc """
+ Adds "volume" and "volumeMount" configurations to `manifest` in order
+ to mount `home_pvc` under /home/livebook on the pod.
+ """
+ @spec set_home_pvc(map(), String.t()) :: map()
+ def set_home_pvc(manifest, home_pvc) do
+ manifest
+ |> update_in(["spec", Access.key("volumes", [])], fn volumes ->
+ volume = %{
+ "name" => @home_pvc_volume_name,
+ "persistentVolumeClaim" => %{"claimName" => home_pvc}
+ }
+
+ [volume | volumes]
+ end)
+ |> update_in(
+ ["spec", "containers", access_main_container(), Access.key("volumeMounts", [])],
+ fn volume_mounts ->
+ [%{"name" => @home_pvc_volume_name, "mountPath" => "/home/livebook"} | volume_mounts]
+ end
+ )
+ end
+
+ @doc """
+ Adds the list of `env_vars` to the main container of the given `manifest`.
+ """
+ @spec add_env_vars(map(), list()) :: map()
+ def add_env_vars(manifest, env_vars) do
+ update_in(
+ manifest,
+ ["spec", "containers", access_main_container(), Access.key("env", [])],
+ fn existing_vars -> env_vars ++ existing_vars end
+ )
+ end
+
+ @doc """
+ Sets the tag of the main container's image.
+ """
+ @spec set_docker_tag(map(), String.t()) :: map()
+ def set_docker_tag(manifest, docker_tag) do
+ image = "ghcr.io/livebook-dev/livebook:#{docker_tag}"
+ put_in(manifest, ["spec", "containers", access_main_container(), "image"], image)
+ end
+
+ @doc """
+ Adds the `port` to the main container and adds a readiness probe.
+ """
+ @spec add_container_port(map(), non_neg_integer()) :: map()
+ def add_container_port(manifest, port) do
+ readiness_probe = %{
+ "tcpSocket" => %{"port" => port},
+ "initialDelaySeconds" => 1,
+ "periodSeconds" => 1
+ }
+
+ manifest
+ |> update_in(
+ ["spec", "containers", access_main_container(), Access.key("ports", [])],
+ &[%{"containerPort" => port} | &1]
+ )
+ |> put_in(["spec", "containers", access_main_container(), "readinessProbe"], readiness_probe)
+ end
+
+ @doc """
+ Turns the given `pod_template` into a Pod manifest.
+ """
+ @spec pod_from_template(String.t()) :: map()
+ def pod_from_template(pod_template) do
+ pod_template
+ |> YamlElixir.read_from_string!()
+ |> do_pod_from_template()
+ end
+
+ defp do_pod_from_template(pod) do
+ pod
+ |> Map.merge(%{"apiVersion" => "v1", "kind" => "Pod"})
+ |> put_in(["spec", "restartPolicy"], "Never")
+ end
+
+ @doc """
+ Validates the given Pod manifest.
+ """
+ @spec validate_pod_template(map(), String.t()) :: :ok | {:error, String.t()}
+ def validate_pod_template(pod, namespace)
+
+ def validate_pod_template(%{"apiVersion" => "v1", "kind" => "Pod"} = pod, namespace) do
+ with :ok <- validate_basics(pod),
+ :ok <- validate_main_container(pod),
+ :ok <- validate_namespace(pod, namespace) do
+ validate_container_image(pod)
+ end
+ end
+
+ def validate_pod_template(_other_input, _namespace) do
+ {:error, ~s/Make sure to define a valid resource of apiVersion "v1" and kind "Pod"./}
+ end
+
+ defp validate_basics(pod) do
+ cond do
+ not match?(%{"metadata" => %{}}, pod) ->
+ {:error, ".metadata is missing in your pod template."}
+
+ not match?(%{"spec" => %{"containers" => containers}} when is_list(containers), pod) ->
+ {:error, ".spec.containers is missing in your pod template."}
+
+ pod["metadata"]["name"] in [nil, ""] and pod["metadata"]["generateName"] in [nil, ""] ->
+ {:error,
+ "Make sure to define .metadata.name or .metadata.generateName in your pod template."}
+
+ true ->
+ :ok
+ end
+ end
+
+ defp validate_main_container(pod) do
+ if get_in(pod, ["spec", "containers", access_main_container()]) do
+ :ok
+ else
+ {:error,
+ ~s/Main container is missing. The main container should be named "#{@main_container_name}"./}
+ end
+ end
+
+ defp validate_container_image(pod) do
+ if get_in(pod, ["spec", "containers", access_main_container(), "image"]) do
+ {:error,
+ "You can't set the container image of the main container. It's going to be overridden."}
+ else
+ :ok
+ end
+ end
+
+ defp validate_namespace(pod, namespace) do
+ template_ns = get_in(pod, ["metadata", "namespace"])
+
+ if template_ns == nil or template_ns == namespace do
+ :ok
+ else
+ {:error,
+ "The field .template.metadata.namespace has to be omitted or set to the namespace you selected."}
+ end
+ end
+
+ defp access_main_container() do
+ Kubereq.Access.find(&(&1["name"] == @main_container_name))
+ end
+end
diff --git a/lib/livebook/k8s/pvc.ex b/lib/livebook/k8s/pvc.ex
new file mode 100644
index 00000000000..85a3df054b3
--- /dev/null
+++ b/lib/livebook/k8s/pvc.ex
@@ -0,0 +1,58 @@
+defmodule Livebook.K8s.PVC do
+ use Ecto.Schema
+
+ import Ecto.Changeset
+
+ @type t :: %__MODULE__{
+ name: String.t(),
+ size_gb: integer(),
+ access_mode: String.t(),
+ storage_class: String.t()
+ }
+
+ @primary_key false
+ embedded_schema do
+ field :name, :string
+ field :size_gb, :integer
+ field :access_mode, :string, default: "ReadWriteOnce"
+ field :storage_class, :string, default: nil
+ end
+
+ @fields ~w(name size_gb access_mode storage_class)a
+ @required ~w(name size_gb access_mode)a
+
+ @doc """
+ Build a PVC changeset for the given `attrs`.
+ """
+ @spec changeset(map()) :: Ecto.Changeset.t()
+ def changeset(attrs \\ %{}) do
+ %__MODULE__{}
+ |> cast(attrs, @fields)
+ |> validate_required(@required)
+ end
+
+ @doc """
+ Build PVC manifest for the given `pvc` and `namespace` to be applied to a
+ cluster.
+ """
+ @spec manifest(pvc :: t(), namespace: String.t()) :: manifest :: map()
+ def manifest(pvc, namespace) do
+ %{
+ "apiVersion" => "v1",
+ "kind" => "PersistentVolumeClaim",
+ "metadata" => %{
+ "name" => pvc.name,
+ "namespace" => namespace
+ },
+ "spec" => %{
+ "storageClassName" => pvc.storage_class,
+ "accessModes" => [pvc.access_mode],
+ "resources" => %{
+ "requests" => %{
+ "storage" => "#{pvc.size_gb}Gi"
+ }
+ }
+ }
+ }
+ end
+end
diff --git a/lib/livebook/runtime/k8s.ex b/lib/livebook/runtime/k8s.ex
new file mode 100644
index 00000000000..65765af64fe
--- /dev/null
+++ b/lib/livebook/runtime/k8s.ex
@@ -0,0 +1,555 @@
+defmodule Livebook.Runtime.K8s do
+ # A runtime backed by a Kubernetes Pod managed by Livebook.
+ #
+ # This runtime uses the same concepts as the Fly runtime. In this
+ # case, we start a Pod in a Kubernetes cluster and use kubectl to
+ # proxy a local port to the distribution port of the remote node.
+ # See `Livebook.Runtime.Fly` for more design details.
+
+ defstruct [:config, :node, :req, :server_pid, :lv_pid, :pod_name]
+
+ @type config :: %{
+ context: String.t(),
+ namespace: String.t(),
+ home_pvc: String.t() | nil,
+ docker_tag: String.t(),
+ pod_template: String.t()
+ }
+
+ @type t :: %__MODULE__{
+ node: node() | nil,
+ req: Req.Request.t(),
+ server_pid: pid() | nil,
+ lv_pid: pid(),
+ pod_name: String.t() | nil
+ }
+
+ use GenServer, restart: :temporary
+
+ require Logger
+
+ alias Livebook.K8s.Pod
+
+ @doc """
+ Returns a new runtime instance.
+ """
+ @spec new(config :: map(), req :: Req.Request.t()) :: t()
+ def new(config, req) do
+ %__MODULE__{config: config, req: req, lv_pid: self()}
+ end
+
+ def __connect__(runtime) do
+ {:ok, pid} =
+ DynamicSupervisor.start_child(Livebook.RuntimeSupervisor, {__MODULE__, {runtime, self()}})
+
+ pid
+ end
+
+ @doc false
+ def start_link({runtime, caller}) do
+ GenServer.start_link(__MODULE__, {runtime, caller})
+ end
+
+ @impl true
+ def init({runtime, caller}) do
+ state = %{primary_ref: nil}
+ {:ok, state, {:continue, {:init, runtime, caller}}}
+ end
+
+ @impl true
+ def handle_continue({:init, runtime, caller}, state) do
+ config = runtime.config
+ %{namespace: namespace, context: context} = config
+ req = runtime.req
+
+ kubeconfig =
+ if System.get_env("KUBERNETES_SERVICE_HOST") do
+ nil
+ else
+ System.get_env("KUBECONFIG") || Path.join(System.user_home!(), ".kube/config")
+ end
+
+ cluster_data = get_cluster_data(kubeconfig)
+
+ runtime_data =
+ %{
+ node_base: cluster_data.node_base,
+ cookie: Node.get_cookie(),
+ dist_port: cluster_data.remote_port
+ }
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ parent = self()
+
+ {:ok, watcher_pid} =
+ DynamicSupervisor.start_child(
+ Livebook.RuntimeSupervisor,
+ {Task, fn -> watcher(parent, req, config) end}
+ )
+
+ with {:ok, pod_name} <-
+ with_log(caller, "create pod", fn ->
+ create_pod(req, config, runtime_data, cluster_data.remote_port)
+ end),
+ _ <- send(watcher_pid, {:pod_created, pod_name}),
+ {:ok, pod_ip} <-
+ with_pod_events(caller, "waiting for pod", req, namespace, pod_name, fn ->
+ await_pod_ready(req, namespace, pod_name)
+ end),
+ child_node <- :"#{cluster_data.node_base}@#{pod_ip}",
+ :ok <-
+ with_log(caller, "start proxy", fn ->
+ k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace)
+ end),
+ :ok <-
+ with_log(caller, "connect to node", fn ->
+ connect_loop(child_node, 40, 250)
+ end),
+ {:ok, primary_pid} <- fetch_runtime_info(child_node) do
+ primary_ref = Process.monitor(primary_pid)
+
+ server_pid =
+ with_log(caller, "initialize node", fn ->
+ initialize_node(child_node)
+ end)
+
+ send(primary_pid, :node_initialized)
+
+ send(watcher_pid, :done)
+
+ runtime = %{runtime | node: child_node, server_pid: server_pid, pod_name: pod_name}
+ send(caller, {:runtime_connect_done, self(), {:ok, runtime}})
+
+ {:noreply, %{state | primary_ref: primary_ref}}
+ else
+ {:error, error} ->
+ send(caller, {:runtime_connect_done, self(), {:error, error}})
+
+ {:stop, :shutdown, state}
+ end
+ end
+
+ @impl true
+ def handle_info({:DOWN, ref, :process, _pid, _reason}, state) when ref == state.primary_ref do
+ {:stop, :shutdown, state}
+ end
+
+ def handle_info({port, _message}, state) when is_port(port) do
+ {:noreply, state}
+ end
+
+ defp get_free_port!() do
+ {:ok, socket} = :gen_tcp.listen(0, active: false, reuseaddr: true)
+ {:ok, port} = :inet.port(socket)
+ :gen_tcp.close(socket)
+ port
+ end
+
+ defp with_log(caller, name, fun) do
+ send(caller, {:runtime_connect_info, self(), name})
+
+ {microseconds, result} = :timer.tc(fun)
+ milliseconds = div(microseconds, 1000)
+
+ case result do
+ {:error, error} ->
+ Logger.debug("[K8s runtime] #{name} FAILED in #{milliseconds}ms, error: #{error}")
+
+ _ ->
+ Logger.debug("[K8s runtime] #{name} finished in #{milliseconds}ms")
+ end
+
+ result
+ end
+
+ defp with_pod_events(caller, name, req, namespace, pod_name, fun) do
+ with_log(caller, name, fn ->
+ runtime_pid = self()
+
+ event_watcher_pid =
+ spawn_link(fn ->
+ watch_result =
+ req
+ |> Req.merge(
+ resource_path: "api/v1/namespaces/:namespace/events/:name",
+ resource_list_path: "api/v1/namespaces/:namespace/events"
+ )
+ |> Kubereq.watch(namespace,
+ field_selectors: [
+ {"involvedObject.kind", "Pod"},
+ {"involvedObject.name", pod_name}
+ ]
+ )
+
+ case watch_result do
+ {:ok, stream} ->
+ Enum.each(stream, fn event ->
+ message = Livebook.Utils.downcase_first(event["object"]["message"])
+ Logger.debug(~s'[K8s runtime] Pod event: "#{message}"')
+ send(caller, {:runtime_connect_info, runtime_pid, message})
+ end)
+
+ _error ->
+ :ok
+ end
+ end)
+
+ result = fun.()
+ Process.exit(event_watcher_pid, :normal)
+ result
+ end)
+ end
+
+ defp watcher(parent, req, config) do
+ ref = Process.monitor(parent)
+ watcher_loop(%{ref: ref, config: config, req: req, pod_name: nil})
+ end
+
+ defp watcher_loop(state) do
+ receive do
+ {:DOWN, ref, :process, _pid, _reason} when ref == state.ref ->
+ # If the parent process is killed, we try to eagerly free the
+ # created resources
+ if pod_name = state.pod_name do
+ namespace = state.config.namespace
+ _ = Kubereq.delete(state.req, namespace, pod_name)
+ end
+
+ {:pod_created, pod_name} ->
+ watcher_loop(%{state | pod_name: pod_name})
+
+ :done ->
+ :ok
+ end
+ end
+
+ defp create_pod(req, config, runtime_data, remote_port) do
+ %{
+ pod_template: pod_template,
+ docker_tag: docker_tag,
+ home_pvc: home_pvc,
+ namespace: namespace
+ } = config
+
+ manifest =
+ pod_template
+ |> Pod.pod_from_template()
+ |> Pod.add_env_vars([
+ %{"name" => "LIVEBOOK_RUNTIME", "value" => runtime_data},
+ %{
+ "name" => "POD_IP",
+ "valueFrom" => %{"fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "status.podIP"}}
+ },
+ %{
+ "name" => "POD_NAMESPACE",
+ "valueFrom" => %{
+ "fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "metadata.namespace"}
+ }
+ },
+ %{
+ "name" => "POD_NAME",
+ "valueFrom" => %{"fieldRef" => %{"apiVersion" => "v1", "fieldPath" => "metadata.name"}}
+ }
+ ])
+ |> Pod.set_docker_tag(docker_tag)
+ |> Pod.set_namespace(namespace)
+ |> Pod.add_container_port(remote_port)
+
+ manifest =
+ if home_pvc do
+ Pod.set_home_pvc(manifest, home_pvc)
+ else
+ manifest
+ end
+
+ case Kubereq.create(req, manifest) do
+ {:ok, %{status: 201, body: %{"metadata" => %{"name" => pod_name}}}} ->
+ {:ok, pod_name}
+
+ {:ok, %{body: body}} ->
+ {:error, "could not create Pod, reason: #{body["message"]}"}
+
+ {:error, error} ->
+ {:error, "could not create Pod, reason: #{Exception.message(error)}"}
+ end
+ end
+
+ defp get_cluster_data(_kubeconfig = nil) do
+ # When already running within Kubernetes we don't need the proxy,
+ # the node is reachable directly
+ %{node_base: "k8s_runtime", remote_port: 44444}
+ end
+
+ defp get_cluster_data(_kubeconfig) do
+ local_port = get_free_port!()
+ %{node_base: "remote_runtime_#{local_port}", remote_port: 44444, local_port: local_port}
+ end
+
+ defp k8s_forward_port(_kubeconfig = nil, _, _, _, _), do: :ok
+
+ defp k8s_forward_port(kubeconfig, context, cluster_data, pod_name, namespace) do
+ %{local_port: local_port, remote_port: remote_port} = cluster_data
+
+ with {:ok, kubectl_path} <- find_kubectl_executable() do
+ ports = "#{local_port}:#{remote_port}"
+
+ # We want the proxy to accept the same protocol that we are
+ # going to use for distribution
+ bind_addr =
+ if Livebook.Utils.proto_dist() == :inet6_tcp do
+ "[::1]"
+ else
+ "127.0.0.1"
+ end
+
+ args =
+ [
+ "port-forward",
+ "--kubeconfig",
+ Path.expand(kubeconfig),
+ "--context",
+ context,
+ "-n",
+ namespace,
+ pod_name,
+ ports,
+ "--address",
+ bind_addr
+ ]
+
+ port =
+ Port.open(
+ {:spawn_executable, kubectl_path},
+ [:binary, :hide, :stderr_to_stdout, args: args, env: []]
+ )
+
+ port_ref = Port.monitor(port)
+
+ result =
+ receive do
+ {^port, {:data, "Forwarding from " <> _}} ->
+ :ok
+
+ {^port, {:data, "Error " <> _ = message}} ->
+ {:error, "failed to port-forward. #{String.trim(message)}"}
+
+ {:DOWN, ^port_ref, :port, _object, reason} ->
+ {:error, "failed to port-forward. Process terminated, reason: #{inspect(reason)}"}
+ after
+ 30_000 ->
+ {:error, "failed to port-forward. Timed out after 30s"}
+ end
+
+ Port.demonitor(port_ref, [:flush])
+
+ result
+ end
+ end
+
+ defp find_kubectl_executable() do
+ if path = System.find_executable("kubectl") do
+ {:ok, path}
+ else
+ {:error, "no kubectl executable found in PATH."}
+ end
+ end
+
+ defp await_pod_ready(req, namespace, pod_name) do
+ with :ok <-
+ Kubereq.wait_until(
+ req,
+ namespace,
+ pod_name,
+ fn
+ :deleted ->
+ {:error, "The Pod was deleted before it started running."}
+
+ pod ->
+ get_in(pod, [
+ "status",
+ "conditions",
+ Access.filter(&(&1["type"] == "Ready")),
+ "status"
+ ]) == ["True"]
+ end,
+ # 30 minutes
+ 1_800_000
+ ),
+ {:ok, %{status: 200, body: pod}} <- Kubereq.get(req, namespace, pod_name) do
+ {:ok, pod["status"]["podIP"]}
+ else
+ {:error, :watch_timeout} ->
+ {:error, "Timed out waiting for Pod to start up."}
+
+ {:error, error} ->
+ {:error, error}
+
+ _other ->
+ {:error, "Failed getting the Pod's IP address."}
+ end
+ end
+
+ defp connect_loop(_node, 0, _interval) do
+ {:error, "could not establish connection with the node"}
+ end
+
+ defp connect_loop(node, attempts, interval) do
+ if Node.connect(node) do
+ :ok
+ else
+ Process.sleep(interval)
+ connect_loop(node, attempts - 1, interval)
+ end
+ end
+
+ defp fetch_runtime_info(child_node) do
+ # Note: it is Livebook that starts the runtime node, so we know
+ # that the node runs Livebook release of the exact same version
+ #
+ # Also, the remote node already has all the runtime modules in
+ # the code path, compiled for its Elixir version, so we don't
+ # need to check for matching Elixir version.
+
+ %{pid: pid} = :erpc.call(child_node, :persistent_term, :get, [:livebook_runtime_info])
+
+ {:ok, pid}
+ end
+
+ defp initialize_node(child_node) do
+ init_opts = [
+ runtime_server_opts: [
+ extra_smart_cell_definitions: Livebook.Runtime.Definitions.smart_cell_definitions()
+ ]
+ ]
+
+ Livebook.Runtime.ErlDist.initialize(child_node, init_opts)
+ end
+end
+
+defimpl Livebook.Runtime, for: Livebook.Runtime.K8s do
+ alias Livebook.Runtime.ErlDist.RuntimeServer
+
+ def describe(runtime) do
+ [{"Type", "K8s Pod"}] ++
+ if runtime.node do
+ [{"Pod name", runtime.pod_name}, {"Node name", Atom.to_string(runtime.node)}]
+ else
+ []
+ end
+ end
+
+ def connect(runtime) do
+ Livebook.Runtime.K8s.__connect__(runtime)
+ end
+
+ def take_ownership(runtime, opts \\ []) do
+ RuntimeServer.attach(runtime.server_pid, self(), opts)
+ Process.monitor(runtime.server_pid)
+ end
+
+ def disconnect(runtime) do
+ :ok = RuntimeServer.stop(runtime.server_pid)
+ end
+
+ def duplicate(runtime) do
+ Livebook.Runtime.K8s.new(runtime.config, runtime.req)
+ end
+
+ def evaluate_code(runtime, language, code, locator, parent_locators, opts \\ []) do
+ RuntimeServer.evaluate_code(
+ runtime.server_pid,
+ language,
+ code,
+ locator,
+ parent_locators,
+ opts
+ )
+ end
+
+ def forget_evaluation(runtime, locator) do
+ RuntimeServer.forget_evaluation(runtime.server_pid, locator)
+ end
+
+ def drop_container(runtime, container_ref) do
+ RuntimeServer.drop_container(runtime.server_pid, container_ref)
+ end
+
+ def handle_intellisense(runtime, send_to, request, parent_locators, node) do
+ RuntimeServer.handle_intellisense(runtime.server_pid, send_to, request, parent_locators, node)
+ end
+
+ def read_file(runtime, path) do
+ RuntimeServer.read_file(runtime.server_pid, path)
+ end
+
+ def transfer_file(runtime, path, file_id, callback) do
+ RuntimeServer.transfer_file(runtime.server_pid, path, file_id, callback)
+ end
+
+ def relabel_file(runtime, file_id, new_file_id) do
+ RuntimeServer.relabel_file(runtime.server_pid, file_id, new_file_id)
+ end
+
+ def revoke_file(runtime, file_id) do
+ RuntimeServer.revoke_file(runtime.server_pid, file_id)
+ end
+
+ def start_smart_cell(runtime, kind, ref, attrs, parent_locators) do
+ RuntimeServer.start_smart_cell(runtime.server_pid, kind, ref, attrs, parent_locators)
+ end
+
+ def set_smart_cell_parent_locators(runtime, ref, parent_locators) do
+ RuntimeServer.set_smart_cell_parent_locators(runtime.server_pid, ref, parent_locators)
+ end
+
+ def stop_smart_cell(runtime, ref) do
+ RuntimeServer.stop_smart_cell(runtime.server_pid, ref)
+ end
+
+ def fixed_dependencies?(_runtime), do: false
+
+ def add_dependencies(_runtime, code, dependencies) do
+ Livebook.Runtime.Dependencies.add_dependencies(code, dependencies)
+ end
+
+ def has_dependencies?(runtime, dependencies) do
+ RuntimeServer.has_dependencies?(runtime.server_pid, dependencies)
+ end
+
+ def snippet_definitions(_runtime) do
+ Livebook.Runtime.Definitions.snippet_definitions()
+ end
+
+ def search_packages(_runtime, send_to, search) do
+ Livebook.Runtime.Dependencies.search_packages_on_hex(send_to, search)
+ end
+
+ def put_system_envs(runtime, envs) do
+ RuntimeServer.put_system_envs(runtime.server_pid, envs)
+ end
+
+ def delete_system_envs(runtime, names) do
+ RuntimeServer.delete_system_envs(runtime.server_pid, names)
+ end
+
+ def restore_transient_state(runtime, transient_state) do
+ RuntimeServer.restore_transient_state(runtime.server_pid, transient_state)
+ end
+
+ def register_clients(runtime, clients) do
+ RuntimeServer.register_clients(runtime.server_pid, clients)
+ end
+
+ def unregister_clients(runtime, client_ids) do
+ RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
+ end
+
+ def fetch_proxy_handler_spec(runtime) do
+ RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
+ end
+
+ def disconnect_node(runtime, node) do
+ RuntimeServer.disconnect_node(runtime.server_pid, node)
+ end
+end
diff --git a/lib/livebook_web/live/session_live/k8s_runtime_component.ex b/lib/livebook_web/live/session_live/k8s_runtime_component.ex
new file mode 100644
index 00000000000..2f9c3ec5b42
--- /dev/null
+++ b/lib/livebook_web/live/session_live/k8s_runtime_component.ex
@@ -0,0 +1,992 @@
+defmodule LivebookWeb.SessionLive.K8sRuntimeComponent do
+ use LivebookWeb, :live_component
+
+ import Ecto.Changeset
+
+ alias Livebook.{Session, Runtime}
+ alias Livebook.K8s.{Auth, Pod, PVC}
+
+ @config_secret_prefix "K8S_RUNTIME_"
+ @kubeconfig_pipeline Application.compile_env(:livebook, :k8s_kubeconfig_pipeline)
+
+ @impl true
+ def mount(socket) do
+ unless Livebook.Config.runtime_enabled?(Livebook.Runtime.K8s) do
+ raise "runtime module not allowed"
+ end
+
+ kubeconfig = Kubereq.Kubeconfig.load(@kubeconfig_pipeline)
+ context_options = Enum.map(kubeconfig.contexts, & &1["name"])
+
+ {:ok,
+ socket
+ |> assign(
+ kubeconfig: kubeconfig,
+ context_options: context_options,
+ context: nil,
+ reqs: nil,
+ cluster_check: %{status: :initial, error: nil},
+ namespace: nil,
+ namespace_options: nil,
+ rbac: %{status: :inflight, errors: [], permissions: []},
+ save_config: nil,
+ pvcs: nil,
+ pvc_action: nil,
+ home_pvc: nil,
+ docker_tag: hd(Livebook.Config.docker_images()).tag,
+ pod_template: %{template: Pod.default_pod_template(), status: :valid, message: nil}
+ )}
+ end
+
+ @impl true
+ @spec update(maybe_improper_list() | map(), any()) :: {:ok, any()}
+ def update(assigns, socket) do
+ socket = assign(socket, assigns)
+
+ socket =
+ cond do
+ is_map_key(socket.assigns, :config_defaults) ->
+ socket
+
+ is_struct(assigns.runtime, Runtime.K8s) ->
+ %{config: config} = assigns.runtime
+
+ config_defaults =
+ Map.new(config, fn {key, value} ->
+ {Atom.to_string(key), value}
+ end)
+
+ socket
+ |> assign(config_defaults: config_defaults)
+ |> load_config_defaults()
+
+ true ->
+ socket
+ |> assign(config_defaults: nil)
+ |> set_context(socket.assigns.kubeconfig.current_context)
+ end
+
+ {:ok, socket}
+ end
+
+ @impl true
+ def render(assigns) do
+ ~H"""
+
+
+ Start a temporary Kubernetes Pod with an Elixir node to evaluate code.
+ The Pod is automatically deleted, once you disconnect the runtime.
+
+
+ <.save_config_form :if={@save_config} save_config={@save_config} hub={@hub} myself={@myself} />
+
+
+ <.config_actions hub_secrets={@hub_secrets} myself={@myself} />
+
+ <.message_box :if={@kubeconfig.current_cluster == nil} kind={:error}>
+ In order to use the Kubernetes context, you need to set the
KUBECONFIG
+ environment variable to a path pointing to a
Kubernetes configuration YAML file (e.g. to
"~/.kube/config"
).
+
+
+
+
+ <.loader :if={@cluster_check.status == :inflight} />
+
+ <.cluster_check_error :if={@cluster_check.status == :error} error={@cluster_check.error} />
+
+
+
+ <.message_box :if={@rbac.status === :errors} kind={:error}>
+ <%= for error <- @rbac.errors do %>
+ <.rbac_error error={error} />
+ <% end %>
+
+
+
+
+ Pod
+
+
+ You can fully customize the runtime pod by editing the pod template.
+
+
+
+
+
+ <.storage_config
+ :if={@rbac.status == :ok}
+ myself={@myself}
+ home_pvc={@home_pvc}
+ pvcs={@pvcs}
+ pvc_action={@pvc_action}
+ rbac={@rbac}
+ />
+
+
+
+ <.button phx-click="init" phx-target={@myself} disabled={@runtime_status == :connecting}>
+ <%= label(@namespace, @runtime, @runtime_status) %>
+
+ <.button
+ :if={@runtime_status == :connecting}
+ color="red"
+ outlined
+ phx-click="disconnect"
+ phx-target={@myself}
+ >
+ Disconnect
+
+
+
+
+
+
+ """
+ end
+
+ defp storage_config(assigns) do
+ ~H"""
+
+
+ Storage
+
+
+ Every time you connect to the runtime, a fresh machine is created.
+ In order to persist data and caches, you can optionally mount a
+ volume at /home/livebook
. Setting a Persistent Volume
+ Claim will automatically add a .template.spec.volumes[]
+ entry and a .template.spec.containers[name="livebook-runtime"].volumeMounts[]
+ entry to the pod template.
+
+
+
+
+
+
+
+
+ <.icon_button
+ phx-click="delete_pvc"
+ phx-target={@myself}
+ disabled={@home_pvc == nil or @pvc_action != nil}
+ >
+ <.remix_icon icon="delete-bin-6-line" />
+
+
+
+ <.icon_button phx-click="new_pvc" phx-target={@myself}>
+ <.remix_icon icon="add-line" />
+
+
+
+
+
+
+ Are you sure you want to irreversibly delete Persistent Volume Claim <%= @home_pvc %> ?
+
+
+
+ <.remix_icon icon="delete-bin-6-line" class="align-middle mr-1" />
+ <%= if @pvc_action[:type] == :delete, do: "Delete", else: "Deleting..." %>
+
+
+ Cancel
+
+
+
+
+ <.form
+ :let={pvcf}
+ :if={@pvc_action[:type] in [:new, :new_inflight]}
+ for={@pvc_action.changeset}
+ as={:pvc}
+ phx-submit="create_pvc"
+ phx-change="validate_pvc"
+ phx-target={@myself}
+ class="flex gap-2 mt-4 items-center"
+ autocomplete="off"
+ spellcheck="false"
+ >
+
+ <.remix_icon icon="corner-down-right-line" class="text-gray-400 text-lg" />
+
+
+ <.text_field field={pvcf[:name]} placeholder="Name" />
+ <.text_field field={pvcf[:size_gb]} placeholder="Size (Gi)" type="number" min="1" />
+ <.select_field
+ field={pvcf[:access_mode]}
+ options={["ReadWriteOnce", "ReadWriteMany", "ReadWriteOncePod"]}
+ />
+ <.select_field field={pvcf[:storage_class]} options={@pvc_action.storage_classes} />
+
+ <.button
+ :if={@pvc_action[:type] == :new}
+ type="submit"
+ disabled={not @pvc_action.changeset.valid? or @pvc_action[:type] == :new_inflight}
+ >
+ <%= if @pvc_action[:type] == :new, do: "Create", else: "Creating..." %>
+
+ <.button
+ :if={@pvc_action[:type] == :new}
+ type="button"
+ color="gray"
+ outlined
+ phx-click="cancel_new_pvc"
+ phx-target={@myself}
+ disabled={@pvc_action[:type] == :new_inflight}
+ >
+ Cancel
+
+
+ <.error :if={@pvc_action[:error]}><%= @pvc_action[:error] %>
+
+
+ """
+ end
+
+ defp save_config_form(assigns) do
+ ~H"""
+ <.form
+ :let={f}
+ for={@save_config.changeset}
+ as={:secret}
+ class="mt-4 flex flex-col"
+ phx-change="validate_save_config"
+ phx-submit="save_config"
+ phx-target={@myself}
+ autocomplete="off"
+ spellcheck="false"
+ >
+
+ Save config
+
+
+ Store the config in a secret in the <.workspace hub={@hub} /> workspace to reuse it later.
+
+
+ <.message_box kind={:error} message={error} />
+
+
+ <.text_field field={f[:name]} label="Secret name" class="uppercase" autofocus />
+
+
+ <.button type="submit" disabled={not @save_config.changeset.valid? or @save_config.inflight}>
+ <%= if(@save_config.inflight, do: "Saving...", else: "Save") %>
+
+ <.button
+ color="gray"
+ outlined
+ type="button"
+ phx-click="cancel_save_config"
+ phx-target={@myself}
+ >
+ Cancel
+
+
+
+ """
+ end
+
+ defp workspace(assigns) do
+ ~H"""
+
+ <%= @hub.hub_emoji %>
+ <%= @hub.hub_name %>
+
+ """
+ end
+
+ defp config_actions(assigns) do
+ ~H"""
+
+ <.button
+ color="gray"
+ outlined
+ small
+ type="button"
+ phx-click="open_save_config"
+ phx-target={@myself}
+ >
+ Save config
+
+ <.menu id="config-secret-menu">
+ <:toggle>
+ <.button color="gray" outlined small type="button">
+
Load config
+ <.remix_icon icon="arrow-down-s-line" class="text-base leading-none" />
+
+
+
+ No configs saved yet
+
+ <.menu_item :for={name <- config_secret_names(@hub_secrets)}>
+
+ <%= name %>
+
+
+
+
+ """
+ end
+
+ defp loader(assigns) do
+ ~H"""
+
+ Loading
+ <.spinner />
+
+ """
+ end
+
+ defp cluster_check_error(%{error: %{status: 401}} = assigns) do
+ ~H"""
+ <.message_box kind={:error}>
+
+
Authentication with cluster failed.
+
+
+ """
+ end
+
+ defp cluster_check_error(%{error: %{reason: :timeout}} = assigns) do
+ ~H"""
+ <.message_box kind={:error}>
+
+
Connection to cluster timed out.
+
+
+ """
+ end
+
+ defp cluster_check_error(assigns) do
+ ~H"""
+ <.message_box kind={:error}>
+
+
Connection to cluster failed.
+
+
+ """
+ end
+
+ defp rbac_error(%{error: %Req.Response{status: 201} = resp} = assigns) do
+ resourceAttributes = resp.body["spec"]["resourceAttributes"]
+ verb = resourceAttributes["verb"]
+ namespace = resourceAttributes["namespace"]
+
+ gkv =
+ String.trim(
+ "#{resourceAttributes["group"]}/#{resourceAttributes["version"]}/#{resourceAttributes["resource"]}",
+ "/"
+ )
+
+ assigns = assign(assigns, verb: verb, gkv: gkv, namespace: namespace)
+
+ ~H"""
+
+
+ Authenticated user has no permission to <%= @verb %>
+ <%= @gkv %>
+ in namespace <%= @namespace %>
(or the namespace doesn't exist) .
+
+
+ """
+ end
+
+ @impl true
+ def handle_event("set_context", %{"context" => context}, socket) do
+ {:noreply, socket |> set_context(context) |> set_namespace(nil)}
+ end
+
+ def handle_event("set_namespace", %{"namespace" => namespace}, socket) do
+ {:noreply, set_namespace(socket, namespace)}
+ end
+
+ def handle_event("set_docker_tag", %{"docker_tag" => docker_tag}, socket) do
+ {:noreply, assign(socket, :docker_tag, docker_tag)}
+ end
+
+ def handle_event("set_pod_template", %{"pod_template" => pod_template}, socket) do
+ {:noreply, set_pod_template(socket, pod_template)}
+ end
+
+ def handle_event("set_home_pvc", %{"home_pvc" => home_pvc}, socket) do
+ {:noreply, assign(socket, :home_pvc, home_pvc)}
+ end
+
+ def handle_event("disconnect", %{}, socket) do
+ Session.disconnect_runtime(socket.assigns.session.pid)
+ {:noreply, socket}
+ end
+
+ def handle_event("new_pvc", %{}, socket) do
+ pvc_action = %{
+ type: :new,
+ changeset: PVC.changeset(),
+ storage_classes: storage_classes(socket.assigns),
+ inflight: false,
+ error: false
+ }
+
+ {:noreply, assign(socket, pvc_action: pvc_action)}
+ end
+
+ def handle_event("validate_pvc", %{"pvc" => pvc}, socket) do
+ changeset =
+ pvc
+ |> PVC.changeset()
+ |> Map.replace!(:action, :validate)
+
+ {:noreply, assign_nested(socket, :pvc_action, changeset: changeset)}
+ end
+
+ def handle_event("cancel_new_pvc", %{}, socket) do
+ {:noreply, assign(socket, pvc_action: nil)}
+ end
+
+ def handle_event("create_pvc", %{"pvc" => pvc}, socket) do
+ pvc
+ |> PVC.changeset()
+ |> apply_action(:insert)
+ |> case do
+ {:ok, applied_pvc} ->
+ {:noreply, create_pvc(socket, applied_pvc)}
+
+ {:error, changeset} ->
+ {:noreply, assign_nested(socket, :pvc_action, changeset: changeset)}
+ end
+ end
+
+ def handle_event("delete_pvc", %{}, socket) do
+ pvc_action = %{type: :delete, error: nil}
+ {:noreply, assign(socket, pvc_action: pvc_action)}
+ end
+
+ def handle_event("confirm_delete_pvc", %{}, socket) do
+ %{namespace: namespace, home_pvc: name} = socket.assigns
+ req = socket.assigns.reqs.pvc
+
+ socket =
+ socket
+ |> start_async(:delete_pvc, fn -> Kubereq.delete(req, namespace, name) end)
+ |> assign_nested(:pvc_action, type: :delete_inflight)
+
+ {:noreply, socket}
+ end
+
+ def handle_event("cancel_delete_pvc", %{}, socket) do
+ {:noreply, assign(socket, pvc_action: nil)}
+ end
+
+ def handle_event("init", %{}, socket) do
+ config = build_config(socket)
+ runtime = Runtime.K8s.new(config, socket.assigns.reqs.pod)
+ Session.set_runtime(socket.assigns.session.pid, runtime)
+ Session.connect_runtime(socket.assigns.session.pid)
+ {:noreply, socket}
+ end
+
+ def handle_event("open_save_config", %{}, socket) do
+ changeset = config_secret_changeset(socket, %{name: @config_secret_prefix})
+ save_config = %{changeset: changeset, inflight: false, error: false}
+ {:noreply, assign(socket, save_config: save_config)}
+ end
+
+ def handle_event("cancel_save_config", %{}, socket) do
+ {:noreply, assign(socket, save_config: nil)}
+ end
+
+ def handle_event("validate_save_config", %{"secret" => secret}, socket) do
+ changeset =
+ socket
+ |> config_secret_changeset(secret)
+ |> Map.replace!(:action, :validate)
+
+ {:noreply, assign_nested(socket, :save_config, changeset: changeset)}
+ end
+
+ def handle_event("save_config", %{"secret" => secret}, socket) do
+ changeset = config_secret_changeset(socket, secret)
+
+ case Ecto.Changeset.apply_action(changeset, :insert) do
+ {:ok, secret} ->
+ {:noreply, save_config_secret(socket, secret, changeset)}
+
+ {:error, changeset} ->
+ {:noreply, assign_nested(socket, :save_config, changeset: changeset)}
+ end
+ end
+
+ def handle_event("load_config", %{"name" => name}, socket) do
+ secret = Enum.find(socket.assigns.hub_secrets, &(&1.name == name))
+
+ case Jason.decode(secret.value) do
+ {:ok, config_defaults} ->
+ {:noreply,
+ socket
+ |> assign(config_defaults: config_defaults)
+ |> load_config_defaults()}
+
+ {:error, _} ->
+ {:noreply, socket}
+ end
+ end
+
+ @impl true
+ def handle_async(:rbac_check, {:ok, %{errors: errors, permissions: permissions}}, socket) do
+ status = if errors === [], do: :ok, else: :errors
+ {:noreply, assign(socket, :rbac, %{status: status, errors: errors, permissions: permissions})}
+ end
+
+ def handle_async(:load_namespace_options, {:ok, [:ok, {:ok, resp}]}, socket) do
+ socket =
+ case resp do
+ %Req.Response{status: 200, body: %{"items" => resources}} ->
+ namespace_options = Enum.map(resources, & &1["metadata"]["name"])
+
+ socket
+ |> assign(:namespace_options, namespace_options)
+ |> set_namespace(List.first(namespace_options))
+ |> assign(:cluster_check, %{status: :ok, error: nil})
+
+ %Req.Response{status: _other} ->
+ # cannot list namespaces
+ socket
+ |> assign(:namespace_options, nil)
+ |> assign(:cluster_check, %{status: :ok, error: nil})
+ end
+
+ {:noreply, socket}
+ end
+
+ def handle_async(:delete_pvc, {:ok, result}, socket) do
+ socket =
+ case result do
+ {:ok, %{status: 200}} ->
+ socket
+ |> assign(home_pvc: nil, pvc_action: nil)
+ |> pvc_options()
+
+ {:ok, %{body: %{"message" => message}}} ->
+ assign_nested(socket, :pvc_action, error: message, type: :delete)
+ end
+
+ {:noreply, socket}
+ end
+
+ def handle_async(:create_pvc, {:ok, result}, socket) do
+ socket =
+ case result do
+ {:ok, %{status: 201, body: created_pvc}} ->
+ socket
+ |> assign(home_pvc: created_pvc["metadata"]["name"], pvc_action: nil)
+ |> pvc_options()
+
+ {:ok, %{body: body}} ->
+ socket
+ |> assign_nested(:pvc_action,
+ error: "Creating the PVC failed: #{body["message"]}",
+ type: :new
+ )
+
+ {:error, error} when is_exception(error) ->
+ socket
+ |> assign_nested(:pvc_action,
+ error: "Creating the PVC failed: #{Exception.message(error)}",
+ type: :new
+ )
+ end
+
+ {:noreply, socket}
+ end
+
+ def handle_async(:load_namespace_options, {:ok, results}, socket) do
+ {:error, error} = List.first(results, &match?({:error, _}, &1))
+
+ socket =
+ socket
+ |> assign(:namespace_options, nil)
+ |> assign(:cluster_check, %{status: :error, error: error})
+
+ {:noreply, socket}
+ end
+
+ def handle_async(:save_config, {:ok, result}, socket) do
+ socket =
+ case result do
+ :ok ->
+ assign(socket, save_config: nil)
+
+ {:error, %Ecto.Changeset{} = changeset} ->
+ assign_nested(socket, :save_config, changeset: changeset, inflight: false)
+
+ {:transport_error, error} ->
+ assign_nested(socket, :save_config, error: error, inflight: false)
+ end
+
+ {:noreply, socket}
+ end
+
+ defp label(namespace, runtime, runtime_status) do
+ reconnecting? = reconnecting?(namespace, runtime)
+
+ case {reconnecting?, runtime_status} do
+ {true, :connected} -> "Reconnect"
+ {true, :connecting} -> "Connecting..."
+ _ -> "Connect"
+ end
+ end
+
+ defp reconnecting?(namespace, runtime) do
+ match?(%Runtime.K8s{config: %{namespace: ^namespace}}, runtime)
+ end
+
+ defp create_pvc(socket, pvc) do
+ namespace = socket.assigns.namespace
+ manifest = PVC.manifest(pvc, namespace)
+ req = socket.assigns.reqs.pvc
+
+ socket
+ |> start_async(:create_pvc, fn -> Kubereq.create(req, manifest) end)
+ |> assign_nested(:pvc_action, type: :new_inflight)
+ end
+
+ defp set_context(socket, nil), do: assign(socket, :context, nil)
+
+ defp set_context(socket, context) do
+ kubeconfig = Kubereq.Kubeconfig.set_current_context(socket.assigns.kubeconfig, context)
+
+ reqs = %{
+ access_reviews:
+ Kubereq.new(kubeconfig, "apis/authorization.k8s.io/v1/selfsubjectaccessreviews"),
+ namespaces: Kubereq.new(kubeconfig, "api/v1/namespaces/:name"),
+ pod: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/pods/:name"),
+ pvc: Kubereq.new(kubeconfig, "api/v1/namespaces/:namespace/persistentvolumeclaims/:name"),
+ sc: Kubereq.new(kubeconfig, "apis/storage.k8s.io/v1/storageclasses/:name")
+ }
+
+ socket
+ |> start_async(:load_namespace_options, fn ->
+ [
+ Task.async(fn ->
+ Livebook.K8s.Auth.can_i?(reqs.access_reviews,
+ verb: "create",
+ group: "authorization.k8s.io",
+ version: "v1",
+ resource: "selfsubjectaccessreviews"
+ )
+ end),
+ Task.async(fn -> Kubereq.list(reqs.namespaces, nil) end)
+ ]
+ |> Task.await_many(:infinity)
+ end)
+ |> assign(
+ kubeconfig: kubeconfig,
+ context: context,
+ namespace: nil,
+ namespace_options: nil,
+ rbac_error: nil,
+ reqs: reqs,
+ cluster_check: %{status: :inflight, error: nil}
+ )
+ end
+
+ defp set_namespace(socket, nil) do
+ assign(socket, namespace: nil, rbac: %{status: :inflight, errors: [], permissions: []})
+ end
+
+ defp set_namespace(socket, ns) do
+ reqs = socket.assigns.reqs
+
+ socket
+ |> start_async(:rbac_check, fn ->
+ {required_permissions, optional_permissions} =
+ Auth.batch_check(reqs.access_reviews, [
+ # required permissions:
+ [verb: "get", version: "v1", resource: "pods", namespace: ns],
+ [verb: "list", version: "v1", resource: "pods", namespace: ns],
+ [verb: "watch", version: "v1", resource: "pods", namespace: ns],
+ [verb: "create", version: "v1", resource: "pods", namespace: ns],
+ [verb: "delete", version: "v1", resource: "pods", namespace: ns],
+ [verb: "create", version: "v1", resource: "pods/portforward", namespace: ns],
+ # optional permissions:
+ [verb: "list", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "create", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "delete", version: "v1", resource: "persistentvolumeclaims", namespace: ns],
+ [verb: "list", version: "v1", resource: "storageclasses", namespace: ns]
+ ])
+ |> Enum.split(6)
+
+ errors =
+ required_permissions
+ |> Enum.reject(&(&1 === :ok))
+ |> Enum.map(fn {:error, error} -> error end)
+
+ permissions =
+ optional_permissions
+ |> Enum.map(&(&1 === :ok))
+ |> then(&Enum.zip([:list_pvc, :create_pvc, :delete_pvc, :list_sc], &1))
+ |> Map.new()
+
+ %{errors: errors, permissions: permissions}
+ end)
+ |> assign(
+ namespace: ns,
+ rbac: %{status: :inflight, errors: :inflight, permissions: :inflight}
+ )
+ |> pvc_options()
+ end
+
+ def set_pod_template(socket, pod_template_yaml) do
+ namespace = socket.assigns.namespace
+
+ with {:parse, {:ok, pod_template}} <-
+ {:parse, YamlElixir.read_from_string(pod_template_yaml)},
+ {:validate, :ok} <- {:validate, Pod.validate_pod_template(pod_template, namespace)} do
+ assign(socket, :pod_template, %{template: pod_template_yaml, status: :valid, message: nil})
+ else
+ {:parse, {:error, error}} ->
+ assign(socket, :pod_template, %{
+ template: pod_template_yaml,
+ status: :error,
+ message: Exception.message(error)
+ })
+
+ {:validate, {:error, message}} ->
+ assign(socket, :pod_template, %{
+ template: pod_template_yaml,
+ status: :error,
+ message: message
+ })
+ end
+ end
+
+ defp pvc_options(%{assigns: %{rbac: %{permissions: %{list_pvc: false}}}} = socket) do
+ assign(socket, :pvcs, [])
+ end
+
+ defp pvc_options(socket) do
+ %{reqs: %{pvc: req}, namespace: ns} = socket.assigns
+
+ case Kubereq.list(req, ns) do
+ {:ok, %Req.Response{status: 200} = resp} ->
+ pvcs =
+ resp.body["items"]
+ |> Enum.reject(& &1["metadata"]["deletionTimestamp"])
+ |> Enum.map(& &1["metadata"]["name"])
+
+ socket
+ |> assign(:pvcs, pvcs)
+
+ _ ->
+ assign(socket, :pvcs, [])
+ end
+ end
+
+ defp storage_classes(%{rbac: %{permissions: %{list_sc: false}}}), do: []
+
+ defp storage_classes(assigns) do
+ %{reqs: %{sc: req}} = assigns
+
+ case Kubereq.list(req, nil) do
+ {:ok, %Req.Response{status: 200} = resp} ->
+ Enum.map(resp.body["items"], & &1["metadata"]["name"])
+
+ _ ->
+ []
+ end
+ end
+
+ defp config_secret_names(hub_secrets) do
+ names =
+ for %{name: name} <- hub_secrets,
+ String.starts_with?(name, @config_secret_prefix),
+ do: name
+
+ Enum.sort(names)
+ end
+
+ defp load_config_defaults(socket) do
+ config_defaults = socket.assigns.config_defaults
+
+ socket
+ |> assign(
+ home_pvc: config_defaults["home_pvc"],
+ docker_tag: config_defaults["docker_tag"]
+ )
+ |> set_context(config_defaults["context"])
+ |> set_namespace(config_defaults["namespace"])
+ |> set_pod_template(config_defaults["pod_template"])
+ end
+
+ defp config_secret_changeset(socket, attrs) do
+ hub = socket.assigns.hub
+ value = socket |> build_config() |> Jason.encode!()
+ secret = %Livebook.Secrets.Secret{hub_id: hub.id, name: nil, value: value}
+
+ secret
+ |> Livebook.Secrets.change_secret(attrs)
+ |> validate_format(:name, ~r/^#{@config_secret_prefix}\w+$/,
+ message: "must be in the format #{@config_secret_prefix}*"
+ )
+ end
+
+ defp save_config_secret(socket, secret, changeset) do
+ hub = socket.assigns.hub
+ exists? = Enum.any?(socket.assigns.hub_secrets, &(&1.name == secret.name))
+
+ socket
+ |> start_async(:save_config, fn ->
+ result =
+ if exists? do
+ Livebook.Hubs.update_secret(hub, secret)
+ else
+ Livebook.Hubs.create_secret(hub, secret)
+ end
+
+ with {:error, errors} <- result do
+ {:error,
+ changeset
+ |> Livebook.Utils.put_changeset_errors(errors)
+ |> Map.replace!(:action, :validate)}
+ end
+ end)
+ |> assign_nested(:save_config, inflight: true)
+ end
+
+ defp assign_nested(socket, key, keyword) do
+ update(socket, key, fn map ->
+ Enum.reduce(keyword, map, fn {key, value}, map -> Map.replace!(map, key, value) end)
+ end)
+ end
+
+ defp build_config(socket) do
+ %{
+ context: socket.assigns.context,
+ namespace: socket.assigns.namespace,
+ home_pvc: socket.assigns.home_pvc,
+ docker_tag: socket.assigns.docker_tag,
+ pod_template: socket.assigns.pod_template.template
+ }
+ end
+end
diff --git a/lib/livebook_web/live/session_live/runtime_component.ex b/lib/livebook_web/live/session_live/runtime_component.ex
index 0e3c561e5c6..4a339621df2 100644
--- a/lib/livebook_web/live/session_live/runtime_component.ex
+++ b/lib/livebook_web/live/session_live/runtime_component.ex
@@ -73,6 +73,15 @@ defmodule LivebookWeb.SessionLive.RuntimeComponent do
>
Fly.io machine
+ <.choice_button
+ :if={Livebook.Config.runtime_enabled?(Livebook.Runtime.K8s)}
+ active={@type == "k8s"}
+ phx-click="set_runtime_type"
+ phx-value-type="k8s"
+ phx-target={@myself}
+ >
+ Kubernetes Pod
+
type}, socket) do
diff --git a/mix.exs b/mix.exs
index 11cc56834f9..08db7dd8b4c 100644
--- a/mix.exs
+++ b/mix.exs
@@ -117,6 +117,8 @@ defmodule Livebook.MixProject do
{:mint_web_socket, "~> 1.0.0"},
{:protobuf, "~> 0.12.0"},
{:dns_cluster, "~> 0.1.2"},
+ {:kubereq, "~> 0.1.8"},
+ {:yaml_elixir, "~> 2.11"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
{:floki, ">= 0.27.0", only: :test},
{:bypass, "~> 2.1", only: :test},
diff --git a/mix.lock b/mix.lock
index 4158e446bce..ecde2eab604 100644
--- a/mix.lock
+++ b/mix.lock
@@ -21,6 +21,7 @@
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"},
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
+ "kubereq": {:hex, :kubereq, "0.1.8", "d84b2a9cb0a5ae9e74243f0ff2d44d91db8e80c3b09498bdb7b1b562335416de", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:pluggable, "~> 1.0", [hex: :pluggable, repo: "hexpm", optional: false]}, {:req, "~> 0.5.0", [hex: :req, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.0", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "de02c60caa2f76a8d72dad329fb8c019f88cb2dd3e2ac8241927e0dabe3b90ad"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},
@@ -42,6 +43,7 @@
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
+ "pluggable": {:hex, :pluggable, "1.1.0", "7eba3bc70c0caf4d9056c63c882df8862f7534f0145da7ab3a47ca73e4adb1e4", [:mix], [], "hexpm", "d12eb00ea47b21e92cd2700d6fbe3737f04b64e71b63aad1c0accde87c751637"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"req": {:hex, :req, "0.5.2", "70b4976e5fbefe84e5a57fd3eea49d4e9aa0ac015301275490eafeaec380f97f", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0c63539ab4c2d6ced6114d2684276cef18ac185ee00674ee9af4b1febba1f986"},
@@ -51,4 +53,6 @@
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"},
+ "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
+ "yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"},
}
diff --git a/rel/server/overlays/bin/start_runtime.exs b/rel/server/overlays/bin/start_runtime.exs
index e6c966a57f0..cfab9c41f52 100644
--- a/rel/server/overlays/bin/start_runtime.exs
+++ b/rel/server/overlays/bin/start_runtime.exs
@@ -6,10 +6,22 @@ File.cd!(System.user_home!())
dist_port: dist_port
} = System.fetch_env!("LIVEBOOK_RUNTIME") |> Base.decode64!() |> :erlang.binary_to_term()
-# This is the only Fly-specific part of starting Livebook as runtime
-app = System.fetch_env!("FLY_APP_NAME")
-machine_id = System.fetch_env!("FLY_MACHINE_ID")
-node = :"#{node_base}@#{machine_id}.vm.#{app}.internal"
+node =
+ cond do
+ System.get_env("FLY_APP_NAME") ->
+ # This is the only Fly-specific part of starting Livebook as runtime
+ app = System.fetch_env!("FLY_APP_NAME")
+ machine_id = System.fetch_env!("FLY_MACHINE_ID")
+ :"#{node_base}@#{machine_id}.vm.#{app}.internal"
+
+ System.get_env("POD_IP") ->
+ # This is the only K8s-specific part of starting Livebook as runtime
+ hostname = System.fetch_env!("POD_IP")
+ :"#{node_base}@#{hostname}"
+
+ true ->
+ raise "expected either POD_IP (for k8s) or FLY_APP_NAME (for Fly.io) to be set"
+ end
# We persist the information before the node is reachable
:persistent_term.put(:livebook_runtime_info, %{
diff --git a/test/livebook/runtime/k8s_test.exs b/test/livebook/runtime/k8s_test.exs
new file mode 100644
index 00000000000..f65552571a5
--- /dev/null
+++ b/test/livebook/runtime/k8s_test.exs
@@ -0,0 +1,145 @@
+defmodule Livebook.Runtime.K8sTest do
+ alias Livebook.Runtime
+ use ExUnit.Case, async: true
+
+ # To run these tests, install [Kind](https://kind.sigs.k8s.io/) on your machine.
+ @moduletag :k8s
+
+ @assert_receive_timeout 10_000
+ @cluster_name "livebook-runtime-test"
+ @kubeconfig_path "tmp/k8s_runtime/kubeconfig.yaml"
+
+ @default_pod_template """
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ generateName: livebook-runtime-
+ labels:
+ livebook.dev/runtime: integration-test
+ spec:
+ containers:
+ - image: ghcr.io/livebook-dev/livebook:nightly
+ name: livebook-runtime
+ env:
+ - name: TEST_VAR
+ value: present
+
+ """
+ setup_all do
+ unless System.find_executable("kind") do
+ raise "kind is not installed"
+ end
+
+ clusters = cmd!(~w(kind get clusters)) |> String.split("\n", trim: true)
+
+ if @cluster_name not in clusters do
+ cmd!(~w(kind create cluster --name #{@cluster_name}))
+ end
+
+ # Export kubeconfig file
+ cmd!(~w(kind export kubeconfig --name #{@cluster_name} --kubeconfig #{@kubeconfig_path}))
+
+ # In most cases we can use the existing image, but when making
+ # changes to the remote runtime code, we need to build a new image
+ if System.get_env("TEST_K8S_BUILD_IMAGE") in ~w(true 1) do
+ {_, versions} = Code.eval_file("versions")
+
+ cmd!(~w(docker build
+ --build-arg BASE_IMAGE=hexpm/elixir:#{versions[:elixir]}-erlang-#{versions[:otp]}-ubuntu-#{versions[:ubuntu]}
+ --build-arg VARIANT=default
+ -t ghcr.io/livebook-dev/livebook:nightly .))
+ else
+ cmd!(~w(docker image pull ghcr.io/livebook-dev/livebook:nightly))
+ end
+
+ # Load container image into Kind cluster
+ cmd!(~w(kind load docker-image --name #{@cluster_name} ghcr.io/livebook-dev/livebook:nightly))
+
+ :ok
+ end
+
+ test "connecting flow" do
+ config = config()
+ req = req()
+
+ assert [] = list_pods(req)
+
+ pid = Runtime.K8s.new(config, req) |> Runtime.connect()
+
+ assert_receive {:runtime_connect_info, ^pid, "create pod"}, @assert_receive_timeout
+
+ assert_receive {:runtime_connect_info, ^pid, "waiting for pod"}, @assert_receive_timeout
+
+ assert_receive {:runtime_connect_info, ^pid, "created container livebook-runtime"},
+ @assert_receive_timeout
+
+ assert_receive {:runtime_connect_info, ^pid, "started container livebook-runtime"},
+ @assert_receive_timeout
+
+ assert_receive {:runtime_connect_info, ^pid, "start proxy"}, @assert_receive_timeout
+ assert_receive {:runtime_connect_info, ^pid, "connect to node"}, @assert_receive_timeout
+ assert_receive {:runtime_connect_info, ^pid, "initialize node"}, @assert_receive_timeout
+ assert_receive {:runtime_connect_done, ^pid, {:ok, runtime}}, @assert_receive_timeout
+
+ Runtime.take_ownership(runtime)
+
+ assert [_] = list_pods(req)
+
+ # Verify that we can actually evaluate code on the Kubernetes Pod
+ Runtime.evaluate_code(runtime, :elixir, ~s/System.fetch_env!("TEST_VAR")/, {:c1, :e1}, [])
+ assert_receive {:runtime_evaluation_response, :e1, %{type: :terminal_text, text: text}, _meta}
+ assert text =~ "present"
+
+ Runtime.disconnect(runtime)
+
+ # Wait for Pod to terminate
+ assert :ok ==
+ Kubereq.wait_until(
+ req,
+ "default",
+ runtime.pod_name,
+ &(&1["status"]["phase"] == "Succeeded")
+ )
+
+ # Finally, delete the Pod object
+ Kubereq.delete(req, "default", runtime.pod_name)
+ end
+
+ defp req() do
+ [Kubereq.Kubeconfig.ENV, {Kubereq.Kubeconfig.File, path: @kubeconfig_path}]
+ |> Kubereq.Kubeconfig.load()
+ |> Kubereq.new("api/v1/namespaces/:namespace/pods/:name")
+ end
+
+ defp config(attrs \\ %{}) do
+ defaults = %{
+ context: "kind-#{@cluster_name}",
+ namespace: "default",
+ home_pvc: nil,
+ docker_tag: "nightly",
+ pod_template: @default_pod_template
+ }
+
+ Map.merge(defaults, attrs)
+ end
+
+ defp list_pods(req) do
+ {:ok, resp} =
+ Kubereq.list(req, "default",
+ label_selectors: [{"livebook.dev/runtime", "integration-test"}],
+ field_selectors: [{"status.phase", "Running"}]
+ )
+
+ resp.body["items"]
+ end
+
+ defp cmd!([command | args]) do
+ {output, status} = System.cmd(command, args, stderr_to_stdout: true)
+
+ if status != 0 do
+ raise "command #{inspect(command)} #{inspect(args)} failed"
+ end
+
+ output
+ end
+end
diff --git a/test/livebook_web/live/session_live_test.exs b/test/livebook_web/live/session_live_test.exs
index af14557b7a8..0a737b30e6a 100644
--- a/test/livebook_web/live/session_live_test.exs
+++ b/test/livebook_web/live/session_live_test.exs
@@ -1171,6 +1171,201 @@ defmodule LivebookWeb.SessionLiveTest do
|> has_element?()
end
+ test "configuring k8s runtime", %{conn: conn, session: session} do
+ {:ok, view, _} = live(conn, ~p"/sessions/#{session.id}/settings/runtime")
+
+ Session.subscribe(session.id)
+
+ Req.Test.stub(:k8s_cluster, Livebook.K8sClusterStub)
+
+ view
+ |> element("#runtime-settings-modal button", "Kubernetes Pod")
+ |> render_click()
+
+ # Check context switcher and switch to context with no permission
+
+ view
+ |> element(~s{form[phx-change="set_context"]})
+ |> render_change(%{context: "no-permission"})
+
+ rendered = render_async(view)
+
+ assert rendered =~ "Authenticated user has no permission to"
+ refute rendered =~ "You can fully customize"
+
+ # Test cluster with full access
+
+ view
+ |> element(~s{form[phx-change="set_context"]})
+ |> render_change(%{context: "default"})
+
+ render_async(view)
+
+ view
+ |> element(~s{form[phx-change="set_namespace"]})
+ |> render_change(%{namespace: "default"})
+
+ assert view
+ |> element(~s{select[name="home_pvc"] option[value="foo-pvc"]})
+ |> has_element?()
+
+ assert view
+ |> element(~s{select[name="home_pvc"] option[value="new-pvc"]})
+ |> has_element?()
+
+ assert render_async(view) =~ "You can fully customize"
+
+ # Create new PVC
+
+ view
+ |> element(~s{button[phx-click="new_pvc"]})
+ |> render_click()
+
+ assert view
+ |> element(~s{form[phx-submit="create_pvc"]})
+ |> has_element?()
+
+ # Cancel button intermezzo
+
+ view
+ |> element(~s{button[phx-click="cancel_new_pvc"]})
+ |> render_click()
+
+ refute view
+ |> element(~s{form[phx-submit="create_pvc"]})
+ |> has_element?()
+
+ # Create new PVC again
+
+ view
+ |> element(~s{button[phx-click="new_pvc"]})
+ |> render_click()
+
+ assert view
+ |> element(
+ ~s{form[phx-submit="create_pvc"] select[name="pvc[storage_class]"] option[value="first-storage-class"]}
+ )
+ |> has_element?()
+
+ assert view
+ |> element(
+ ~s{form[phx-submit="create_pvc"] select[name="pvc[storage_class]"] option[value="second-storage-class"]}
+ )
+ |> has_element?()
+
+ assert view
+ |> element(~s{form[phx-submit="create_pvc"] button[type="submit"][disabled]})
+ |> has_element?()
+
+ view
+ |> element(~s{form[phx-submit="create_pvc"]})
+ |> render_change(%{pvc: %{name: "new-pvc", size_gb: 1}})
+
+ assert view
+ |> element(~s{form[phx-submit="create_pvc"] button[type="submit"]:not([disabled])})
+ |> has_element?()
+
+ Req.Test.expect(:k8s_cluster, Livebook.K8sClusterStub)
+
+ view
+ |> element(~s{form[phx-submit="create_pvc"]})
+ |> render_submit(%{pvc: %{name: "new-pvc", size_gb: 1}})
+
+ Req.Test.verify!()
+
+ # Delete a PVC
+
+ view
+ |> element(~s{button[phx-click="delete_pvc"]})
+ |> render_click()
+
+ assert render_async(view) =~
+ "Are you sure you want to irreversibly delete Persistent Volume Claim"
+
+ Req.Test.expect(:k8s_cluster, Livebook.K8sClusterStub)
+
+ view
+ |> element(~s{button[phx-click="confirm_delete_pvc"]})
+ |> render_click()
+
+ Req.Test.verify!()
+
+ # Pod Template Validation
+
+ refute render_async(view) =~ ~s/Make sure to define a valid resource of apiVersion /
+
+ view
+ |> element(~s{form[phx-change="set_pod_template"]})
+ |> render_change(%{pod_template: ""})
+
+ assert render_async(view) =~ ~s/Make sure to define a valid resource of apiVersion /
+
+ view
+ |> element(~s{form[phx-change="set_pod_template"]})
+ |> render_change(%{
+ pod_template: """
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ generateName: livebook-runtime-
+ spec:
+ containers:
+ - name: other-name
+ """
+ })
+
+ assert render_async(view) =~ ~s/Main container is missing./
+
+ # We do not actually connect the runtime. We test connecting againast the
+ # real API separately
+ end
+
+ test "populates k8s runtime config form existing runtime", %{conn: conn, session: session} do
+ pod_template = """
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ generateName: livebook-runtime-
+ labels:
+ livebook.dev/component: test
+ spec:
+ containers:
+ - name: livebook-runtime\
+ """
+
+ runtime =
+ Runtime.K8s.new(
+ %{
+ context: "default",
+ namespace: "default",
+ home_pvc: "foo-pvc",
+ docker_tag: "nightly",
+ pod_template: pod_template
+ },
+ nil
+ )
+
+ Req.Test.stub(:k8s_cluster, Livebook.K8sClusterStub)
+
+ Session.set_runtime(session.pid, runtime)
+
+ {:ok, view, _} = live(conn, ~p"/sessions/#{session.id}/settings/runtime")
+
+ assert render_async(view) =~ "You can fully customize"
+
+ assert view
+ |> element(~s{select[name="home_pvc"] option[value="foo-pvc"][selected]})
+ |> has_element?()
+
+ assert view
+ |> element(~s{select[name="home_pvc"] option[value="new-pvc"]})
+ |> has_element?()
+
+ assert view
+ |> element(~s{button[phx-click="init"]:not([disabled])})
+ |> has_element?()
+ end
+
test "saving and loading config from secret", %{conn: conn, session: session} do
runtime =
Runtime.Fly.new(%{
diff --git a/test/support/k8s_cluster_stub.ex b/test/support/k8s_cluster_stub.ex
new file mode 100644
index 00000000000..57e13a72b65
--- /dev/null
+++ b/test/support/k8s_cluster_stub.ex
@@ -0,0 +1,66 @@
+defmodule Livebook.K8sClusterStub do
+ use Plug.Router
+
+ require Logger
+
+ plug :match
+
+ plug Plug.Parsers,
+ parsers: [:urlencoded, :json],
+ json_decoder: Jason
+
+ plug :dispatch
+
+ post "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" do
+ resource = conn.body_params["spec"]["resourceAttributes"]["resource"]
+ allowed = conn.host == "default" or resource == "selfsubjectaccessreviews"
+
+ conn
+ |> put_status(201)
+ |> Req.Test.json(%{"status" => %{"allowed" => allowed}})
+ end
+
+ get "/api/v1/namespaces", host: "default" do
+ Req.Test.json(conn, %{"items" => [%{"metadata" => %{"name" => "default"}}]})
+ end
+
+ get "/api/v1/namespaces", host: "no-permission" do
+ send_resp(conn, 403, "")
+ end
+
+ get "apis/storage.k8s.io/v1/storageclasses", host: "default" do
+ Req.Test.json(conn, %{
+ "items" => [
+ %{"metadata" => %{"name" => "first-storage-class"}},
+ %{"metadata" => %{"name" => "second-storage-class"}}
+ ]
+ })
+ end
+
+ get "/api/v1/namespaces/default/persistentvolumeclaims", host: "default" do
+ Req.Test.json(conn, %{
+ "items" => [
+ %{"metadata" => %{"name" => "foo-pvc"}},
+ %{"metadata" => %{"name" => "new-pvc"}}
+ ]
+ })
+ end
+
+ delete "/api/v1/namespaces/default/persistentvolumeclaims/:name", host: "default" do
+ send_resp(conn, 200, "")
+ end
+
+ post "/api/v1/namespaces/default/persistentvolumeclaims", host: "default" do
+ conn
+ |> put_status(201)
+ |> Req.Test.json(%{"metadata" => %{"name" => "new-pvc"}})
+ end
+
+ match _ do
+ Logger.error("Unimplemented #{conn.method} Stub Request to #{conn.request_path}")
+
+ conn
+ |> put_status(500)
+ |> Req.Test.text("Endpoint not implemented")
+ end
+end
diff --git a/test/test_helper.exs b/test/test_helper.exs
index f822fe85c99..d1e0ca55d2d 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -15,7 +15,8 @@ Application.put_env(:livebook, :runtime_modules, [
Livebook.Runtime.Standalone,
Livebook.Runtime.Attached,
Livebook.Runtime.Embedded,
- Livebook.Runtime.Fly
+ Livebook.Runtime.Fly,
+ Livebook.Runtime.K8s
])
defmodule Livebook.Runtime.Embedded.Packages do
@@ -75,5 +76,5 @@ fly_exclude = if System.get_env("TEST_FLY_API_TOKEN"), do: [], else: [:fly]
ExUnit.start(
assert_receive_timeout: if(windows?, do: 5_000, else: 1_500),
- exclude: erl_docs_exclude ++ windows_exclude ++ teams_exclude ++ fly_exclude
+ exclude: erl_docs_exclude ++ windows_exclude ++ teams_exclude ++ fly_exclude ++ [:k8s]
)