Skip to content

Commit

Permalink
feat: add command infra endpoints (#986)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoni Ben-tzur authored Jul 31, 2020
1 parent b23fa52 commit 913fdc3
Show file tree
Hide file tree
Showing 27 changed files with 858 additions and 122 deletions.
76 changes: 8 additions & 68 deletions master/internal/agent/proto.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package agent

import (
"github.com/golang/protobuf/ptypes/timestamp"

"github.com/determined-ai/determined/master/pkg/container"
"github.com/determined-ai/determined/master/pkg/device"
"github.com/determined-ai/determined/master/pkg/protoutils"
proto "github.com/determined-ai/determined/proto/pkg/agentv1"
)

Expand All @@ -15,76 +12,19 @@ func ToProtoAgent(a AgentSummary) *proto.Agent {
slots[s.ID] = toProtoSlot(s)
}
return &proto.Agent{
Id: a.ID,
RegisteredTime: &timestamp.Timestamp{
Seconds: a.RegisteredTime.Unix(),
Nanos: int32(a.RegisteredTime.Nanosecond()),
},
Slots: slots,
Containers: nil,
Label: a.Label,
Id: a.ID,
RegisteredTime: protoutils.ToTimestamp(a.RegisteredTime),
Slots: slots,
Containers: nil,
Label: a.Label,
}
}

func toProtoSlot(s SlotSummary) *proto.Slot {
var c *proto.Container
if s.Container != nil {
c = toProtoContainer(*s.Container)
}
return &proto.Slot{
Id: s.ID,
Device: toProtoDevice(s.Device),
Device: s.Device.Proto(),
Enabled: s.Enabled,
Container: c,
}
}

// toProtoContainer converts a container.Container into its protobuf
// representation. Each attached device is converted with toProtoDevice; the
// parent and ID are rendered through their String methods.
func toProtoContainer(c container.Container) *proto.Container {
	var converted []*proto.Device
	for _, dev := range c.Devices {
		converted = append(converted, toProtoDevice(dev))
	}

	out := &proto.Container{}
	out.Parent = c.Parent.String()
	out.Id = c.ID.String()
	out.State = toProtoContainerState(c.State)
	out.Devices = converted
	return out
}

// toProtoContainerState maps a container.State onto the matching protobuf
// enum value. Any state not listed below maps to STATE_UNSPECIFIED.
func toProtoContainerState(s container.State) proto.Container_State {
	// Start from the fallback and overwrite it only for recognized states.
	state := proto.Container_STATE_UNSPECIFIED
	switch s {
	case container.Assigned:
		state = proto.Container_STATE_ASSIGNED
	case container.Pulling:
		state = proto.Container_STATE_PULLING
	case container.Starting:
		state = proto.Container_STATE_STARTING
	case container.Running:
		state = proto.Container_STATE_RUNNING
	case container.Terminated:
		state = proto.Container_STATE_TERMINATED
	}
	return state
}

// toProtoDevice converts a device.Device into its protobuf representation.
// The ID is narrowed to int32 and the type is translated by toProtoDeviceType.
func toProtoDevice(d device.Device) *proto.Device {
	out := &proto.Device{}
	out.Id = int32(d.ID)
	out.Brand = d.Brand
	out.Uuid = d.UUID
	out.Type = toProtoDeviceType(d.Type)
	return out
}

func toProtoDeviceType(t device.Type) proto.Device_Type {
switch t {
case device.CPU:
return proto.Device_TYPE_CPU
case device.GPU:
return proto.Device_TYPE_GPU
default:
return proto.Device_TYPE_UNSPECIFIED
Container: s.Container.Proto(),
}
}
29 changes: 29 additions & 0 deletions master/internal/api_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package internal

import (
"context"
"fmt"

"github.com/determined-ai/determined/proto/pkg/apiv1"
)

// GetCommands serves the GetCommands API call. The request is sent to the
// "/commands" actor address; the commands in the reply are then sorted using
// the request's OrderBy and SortBy fields and paginated using Offset and Limit.
// SORT_BY_ID is handed to the sort helper as its final argument (presumably
// the default sort key; confirm against a.sort).
func (a *apiServer) GetCommands(
	_ context.Context, req *apiv1.GetCommandsRequest,
) (resp *apiv1.GetCommandsResponse, err error) {
	if err = a.actorRequest("/commands", req, &resp); err != nil {
		return nil, err
	}

	a.sort(resp.Commands, req.OrderBy, req.SortBy, apiv1.GetCommandsRequest_SORT_BY_ID)

	err = a.paginate(&resp.Pagination, &resp.Commands, req.Offset, req.Limit)
	return resp, err
}

