feat: support addTask and startTask for k8 RP [DET-3416, DET-3419] #798
master/internal/core.go
Outdated
@@ -267,6 +267,39 @@ func (m *Master) rwCoordinatorWebSocket(socket *websocket.Conn, c echo.Context)
	return actorRef.AwaitTermination()
}

func (m *Master) initializeResourceProviders(proxyRef *actor.Ref, provisionerSlotsPerInstance int) {
	var resourceProvider *actor.Ref
	if m.config.Scheduler.ResourceProvider.DefaultRPConfig != nil {
blocking: Be more explicit about selecting which case to use:
switch {
case ...DefaultRPConfig != nil:
	...
case ...KubernetesRPConfig != nil:
	...
default:
	panic("...")
}
done
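A minimal sketch of the switch-based selection suggested above. The config types here are hypothetical stand-ins, not the real master config structs, and the panic message is only illustrative. Note that if both configs are set, the first matching case wins; this sketch does not validate against that.

```go
package main

import "fmt"

// Hypothetical stand-ins for the two resource provider configs named in the diff.
type DefaultRPConfig struct{}
type KubernetesRPConfig struct{}

// ResourceProviderConfig is an assumed shape: at most one field is expected to be set.
type ResourceProviderConfig struct {
	DefaultRPConfig    *DefaultRPConfig
	KubernetesRPConfig *KubernetesRPConfig
}

// selectProvider returns the name of the provider to start and panics
// when no known provider is configured, so a bad config fails loudly.
func selectProvider(c ResourceProviderConfig) string {
	switch {
	case c.DefaultRPConfig != nil:
		return "default"
	case c.KubernetesRPConfig != nil:
		return "kubernetes"
	default:
		panic("no expected resource provider config is defined")
	}
}

func main() {
	cfg := ResourceProviderConfig{KubernetesRPConfig: &KubernetesRPConfig{}}
	fmt.Println(selectProvider(cfg))
}
```

Compared with an if/else chain, the explicit default case makes an unconfigured master fail at startup rather than continue with a nil provider.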
master/pkg/etc/etc.go
Outdated
@@ -28,6 +28,8 @@ const (
	TrialEntrypointScriptResource = "entrypoint.sh"
	// AgentSetupScriptTemplateResource is the template for the script to run a dynamic agent.
	AgentSetupScriptTemplateResource = "agent_setup_script.sh.template"
	// InitContainerEntryScriptResource is the script to run the init container on k8s.
	InitContainerEntryScriptResource = "init_container_entrypoint.sh"
non-blocking: Put something about Kubernetes into the name?
updated
master/internal/kubernetes/pods.go
Outdated
)

if !ok {
	return errors.Errorf(fmt.Sprintf("pod actor %s already exits", ref.Address().String()))
- return errors.Errorf(fmt.Sprintf("pod actor %s already exits", ref.Address().String()))
+ return errors.Errorf("pod actor %s already exists", ref.Address().String())
done.
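For context on why the suggestion drops the inner fmt.Sprintf: a pre-formatted string passed as the format argument gets interpreted a second time, so any % in the address would be treated as a verb (for example, an address containing "%d" would render as "%!d(MISSING)"). A minimal sketch, using the standard library's fmt.Errorf as a stand-in for errors.Errorf (assumed here to come from github.com/pkg/errors); the helper name is hypothetical.

```go
package main

import "fmt"

// podActorExistsError is a hypothetical helper. The address is passed as an
// argument to the format string, so its contents are never parsed as verbs.
func podActorExistsError(address string) error {
	return fmt.Errorf("pod actor %s already exists", address)
}

func main() {
	// An address that happens to contain a percent sign is printed verbatim.
	fmt.Println(podActorExistsError("/pods/pod-100%d"))
}
```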
	return newTaskSummary(a.task)
}

type podAssignment struct {
non-blocking: Probably these (both this and containerAssignment) should just be moved into the respective *_resource_provider.go files. That feels like a more natural grouping to me.
moved them over, although it doesn't seem that great adding this to the default_resource_provider as that has bloated to 600+ lines.
		k.receiveStartTask(ctx, msg)

	default:
		ctx.Log().Error("Unexpected message", reflect.TypeOf(msg))
It looks like we don't normally bother logging these, so I'd probably just drop this (and the other instance in the PR). But if you really want to have it here, it should use %T rather than reflect.
Yeah I find these logs useful, especially during development. I updated to not use reflect.
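A small sketch of the %T approach mentioned above. The fmt verb %T prints the dynamic type of a value, which gives the same information as reflect.TypeOf without the extra import. The helper name is hypothetical. Assuming a logrus-style logger, the formatted variant (Errorf) would be needed for the verb to take effect, since Error only concatenates its arguments.

```go
package main

import "fmt"

// unexpectedMessage is a hypothetical helper that builds the log text for a
// message the actor does not handle, naming the message's dynamic type.
func unexpectedMessage(msg interface{}) string {
	return fmt.Sprintf("unexpected message %T", msg)
}

func main() {
	fmt.Println(unexpectedMessage(42))
	fmt.Println(unexpectedMessage("hello"))
}
```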
This change adds a pods actor as part of the k8 RP and adds several configuration options to the k8 RP config.
It translates the task spec to a pod spec and submits it to k8.
@dzhu I made the changes as a separate new commit.
Mostly nitpicks; see kubernetes_resource_provider.go and volumes.go for some that may actually matter.
func (k *kubernetesResourceProvider) receiveAddTask(ctx *actor.Context, msg AddTask) {
	actors.NotifyOnStop(ctx, msg.TaskHandler, taskStopped{Ref: msg.TaskHandler})

	if task, ok := k.tasksByHandler[ctx.Sender()]; ok {
blocking: Should this be msg.TaskHandler? Isn't the sender always the resource providers actor due to forwarding?
Oh really good catch. Had the same mistake in the default RP also.
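To illustrate the bug discussed above, here is a toy model of forwarding. All types are hypothetical and do not use the real actor package. It assumes, as the reviewer describes, that a forwarded message arrives with the forwarding actor as its sender, so the lookup has to key on the handler carried in the message.

```go
package main

import "fmt"

// ref is a hypothetical stand-in for an actor reference.
type ref struct{ name string }

// addTask mirrors the shape of AddTask in the diff: the handler travels in the message.
type addTask struct{ TaskHandler *ref }

// delivery pairs a message with the actor that handed it to the receiver.
type delivery struct {
	sender *ref
	msg    addTask
}

// forward models a router passing a message along; the router becomes the sender.
func forward(router *ref, msg addTask) delivery {
	return delivery{sender: router, msg: msg}
}

// provider tracks tasks by the handler named in the message, not by the sender.
type provider struct{ tasksByHandler map[*ref]bool }

// receiveAddTask reports whether the task was newly registered.
func (p *provider) receiveAddTask(d delivery) bool {
	if p.tasksByHandler[d.msg.TaskHandler] {
		return false // duplicate AddTask for the same handler
	}
	p.tasksByHandler[d.msg.TaskHandler] = true
	return true
}

func main() {
	router := &ref{name: "resourceProviders"}
	p := &provider{tasksByHandler: map[*ref]bool{}}
	a, b := &ref{name: "trial-1"}, &ref{name: "trial-2"}
	fmt.Println(p.receiveAddTask(forward(router, addTask{TaskHandler: a})))
	fmt.Println(p.receiveAddTask(forward(router, addTask{TaskHandler: b})))
	fmt.Println(p.receiveAddTask(forward(router, addTask{TaskHandler: a})))
}
```

Had the map been keyed on the sender, every task in this model would map to the same router reference and the second AddTask would be mistaken for a duplicate.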

	if task.SlotsNeeded()%k.slotsPerNode != 0 {
		ctx.Log().WithField("task ID", task.ID).Error(
			"task number of slots is not schedulable on the configured slots_per_node")
non-blocking: This could bear to be more explicit:
- "task number of slots is not schedulable on the configured slots_per_node")
+ "task is not schedulable: number of slots (%d) is not a multiple of slots_per_node (%d)", task.SlotsNeeded(), k.slotsPerNode)
(adjusted for line length)
done
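A sketch of the schedulability check with the more explicit message. The function name is hypothetical, and the guard against a non-positive slots_per_node is an addition for the sketch, not something from the PR. With a logrus-style logger, the message with %d verbs would presumably go through Errorf rather than Error.

```go
package main

import "fmt"

// checkSchedulable is a hypothetical helper: a task fits only if its slot
// count is a whole multiple of the configured slots per node.
func checkSchedulable(slotsNeeded, slotsPerNode int) error {
	if slotsPerNode <= 0 {
		// Added for the sketch: avoids a division-by-zero panic in the modulo below.
		return fmt.Errorf("slots_per_node must be positive, got %d", slotsPerNode)
	}
	if slotsNeeded%slotsPerNode != 0 {
		return fmt.Errorf(
			"task is not schedulable: number of slots (%d) is not a multiple of slots_per_node (%d)",
			slotsNeeded, slotsPerNode)
	}
	return nil
}

func main() {
	fmt.Println(checkSchedulable(8, 4))
	fmt.Println(checkSchedulable(6, 4))
}
```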
master/internal/kubernetes/pod.go
Outdated
	},
	Spec: v1.PodSpec{
		Volumes:     volumes,
		HostNetwork: p.taskSpec.TaskContainerDefaults.NetworkMode == hostNetwork,
blocking:
- HostNetwork: p.taskSpec.TaskContainerDefaults.NetworkMode == hostNetwork,
+ HostNetwork: p.taskSpec.TaskContainerDefaults.NetworkMode.IsHost(),
oh yeah good idea
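For readers unfamiliar with the pattern, a predicate method on the mode type keeps the comparison next to the type's definition, so callers do not need their own copy of the "host" constant. A minimal sketch with a hypothetical NetworkMode type; the real type in the master packages may be defined differently.

```go
package main

import "fmt"

// NetworkMode is a hypothetical string-backed mode type for this sketch.
type NetworkMode string

// hostMode is the assumed value that selects host networking.
const hostMode NetworkMode = "host"

// IsHost reports whether the mode requests the host's network namespace.
func (n NetworkMode) IsHost() bool {
	return n == hostMode
}

func main() {
	fmt.Println(NetworkMode("host").IsHost())
	fmt.Println(NetworkMode("bridge").IsHost())
}
```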
master/internal/kubernetes/pod.go
Outdated
	"github.com/docker/docker/api/types/mount"

	"github.com/determined-ai/determined/master/pkg/container"
non-blocking: Put all the internal imports into one block.
done
	"github.com/determined-ai/determined/master/pkg/model"
	"github.com/determined-ai/determined/master/pkg/tasks"

	v1 "k8s.io/api/core/v1"
non-blocking: corev1 or k8sv1, maybe?
master/pkg/tasks/ports.go
Outdated
// LocalRendezvousPort is the start of the range of ports used for rendezvous by tasks.
const LocalRendezvousPort = 1734

// LocalRendezvousPortOffset is the offset for rendezvous ports.
non-blocking: Just "the offset" isn't super clear. Maybe "...the difference between the two rendezvous ports for each container"?
updated it
master/pkg/tasks/task.go
Outdated
@@ -18,16 +18,17 @@ import (
)

const (
	containerWorkDir = "/run/determined/workdir"
	// ContainerWorkDir is working directory for containers.
- // ContainerWorkDir is working directory for containers.
+ // ContainerWorkDir is the working directory for tasks.
updated
master/pkg/tasks/task.go
Outdated
	return user
}

// ConfigureCommandEnvVars configures environment variables for cmd tasks.
non-blocking: ConfigureXXX for all of the new functions here feels a bit off, since that sounds like something that actually goes out and tweaks settings somewhere as part of the function. I think just the XXX part usually sounds good (CommandEnvVars, CommandArchives, etc.), or maybe XXXConfiguration if that's too bare.
yep, updated these
master/pkg/tasks/task.go
Outdated
	return envVarsMap
}

// ConfigureCommandArchives returns the additional files for a c as an archive.
- // ConfigureCommandArchives returns the additional files for a c as an archive.
+ // ConfigureCommandArchives returns the additional files for a command as an archive.
fixed
master/pkg/tasks/task.go
Outdated
	envVars := defaultEnvVars()
	envVars = append(envVars, gcc.ExperimentConfig.Environment.EnvironmentVariables.For(deviceType)...)
	mounts := toDockerMounts(gcc.ExperimentConfig.BindMounts)
// ConfigureGCEnvVars returns environment variables for gc.
non-blocking:
- // ConfigureGCEnvVars returns environment variables for gc.
+ // ConfigureGCEnvVars returns environment variables for checkpoint GC.
(Also several spots below.)
updated
}

func (k *kubernetesResourceProvider) getOrCreateGroup(
	handler *actor.Ref,
non-blocking: Let's keep the context first everywhere it's passed as an argument.
fixed
Description
This PR supports addTask and startTask for Kubernetes. It includes the initial implementation of the Kubernetes RP actor, the pods actor, and the pod actor. So far these actors take a task spec, translate it to a pod spec, and submit the pod spec to k8.
Test Plan
Tested manually by running on a k8 cluster. Adding CI will be part of M4.
Commentary
One thing that I wish we could do better is not using scheduler/agentState to represent the Kubernetes resources in the scheduler. However, refactoring this will be a significant amount of work that we will plan to complete as future tech debt work.