Add all and worker node type to kubectl ray log (ray-project#2442)
chiayi authored Oct 24, 2024
1 parent c86b03b commit 8b61b73
Showing 2 changed files with 97 additions and 114 deletions.
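In short, this change lets kubectl ray log fetch logs for head nodes, worker nodes, or both: the --node-type flag now accepts 'head', 'worker', or 'all', with 'all' as the new default (previously 'head'). Pod selection is done with Kubernetes label selectors, so the three modes are roughly equivalent to the following queries. This is a hedged illustration built from the selectors visible in the diff below; the cluster name is hypothetical:

    # --node-type all (the new default): every pod of the cluster
    kubectl get pods -l "ray.io/cluster=my-raycluster"
    # --node-type head: the head pod only
    kubectl get pods -l "ray.io/node-type=head,ray.io/cluster=my-raycluster"
    # --node-type worker: worker pods only
    kubectl get pods -l "ray.io/node-type=worker,ray.io/cluster=my-raycluster"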
98 changes: 57 additions & 41 deletions kubectl-plugin/pkg/cmd/log/log.go
@@ -30,12 +30,12 @@ import (
const filePathInPod = "/tmp/ray/session_latest/logs/"

type ClusterLogOptions struct {
- configFlags *genericclioptions.ConfigFlags
- ioStreams *genericclioptions.IOStreams
- Executor RemoteExecutor
- outputDir string
- nodeType string
- args []string
+ configFlags *genericclioptions.ConfigFlags
+ ioStreams *genericclioptions.IOStreams
+ Executor RemoteExecutor
+ outputDir string
+ nodeType string
+ ResourceName string
}

var (
@@ -44,14 +44,20 @@ var (
`)

logExample = templates.Examples(`
- # Download logs from a RayCluster and save them to a directory with the RayCluster's name
+ # Download logs from a RayCluster and save them to a directory with the RayCluster's name. Retrieves 'all' logs
kubectl ray log my-raycluster
# Download logs from a RayCluster and save them to a directory named /path/to/dir
kubectl ray log my-raycluster --out-dir /path/to/dir
+ # Download logs from a RayCluster, but only for the head node
+ kubectl ray log my-raycluster --node-type head
+ # Download logs from a RayCluster, but only for the worker nodes
+ kubectl ray log my-raycluster --node-type worker
+ # Download all the logs (head node and worker nodes) from a RayCluster
+ kubectl ray log my-raycluster --node-type all
`)
)
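For reference, with the defaults above the command writes each pod's logs into a directory named after the cluster, one subdirectory per pod: the pod's stdout goes to stdout.log, and the files under /tmp/ray/session_latest/logs/ are tarred out of the pod alongside it. A sketch of the resulting layout, with hypothetical pod names:

    my-raycluster/
      my-raycluster-head-xxxxx/
        stdout.log    (pod stdout captured via GetLogs)
        ...           (files copied from /tmp/ray/session_latest/logs/ in the pod)
      my-raycluster-worker-group-yyyyy/
        stdout.log
        ...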

@@ -77,7 +83,7 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
SilenceUsage: true,
ValidArgsFunction: completion.RayClusterCompletionFunc(cmdFactory),
RunE: func(cmd *cobra.Command, args []string) error {
- if err := options.Complete(args); err != nil {
+ if err := options.Complete(cmd, args); err != nil {
return err
}
if err := options.Validate(); err != nil {
@@ -87,16 +93,20 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
},
}
cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.")
- cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for.")
+ cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for, supports 'worker', 'head', or 'all'")
options.configFlags.AddFlags(cmd.Flags())
return cmd
}

- func (options *ClusterLogOptions) Complete(args []string) error {
- options.args = args
+ func (options *ClusterLogOptions) Complete(cmd *cobra.Command, args []string) error {
+ if len(args) != 1 {
+ return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
+ }
+
+ options.ResourceName = args[0]

if options.nodeType == "" {
options.nodeType = "head"
options.nodeType = "all"
}

return nil
@@ -112,12 +122,9 @@ func (options *ClusterLogOptions) Validate() error {
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
}

- // Command must have ray cluster name
- if len(options.args) != 1 {
- return fmt.Errorf("must have at only one argument")
- } else if options.outputDir == "" {
+ if options.outputDir == "" {
fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.")
- options.outputDir = options.args[0]
+ options.outputDir = options.ResourceName
err := os.MkdirAll(options.outputDir, 0o755)
if err != nil {
return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err)
@@ -126,11 +133,11 @@ func (options *ClusterLogOptions) Validate() error {

switch options.nodeType {
case "all":
return fmt.Errorf("node type `all` is currently not supported")
fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve both head and worker node logs.")
case "head":
- break
+ fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only head node logs.")
case "worker":
return fmt.Errorf("node type `worker` is currently not supported")
fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only worker node logs.")
default:
return fmt.Errorf("unknown node type `%s`", options.nodeType)
}
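Taken together with Complete above, a run using the defaults now behaves roughly like this. The session is hypothetical, with the output assembled from the messages added in this diff and a made-up pod name:

    $ kubectl ray log my-raycluster
    No output directory specified, creating dir under current directory using cluster name.
    Command set to retrieve both head and worker node logs.
    Downloading log for Ray Node my-raycluster-head-xxxxx
    ...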
@@ -154,43 +161,51 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
}

var listopts v1.ListOptions
if options.nodeType == "head" {
if options.nodeType == "all" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
LabelSelector: fmt.Sprintf("ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "head" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "worker" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", options.ResourceName),
}
}

- // Get list of nodes that are considered ray heads
- rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
+ // Get list of nodes that are considered the specified node type
+ rayNodes, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
return fmt.Errorf("failed to retrieve head node for RayCluster %s: %w", options.ResourceName, err)
}

- // Get a list of logs of the ray heads.
+ // Get a list of logs of the ray nodes.
var logList []*bytes.Buffer
- for _, rayHead := range rayHeads.Items {
- request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})
+ for _, rayNode := range rayNodes.Items {
+ request := kubeClientSet.CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{})

podLogs, err := request.Stream(ctx)
if err != nil {
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Error retrieving log for RayCluster node %s: %w", rayNode.Name, err)
}
defer podLogs.Close()

// Get current logs:
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Failed to get read current logs for RayCluster Node %s: %w", rayNode.Name, err)
}

logList = append(logList, buf)
}

- // Pod file name format is name of the ray head
+ // Pod file name format is name of the ray node
for ind, logList := range logList {
- curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log")
- dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
+ curFilePath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name, "stdout.log")
+ dirPath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name)
err := os.MkdirAll(dirPath, 0o755)
if err != nil {
return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err)
@@ -203,14 +218,14 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto

_, err = logList.WriteTo(file)
if err != nil {
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayNodes.Items[ind].Name, err)
}

req := kubeClientSet.CoreV1().RESTClient().
Get().
- Namespace(rayHeads.Items[ind].Namespace).
+ Namespace(rayNodes.Items[ind].Namespace).
Resource("pods").
- Name(rayHeads.Items[ind].Name).
+ Name(rayNodes.Items[ind].Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."},
@@ -230,7 +245,7 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
return fmt.Errorf("failed to create executor with error: %w", err)
}

- err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind])
+ err = options.downloadRayLogFiles(ctx, exec, rayNodes.Items[ind])
if err != nil {
return fmt.Errorf("failed to download ray head log files with error: %w", err)
}
@@ -251,7 +266,7 @@ func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *u
}

// downloadRayLogFiles will use to the executor and retrieve the logs file from the inputted ray head
- func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error {
+ func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayNode corev1.Pod) error {
outreader, outStream := io.Pipe()
go func() {
defer outStream.Close()
Expand All @@ -270,16 +285,17 @@ func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec
tarReader := tar.NewReader(outreader)
header, err := tarReader.Next()
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayhead.Name, err)
return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayNode.Name, err)
}

+ fmt.Fprintf(options.ioStreams.Out, "Downloading log for Ray Node %s\n", rayNode.Name)
for !errors.Is(err, io.EOF) {
fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name)
if err != nil {
return fmt.Errorf("Error reading tar archive: %w", err)
}

// Construct the full local path and a directory for the tmp file logs
- localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name))
+ localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayNode.Name), path.Clean(header.Name))

switch header.Typeflag {
case tar.TypeDir:
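The rest of the extraction switch is collapsed in the view above. For orientation, here is a hedged, standalone sketch of the general pattern it follows: stream the output of the in-pod tar command through archive/tar and write each entry under the output directory. The function name extractTar, the "logs" destination, and the stdin plumbing are illustrative assumptions, not the KubeRay plugin's actual code.

    // Hedged sketch: the tar-extraction pattern behind downloadRayLogFiles.
    // Illustrative only; extractTar and the "logs" destination are hypothetical.
    package main

    import (
        "archive/tar"
        "errors"
        "fmt"
        "io"
        "os"
        "path/filepath"
        "strings"
    )

    // extractTar unpacks a tar stream into destDir. It mirrors the header loop
    // shown above: read the next header, then create a directory or copy file
    // contents depending on the entry type.
    func extractTar(r io.Reader, destDir string) error {
        tr := tar.NewReader(r)
        for {
            header, err := tr.Next()
            if errors.Is(err, io.EOF) {
                return nil // archive fully consumed
            }
            if err != nil {
                return fmt.Errorf("error reading tar archive: %w", err)
            }
            // Skip entries that would escape destDir ("zip slip" guard).
            name := filepath.Clean(header.Name)
            if strings.HasPrefix(name, "..") {
                continue
            }
            target := filepath.Join(destDir, name)
            switch header.Typeflag {
            case tar.TypeDir:
                if err := os.MkdirAll(target, 0o755); err != nil {
                    return err
                }
            case tar.TypeReg:
                if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
                    return err
                }
                f, err := os.Create(target)
                if err != nil {
                    return err
                }
                if _, err := io.Copy(f, tr); err != nil {
                    f.Close()
                    return err
                }
                f.Close()
            default:
                // Other entry types (symlinks, devices) are ignored in this sketch.
            }
        }
    }

    func main() {
        // Read a tar stream from stdin and unpack it into ./logs.
        if err := extractTar(os.Stdin, "logs"); err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
    }

To mimic what the plugin automates, one could pipe roughly "kubectl exec <pod> -- tar -cf - -C /tmp/ray/session_latest/logs ." into this program; the plugin instead runs the same tar command over the exec subresource and streams the result directly.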
