Merge branch 'master' into v7.x
Mario Manno committed Apr 21, 2021
2 parents 20bcb4c + b39e610 commit 967bc08
Showing 5 changed files with 98 additions and 61 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pr.yaml
@@ -26,7 +26,7 @@ jobs:
if: contains(github.event.pull_request.labels.*.name, 'pr-test-queue')

steps:
-      - uses: actions-ecosystem/action-remove-labels@556e306
+      - uses: actions-ecosystem/action-remove-labels@v1
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
labels: pr-test-queue
@@ -195,10 +195,10 @@ jobs:
git config user.email "[email protected]"
git config user.name "$GITHUB_ACTOR"
- name: Create k8s Kind Cluster
-        uses: engineerd/setup-kind@0.5.0
+        uses: engineerd/setup-kind@v0.5.0
with:
config: .github/kind-config.yaml
-          version: v0.7.0
+          version: v0.9.0
image: kindest/node:${{matrix.kubernetes_version}}

- name: Run cluster tests
1 change: 1 addition & 0 deletions pkg/bosh/bpmconverter/container_factory.go
@@ -93,6 +93,7 @@ func capability(s []string) []corev1.Capability {
return capabilities
}

+// proccessVolumentMounts returns the volumes for a process in a special order
func proccessVolumentMounts(defaultVolumeMounts []corev1.VolumeMount, processDisks bdm.Disks, ephemeralMount *corev1.VolumeMount, persistentDiskMount *corev1.VolumeMount) []corev1.VolumeMount {
bpmVolumeMounts := make([]corev1.VolumeMount, 0)
for _, processDisk := range processDisks {
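Note: the call sites in containers.go below wrap this function's result in deduplicateVolumeMounts, which is not part of this diff. A minimal sketch of what that helper plausibly does, assuming mounts are deduplicated by MountPath and the first occurrence wins (the repository's actual implementation may differ):

package bpmconverter

import corev1 "k8s.io/api/core/v1"

// deduplicateVolumeMounts keeps the first mount seen for each mount
// path and drops later duplicates. Hypothetical sketch, not the
// repository's actual implementation.
func deduplicateVolumeMounts(mounts []corev1.VolumeMount) []corev1.VolumeMount {
	seen := map[string]bool{}
	result := make([]corev1.VolumeMount, 0, len(mounts))
	for _, m := range mounts {
		if seen[m.MountPath] {
			continue
		}
		seen[m.MountPath] = true
		result = append(result, m)
	}
	return result
}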
145 changes: 89 additions & 56 deletions pkg/bosh/bpmconverter/containers.go
@@ -28,6 +28,10 @@ func (c *ContainerFactoryImpl) JobsToContainers(
return nil, errors.Errorf("instance group '%s' has no jobs defined", c.instanceGroupName)
}

+	// wait for that many drain stamps to appear
+	drainStampCount := strconv.Itoa(len(jobs))
+
+	// each job can produce multiple BPM process containers
for _, job := range jobs {
jobImage, err := c.releaseImageProvider.GetReleaseImage(c.instanceGroupName, job.Name)
if err != nil {
@@ -54,10 +54,12 @@
}

for processIndex, process := range bpmConfig.Processes {
+			// extra volume mounts for this container
processDisks := jobDisks.Filter("process_name", process.Name)
+			volumeMounts := deduplicateVolumeMounts(proccessVolumentMounts(defaultVolumeMounts, processDisks, ephemeralMount, persistentDiskMount))

// The post-start script should be executed only once per job, so we set it up in the first
-			// process container.
+			// process container. (container-run wrapper)
var postStart postStart
if processIndex == 0 {
conditionProperty := bpmConfig.PostStart.Condition
@@ -73,22 +79,34 @@
}
}

+			// each bpm process gets one container
container, err := bpmProcessContainer(
job.Name,
process.Name,
jobImage,
process,
-				proccessVolumentMounts(defaultVolumeMounts, processDisks, ephemeralMount, persistentDiskMount),
+				volumeMounts,
bpmConfig.Run.HealthCheck,
job.Properties.Quarks.Envs,
bpmConfig.Run.SecurityContext.DeepCopy(),
postStart,
-				strconv.Itoa(len(bpmConfig.Processes)),
)
if err != nil {
return []corev1.Container{}, err
}

+			// Setup the job drain script handler, on the first bpm
+			// container. There is only one BOSH drain script per
+			// job.
+			// Theoretically that container might be missing
+			// proccessVolumentMounts for the job's drain script.
+			if processIndex == 0 {
+				container.Lifecycle.PreStop = newDrainScript(job.Name, drainStampCount)
+			} else {
+				// all the other containers also should not terminate
+				container.Lifecycle.PreStop = newDrainWait(drainStampCount)
+			}

containers = append(containers, *container.DeepCopy())
}
}
@@ -136,6 +154,8 @@ type postStartCmd struct {
Name string
Arg []string
}

+// postStart controls the --post-start-* feature of the container-run wrapper
type postStart struct {
command, condition *postStartCmd
}
@@ -150,7 +170,6 @@ func bpmProcessContainer(
quarksEnvs []corev1.EnvVar,
securityContext *corev1.SecurityContext,
postStart postStart,
-	processCount string,
) (corev1.Container, error) {
name := names.Sanitize(fmt.Sprintf("%s-%s", jobName, processName))

@@ -201,69 +220,19 @@
container := corev1.Container{
Name: names.Sanitize(name),
Image: jobImage,
-		VolumeMounts: deduplicateVolumeMounts(volumeMounts),
+		VolumeMounts: volumeMounts,
Command: command,
Args: args,
Env: newEnvs,
+		Lifecycle: &corev1.Lifecycle{},
WorkingDir: workdir,
SecurityContext: securityContext,
-		Lifecycle: &corev1.Lifecycle{},
Resources: corev1.ResourceRequirements{
Requests: process.Requests,
Limits: limits,
},
}

-	// Setup the job drain script handler.
-	drainScript := filepath.Join(VolumeJobsDirMountPath, jobName, "bin", "drain")
-	container.Lifecycle.PreStop = &corev1.Handler{
-		Exec: &corev1.ExecAction{
-			Command: []string{
-				"/bin/sh",
-				"-c",
-				`
-shopt -s nullglob
-waitExit() {
-  e="$1"
-  touch /mnt/drain-done/` + container.Name + `
-  while [ $(ls -1 /mnt/drain-done | wc -l) -lt ` + processCount + ` ]; do sleep 5; done
-  exit "$e"
-}
-s="` + drainScript + `"
-(
-  if [ ! -x "$s" ]; then
-    waitExit 0
-  fi
-  echo "Running drain script $s"
-  while true; do
-    out=$($s)
-    status=$?
-    if [ "$status" -ne "0" ]; then
-      echo "$s FAILED with exit code $status"
-      waitExit $status
-    fi
-    if [ "$out" -lt "0" ]; then
-      echo "Sleeping dynamic draining wait time for $s..."
-      sleep ${out:1}
-      echo "Running $s again"
-    else
-      echo "Sleeping static draining wait time for $s..."
-      sleep $out
-      echo "$s done"
-      waitExit 0
-    fi
-  done
-)&
-done
-echo "Waiting for subprocesses to finish..."
-wait
-echo "Done"`,
-			},
-		},
-	}

for name, hc := range healthChecks {
if name == process.Name {
if hc.ReadinessProbe != nil {
@@ -314,3 +283,67 @@ func generateBPMCommand(

return command, args
}

+func newDrainScript(jobName string, processCount string) *corev1.Handler {
+	drainScript := filepath.Join(VolumeJobsDirMountPath, jobName, "bin", "drain")
+	return &corev1.Handler{
+		Exec: &corev1.ExecAction{
+			Command: []string{
+				"/bin/sh",
+				"-c",
+				`
+shopt -s nullglob
+waitExit() {
+  e="$1"
+  touch /mnt/drain-stamps/` + jobName + `
+  echo "Waiting for other drain scripts to finish."
+  while [ $(ls -1 /mnt/drain-stamps | wc -l) -lt ` + processCount + ` ]; do sleep 5; done
+  exit "$e"
+}
+s="` + drainScript + `"
+if [ ! -x "$s" ]; then
+  waitExit 0
+fi
+echo "Running drain script $s for ` + jobName + `"
+while true; do
+  out=$( $s )
+  status=$?
+  if [ "$status" -ne "0" ]; then
+    echo "$s FAILED with exit code $status"
+    waitExit $status
+  fi
+  if [ "$out" -lt "0" ]; then
+    echo "Sleeping dynamic draining wait time for $s..."
+    sleep ${out:1}
+    echo "Running $s again"
+  else
+    echo "Sleeping static draining wait time for $s..."
+    sleep $out
+    echo "$s done"
+    waitExit 0
+  fi
+done
+echo "Done"`,
+			},
+		},
+	}
+}
+
+func newDrainWait(processCount string) *corev1.Handler {
+	return &corev1.Handler{
+		Exec: &corev1.ExecAction{
+			Command: []string{
+				"/bin/sh",
+				"-c",
+				`
+echo "Wait for drain scripts in other containers to finish"
+while [ $(ls -1 /mnt/drain-stamps | wc -l) -lt ` + processCount + ` ]; do sleep 5; done
+exit 0
+echo "Done"
+`,
+			},
+		},
+	}
+}
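Taken together, the two handlers above form a stamp-file barrier across the pod: the first container of each job runs the BOSH drain script and then touches a stamp under /mnt/drain-stamps, every other container only waits, and no PreStop hook exits until the stamp count reaches the number of jobs. Below is a self-contained Go sketch of that barrier; the job names, temp directory, and poll interval are illustrative only, not taken from the repository.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// touchStamp mimics `touch /mnt/drain-stamps/<job>` in the PreStop hook.
func touchStamp(dir, job string) error {
	return os.WriteFile(filepath.Join(dir, job), nil, 0o644)
}

// waitForStamps mimics the polling loop in both handlers: block until
// `count` stamp files exist in dir.
func waitForStamps(dir string, count int) {
	for {
		entries, err := os.ReadDir(dir)
		if err == nil && len(entries) >= count {
			return
		}
		time.Sleep(100 * time.Millisecond) // the real hook sleeps 5s
	}
}

func main() {
	dir, err := os.MkdirTemp("", "drain-stamps")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Illustrative job names; one stamp per job, matching drainStampCount.
	jobs := []string{"nats", "route-registrar", "loggregator"}

	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		go func(job string) {
			defer wg.Done()
			// First container of a job: run its drain script here, then stamp.
			if err := touchStamp(dir, job); err != nil {
				panic(err)
			}
			// Every container: hold termination until all jobs have stamped.
			waitForStamps(dir, len(jobs))
			fmt.Printf("%s: all drain scripts done, safe to terminate\n", job)
		}(job)
	}
	wg.Wait()
}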
1 change: 1 addition & 0 deletions pkg/bosh/manifest/disk.go
@@ -14,6 +14,7 @@ type Disk struct {
}

// Disks represents a slice of BPMResourceDisk.
+// Part of the BOSH manifest at '<instance-group>.env.bosh.agent.settings.disks'.
type Disks []Disk

// MatchesFilter returns true if the disk matches the filter with one of its Filters.
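containers.go above calls jobDisks.Filter("process_name", process.Name), but the Filter method is outside this diff. A plausible shape for it on top of MatchesFilter, assuming MatchesFilter accepts the same name/value pair (a sketch, not the repository's code):

// Filter returns the disks whose filters match the given name/value
// pair, e.g. disks.Filter("process_name", "nats").
// Hypothetical sketch: only MatchesFilter's existence is implied by
// the doc comment above; its exact signature is assumed here.
func (disks Disks) Filter(filterName, filterValue string) Disks {
	filtered := make(Disks, 0)
	for _, disk := range disks {
		if disk.MatchesFilter(filterName, filterValue) {
			filtered = append(filtered, disk)
		}
	}
	return filtered
}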
6 changes: 4 additions & 2 deletions pkg/bosh/manifest/instance_group.go
@@ -262,7 +262,8 @@ type PreRenderOps struct {
InstanceGroup OpsPatches `json:"instanceGroup,omitempty"`
}

-// AgentSettings from BOSH deployment manifest.
+// AgentSettings from BOSH deployment manifest,
+// '<instance-group>.env.bosh.agent.settings'.
// These annotations and labels are added to kube resources.
// Affinity & tolerations are added into the pod's definition.
type AgentSettings struct {
@@ -299,7 +300,8 @@ type Agent struct {
Tmpfs *bool `json:"tmpfs,omitempty"`
}

-// AgentEnvBoshConfig from BOSH deployment manifest.
+// AgentEnvBoshConfig contains supported settings from the
+// <instance-group>.env.bosh hash of the BOSH deployment manifest.
type AgentEnvBoshConfig struct {
Password string `json:"password,omitempty"`
KeepRootPassword string `json:"keep_root_password,omitempty"`
