Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data injection to return errors #2720

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 92 additions & 90 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -32,12 +31,10 @@ import (

// HandleDataInjection waits for the target pod(s) to come up and inject the data into them
// todo: this currently requires kubectl but we should have enough k8s work to make this native now.
func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
defer wg.Done()
func (c *Cluster) HandleDataInjection(ctx context.Context, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) error {
injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
message.WarnErrf(err, "Unable to create the data injection completion marker")
return
return fmt.Errorf("unable to create the data injection completion marker: %w", err)
}

tarCompressFlag := ""
Expand All @@ -59,103 +56,110 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, d
shell, shellArgs := exec.GetOSShell(exec.Shell{Windows: "cmd"})

if _, _, err := exec.Cmd(shell, append(shellArgs, "tar --version")...); err != nil {
message.WarnErr(err, "Unable to execute tar on this system. Please ensure it is installed and on your $PATH.")
return
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)
}

iterator:
// The eternal loop because some data injections can take a very long time
for {
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
select {
case <-ctx.Done():
return ctx.Err()
default:
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
message.Warnf("Unable to find the data injection source path %s", source)
return
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
return fmt.Errorf("could not find the data injection source path %s", source)
}
}
}

target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
// Wait until the pod we are injecting data into becomes available
pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
return err
}
if len(pods) < 1 {
continue
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)
// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod.Name)
continue iterator
}
// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error copying data into the pod %#v: %#v\n", pod.Name, err)
continue iterator
}
// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err)
}

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod.Name)
continue iterator
// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err)
}

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err)
}
}
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}
// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
if err != nil {
return err
}

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
// Cleanup now to reduce disk pressure
err = os.RemoveAll(source)
if err != nil {
return err
}

// Return to stop the loop
return
// Return to stop the loop
return nil
}
}
}

Expand All @@ -173,7 +177,7 @@ type podFilter func(pod corev1.Pod) bool
// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

Expand All @@ -183,16 +187,14 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
for {
select {
case <-waitCtx.Done():
message.Debug("Pod lookup failed: %v", ctx.Err())
return nil
return nil, ctx.Err()
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
message.Debug("Unable to find matching pods: %w", err)
return nil
return nil, err
}

message.Debug("Found %d pods for target %#v", len(podList.Items), target)
Expand Down Expand Up @@ -245,7 +247,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
}
}
if len(readyPods) > 0 {
return readyPods
return readyPods, nil
}
timer.Reset(3 * time.Second)
}
Expand Down
21 changes: 11 additions & 10 deletions src/pkg/packager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -293,7 +294,6 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
hasCharts := len(component.Charts) > 0
hasManifests := len(component.Manifests) > 0
hasRepos := len(component.Repos) > 0
hasDataInjections := len(component.DataInjections) > 0
hasFiles := len(component.Files) > 0

onDeploy := component.Actions.OnDeploy
Expand Down Expand Up @@ -344,14 +344,11 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
}
}

if hasDataInjections {
waitGroup := sync.WaitGroup{}
defer waitGroup.Wait()

for idx, data := range component.DataInjections {
waitGroup.Add(1)
go p.cluster.HandleDataInjection(ctx, &waitGroup, data, componentPath, idx)
}
g, gCtx := errgroup.WithContext(ctx)
for idx, data := range component.DataInjections {
g.Go(func() error {
return p.cluster.HandleDataInjection(gCtx, data, componentPath, idx)
})
}

if hasCharts || hasManifests {
Expand All @@ -364,6 +361,10 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
return charts, fmt.Errorf("unable to run component after action: %w", err)
}

err = g.Wait()
if err != nil {
return nil, err
}
phillebaba marked this conversation as resolved.
Show resolved Hide resolved
return charts, nil
}

Expand Down
Loading