Skip to content

Commit

Permalink
Add --max-parallel to node-exec
Browse files Browse the repository at this point in the history
Fixes #396

Add `--max-parallel` flag to define the maximum number of parallel tasks.
  • Loading branch information
HeavyWombat committed Jun 18, 2021
1 parent ab70eb0 commit 6f6cbc8
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions internal/cmd/nexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ import (
const nodeDefaultCommand = "/bin/sh"

var (
nodeExecNoTty bool
nodeExecImage string
nodeExecTimeout int
nodeExecBlock bool
defaultImage = "alpine"
defaultTimeout = 10
nodeExecNoTty bool
nodeExecImage string
nodeExecTimeout int
nodeExecBlock bool
defaultImage = "alpine"
defaultTimeout = 10
nodeExecMaxParallel = 10
)

// nodeExecCmd represents the node-exec command
Expand Down Expand Up @@ -91,6 +92,7 @@ func init() {
nodeExecCmd.PersistentFlags().StringVar(&nodeExecImage, "image", defaultImage, "set image for helper pod from which the root-shell is accessed")
nodeExecCmd.PersistentFlags().IntVar(&nodeExecTimeout, "timeout", defaultTimeout, "set timout in seconds for the setup of the helper pod")
nodeExecCmd.PersistentFlags().BoolVar(&nodeExecBlock, "block", false, "show distributed shell output as block for each node")
nodeExecCmd.PersistentFlags().IntVar(&nodeExecMaxParallel, "max-parallel", 0, "number of parallel executions (defaults to number of nodes)")
}

func execInClusterNodes(args []string) error {
Expand Down Expand Up @@ -147,32 +149,49 @@ func execInClusterNodes(args []string) error {
// In distributed shell mode, TTY is forced to be disabled
nodeExecNoTty = true

// In case nothing is configured, all nodes will be executed concurrently
if nodeExecMaxParallel <= 0 || nodeExecMaxParallel > len(nodes) {
nodeExecMaxParallel = len(nodes)
}

type task struct {
node corev1.Node
reader io.Reader
}

var (
wg = &sync.WaitGroup{}
tasks = make(chan task, len(nodes))
readers = duplicateReader(os.Stdin, len(nodes))
output = make(chan OutputMsg)
errors = make(chan error, len(nodes))
printer = make(chan bool, 1)
)

wg.Add(len(nodes))
for i, node := range nodes {
go func(node corev1.Node, reader io.Reader) {
defer func() {
wg.Done()
}()

errors <- hvnr.NodeExec(
node,
nodeExecImage,
nodeExecTimeout,
command,
reader,
chanWriter("StdOut", node.Name, output),
chanWriter("StdErr", node.Name, output),
!nodeExecNoTty,
)
}(node, readers[i])
// Fill task queue with the list of nodes to be processed
for i := range nodes {
tasks <- task{reader: readers[i], node: nodes[i]}
}
close(tasks)

// Start n task workers to work on task queue
wg.Add(nodeExecMaxParallel)
for i := 0; i < nodeExecMaxParallel; i++ {
go func() {
defer wg.Done()
for task := range tasks {
errors <- hvnr.NodeExec(
task.node,
nodeExecImage,
nodeExecTimeout,
command,
task.reader,
chanWriter("StdOut", task.node.Name, output),
chanWriter("StdErr", task.node.Name, output),
!nodeExecNoTty,
)
}
}()
}

// Start the respective output printer in a separate Go routine
Expand Down

0 comments on commit 6f6cbc8

Please sign in to comment.