// GetCommand serves the GetCommand API call by sending the request to the
// actor address "/commands/<CommandId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) GetCommand(
	_ context.Context, req *apiv1.GetCommandRequest,
) (resp *apiv1.GetCommandResponse, err error) {
	addr := fmt.Sprintf("/commands/%s", req.CommandId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}

// KillCommand serves the KillCommand API call by sending the request to the
// actor address "/commands/<CommandId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) KillCommand(
	_ context.Context, req *apiv1.KillCommandRequest,
) (resp *apiv1.KillCommandResponse, err error) {
	addr := fmt.Sprintf("/commands/%s", req.CommandId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}
29 changes: 29 additions & 0 deletions master/internal/api_notebook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package internal

import (
"context"
"fmt"

"github.com/determined-ai/determined/proto/pkg/apiv1"
)

// GetNotebooks serves the GetNotebooks API call. The request is sent to the
// "/notebooks" actor address; the notebooks in the reply are then sorted using
// the request's OrderBy and SortBy fields and paginated using Offset and Limit.
// SORT_BY_ID is handed to the sort helper as its final argument (presumably
// the default sort key; confirm against a.sort).
func (a *apiServer) GetNotebooks(
	_ context.Context, req *apiv1.GetNotebooksRequest,
) (resp *apiv1.GetNotebooksResponse, err error) {
	if err = a.actorRequest("/notebooks", req, &resp); err != nil {
		return nil, err
	}

	a.sort(resp.Notebooks, req.OrderBy, req.SortBy, apiv1.GetNotebooksRequest_SORT_BY_ID)

	err = a.paginate(&resp.Pagination, &resp.Notebooks, req.Offset, req.Limit)
	return resp, err
}

// GetNotebook serves the GetNotebook API call by sending the request to the
// actor address "/notebooks/<NotebookId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) GetNotebook(
	_ context.Context, req *apiv1.GetNotebookRequest,
) (resp *apiv1.GetNotebookResponse, err error) {
	addr := fmt.Sprintf("/notebooks/%s", req.NotebookId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}

// KillNotebook serves the KillNotebook API call by sending the request to the
// actor address "/notebooks/<NotebookId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) KillNotebook(
	_ context.Context, req *apiv1.KillNotebookRequest,
) (resp *apiv1.KillNotebookResponse, err error) {
	addr := fmt.Sprintf("/notebooks/%s", req.NotebookId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}
29 changes: 29 additions & 0 deletions master/internal/api_shell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package internal

import (
"context"
"fmt"

"github.com/determined-ai/determined/proto/pkg/apiv1"
)

// GetShells serves the GetShells API call. The request is sent to the
// "/shells" actor address; the shells in the reply are then sorted using the
// request's OrderBy and SortBy fields and paginated using Offset and Limit.
// SORT_BY_ID is handed to the sort helper as its final argument (presumably
// the default sort key; confirm against a.sort).
func (a *apiServer) GetShells(
	_ context.Context, req *apiv1.GetShellsRequest,
) (resp *apiv1.GetShellsResponse, err error) {
	if err = a.actorRequest("/shells", req, &resp); err != nil {
		return nil, err
	}

	a.sort(resp.Shells, req.OrderBy, req.SortBy, apiv1.GetShellsRequest_SORT_BY_ID)

	err = a.paginate(&resp.Pagination, &resp.Shells, req.Offset, req.Limit)
	return resp, err
}

// GetShell serves the GetShell API call by sending the request to the actor
// address "/shells/<ShellId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) GetShell(
	_ context.Context, req *apiv1.GetShellRequest,
) (resp *apiv1.GetShellResponse, err error) {
	addr := fmt.Sprintf("/shells/%s", req.ShellId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}

// KillShell serves the KillShell API call by sending the request to the actor
// address "/shells/<ShellId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) KillShell(
	_ context.Context, req *apiv1.KillShellRequest,
) (resp *apiv1.KillShellResponse, err error) {
	addr := fmt.Sprintf("/shells/%s", req.ShellId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}
31 changes: 31 additions & 0 deletions master/internal/api_tensorboard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package internal

import (
"context"
"fmt"

"github.com/determined-ai/determined/proto/pkg/apiv1"
)

// GetTensorboards serves the GetTensorboards API call. The request is sent to
// the "/tensorboards" actor address; the tensorboards in the reply are then
// sorted using the request's OrderBy and SortBy fields and paginated using
// Offset and Limit. SORT_BY_ID is handed to the sort helper as its final
// argument (presumably the default sort key; confirm against a.sort).
func (a *apiServer) GetTensorboards(
	_ context.Context, req *apiv1.GetTensorboardsRequest,
) (resp *apiv1.GetTensorboardsResponse, err error) {
	if err = a.actorRequest("/tensorboards", req, &resp); err != nil {
		return nil, err
	}

	a.sort(resp.Tensorboards, req.OrderBy, req.SortBy, apiv1.GetTensorboardsRequest_SORT_BY_ID)

	err = a.paginate(&resp.Pagination, &resp.Tensorboards, req.Offset, req.Limit)
	return resp, err
}

// GetTensorboard serves the GetTensorboard API call by sending the request to
// the actor address "/tensorboards/<TensorboardId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) GetTensorboard(
	_ context.Context, req *apiv1.GetTensorboardRequest,
) (resp *apiv1.GetTensorboardResponse, err error) {
	addr := fmt.Sprintf("/tensorboards/%s", req.TensorboardId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}

// KillTensorboard serves the KillTensorboard API call by sending the request
// to the actor address "/tensorboards/<TensorboardId>" and returning its reply.
//
// On failure it returns a nil response together with the error.
func (a *apiServer) KillTensorboard(
	_ context.Context, req *apiv1.KillTensorboardRequest,
) (resp *apiv1.KillTensorboardResponse, err error) {
	addr := fmt.Sprintf("/tensorboards/%s", req.TensorboardId)
	// The call is kept out of the return statement: resp is filled in through
	// &resp, and Go does not specify whether a variable operand of a return is
	// read before or after a function call in the same statement.
	if err = a.actorRequest(addr, req, &resp); err != nil {
		return nil, err
	}
	return resp, nil
}
107 changes: 107 additions & 0 deletions master/internal/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import (
"github.com/determined-ai/determined/master/pkg/archive"
"github.com/determined-ai/determined/master/pkg/container"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/protoutils"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/commandv1"
"github.com/determined-ai/determined/proto/pkg/notebookv1"
"github.com/determined-ai/determined/proto/pkg/shellv1"
"github.com/determined-ai/determined/proto/pkg/tensorboardv1"
)

// terminatedDuration defines the amount of time the command stays in a
Expand Down Expand Up @@ -103,6 +109,55 @@ func (c *command) Receive(ctx *actor.Context) error {
ctx.Respond(newSummary(c))
}

case *notebookv1.Notebook:
ctx.Respond(c.toNotebook(ctx))

case *apiv1.GetNotebookRequest:
ctx.Respond(&apiv1.GetNotebookResponse{
Notebook: c.toNotebook(ctx),
Config: protoutils.ToStruct(c.config),
})

case *apiv1.KillNotebookRequest:
c.terminate(ctx)
ctx.Respond(&apiv1.KillNotebookResponse{Notebook: c.toNotebook(ctx)})

case *commandv1.Command:
ctx.Respond(c.toCommand(ctx))

case *apiv1.GetCommandRequest:
ctx.Respond(&apiv1.GetCommandResponse{
Command: c.toCommand(ctx),
Config: protoutils.ToStruct(c.config),
})

case *apiv1.KillCommandRequest:
c.terminate(ctx)
ctx.Respond(&apiv1.KillCommandResponse{Command: c.toCommand(ctx)})

case *shellv1.Shell:
ctx.Respond(c.toShell(ctx))

case *apiv1.GetShellRequest:
ctx.Respond(&apiv1.GetShellResponse{
Shell: c.toShell(ctx),
Config: protoutils.ToStruct(c.config),
})

case *apiv1.KillShellRequest:
c.terminate(ctx)
ctx.Respond(&apiv1.KillShellResponse{Shell: c.toShell(ctx)})

case *tensorboardv1.Tensorboard:
ctx.Respond(c.toTensorboard(ctx))

case *apiv1.GetTensorboardRequest:
ctx.Respond(&apiv1.GetTensorboardResponse{Tensorboard: c.toTensorboard(ctx)})

case *apiv1.KillTensorboardRequest:
c.terminate(ctx)
ctx.Respond(&apiv1.KillTensorboardResponse{Tensorboard: c.toTensorboard(ctx)})

case sproto.ContainerStateChanged:
c.container = &msg.Container
if msg.Container.State == container.Terminated {
Expand Down Expand Up @@ -199,3 +254,55 @@ func (c *command) exit(ctx *actor.Context, exitStatus string) {
ctx.Tell(c.eventStream, event{Snapshot: newSummary(c), ExitedEvent: c.exitStatus})
actors.NotifyAfter(ctx, terminatedDuration, terminateForGC{})
}

// toNotebook builds the protobuf Notebook for this command actor. The ID is
// the local part of the actor's address, the start time is the actor's
// registration time, and the description, container and username are taken
// from the command's config, container and owner respectively.
func (c *command) toNotebook(ctx *actor.Context) *notebookv1.Notebook {
	nb := &notebookv1.Notebook{}
	nb.Id = ctx.Self().Address().Local()
	nb.Description = c.config.Description
	nb.Container = c.container.Proto()
	nb.StartTime = protoutils.ToTimestamp(ctx.Self().RegisteredTime())
	nb.Username = c.owner.Username
	return nb
}

// toCommand builds the protobuf Command for this command actor. The ID is the
// local part of the actor's address, the start time is the actor's
// registration time, and the description, container and username are taken
// from the command's config, container and owner respectively.
func (c *command) toCommand(ctx *actor.Context) *commandv1.Command {
	cmd := &commandv1.Command{}
	cmd.Id = ctx.Self().Address().Local()
	cmd.Description = c.config.Description
	cmd.Container = c.container.Proto()
	cmd.StartTime = protoutils.ToTimestamp(ctx.Self().RegisteredTime())
	cmd.Username = c.owner.Username
	return cmd
}

// toShell builds the protobuf Shell for this command actor. The ID is the
// local part of the actor's address, the start time is the actor's
// registration time, and the description, container and username are taken
// from the command's config, container and owner respectively.
//
// The key pair is read from the "privateKey" and "publicKey" metadata entries.
// An entry that is absent or not a string yields an empty field rather than a
// panic.
func (c *command) toShell(ctx *actor.Context) *shellv1.Shell {
	// Comma-ok assertions: a single-value assertion would panic on a missing
	// or mistyped metadata entry.
	privateKey, _ := c.metadata["privateKey"].(string)
	publicKey, _ := c.metadata["publicKey"].(string)

	return &shellv1.Shell{
		Id:          ctx.Self().Address().Local(),
		Description: c.config.Description,
		StartTime:   protoutils.ToTimestamp(ctx.Self().RegisteredTime()),
		Container:   c.container.Proto(),
		PrivateKey:  privateKey,
		PublicKey:   publicKey,
		Username:    c.owner.Username,
	}
}

// toTensorboard builds the protobuf Tensorboard for this command actor. The ID
// is the local part of the actor's address, the start time is the actor's
// registration time, and the description, container and username are taken
// from the command's config, container and owner respectively.
//
// Experiment and trial IDs are read from the "experiment_ids" and "trial_ids"
// metadata entries and narrowed to int32. An entry that is absent, empty, or
// not a []int yields a nil slice rather than a panic.
func (c *command) toTensorboard(ctx *actor.Context) *tensorboardv1.Tensorboard {
	// toInt32s converts a metadata value holding []int into []int32. The
	// comma-ok assertion avoids the panic a single-value assertion would raise
	// on a missing or mistyped entry.
	toInt32s := func(v interface{}) []int32 {
		ints, ok := v.([]int)
		if !ok || len(ints) == 0 {
			return nil
		}
		out := make([]int32, len(ints))
		for i, id := range ints {
			out[i] = int32(id)
		}
		return out
	}

	return &tensorboardv1.Tensorboard{
		Id:            ctx.Self().Address().Local(),
		Description:   c.config.Description,
		StartTime:     protoutils.ToTimestamp(ctx.Self().RegisteredTime()),
		Container:     c.container.Proto(),
		ExperimentIds: toInt32s(c.metadata["experiment_ids"]),
		TrialIds:      toInt32s(c.metadata["trial_ids"]),
		Username:      c.owner.Username,
	}
}
9 changes: 9 additions & 0 deletions master/internal/command/command_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/determined-ai/determined/master/pkg/actor"
"github.com/determined-ai/determined/master/pkg/check"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/commandv1"
)

// If an entrypoint is specified as a singleton string, Determined will follow the "shell form"
Expand All @@ -29,6 +31,13 @@ type commandManager struct {

func (c *commandManager) Receive(ctx *actor.Context) error {
switch msg := ctx.Message().(type) {
case *apiv1.GetCommandsRequest:
resp := &apiv1.GetCommandsResponse{}
for _, command := range ctx.AskAll(&commandv1.Command{}, ctx.Children()...).GetAll() {
resp.Commands = append(resp.Commands, command.(*commandv1.Command))
}
ctx.Respond(resp)

case echo.Context:
c.handleAPIRequest(ctx, msg)
}
Expand Down
9 changes: 9 additions & 0 deletions master/internal/command/notebook_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/determined-ai/determined/master/pkg/check"
"github.com/determined-ai/determined/master/pkg/etc"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/notebookv1"
)

const (
Expand Down Expand Up @@ -103,6 +105,13 @@ type notebookManager struct {

func (n *notebookManager) Receive(ctx *actor.Context) error {
switch msg := ctx.Message().(type) {
case *apiv1.GetNotebooksRequest:
resp := &apiv1.GetNotebooksResponse{}
for _, notebook := range ctx.AskAll(&notebookv1.Notebook{}, ctx.Children()...).GetAll() {
resp.Notebooks = append(resp.Notebooks, notebook.(*notebookv1.Notebook))
}
ctx.Respond(resp)

case echo.Context:
n.handleAPIRequest(ctx, msg)
}
Expand Down
Loading

0 comments on commit 913fdc3

Please sign in to comment.