Skip to content

Commit

Permalink
feat: Set steps.*.ip to a json list if the node is an aggregate
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Melnick <[email protected]>
  • Loading branch information
meln5674 committed Sep 20, 2024
1 parent ce7f9bf commit 8ed1252
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 20 deletions.
4 changes: 2 additions & 2 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ sprig.trim(inputs.parameters['my-string-param'])
| Variable | Description|
|----------|------------|
| `steps.name` | Name of the step |
| `steps.<STEPNAME>.id` | unique id of container step |
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step |
| `steps.<STEPNAME>.id` | unique id of container step. |
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step. When the previous step uses `withItems` or `withParams`, this contains a JSON array of the ips of each invocation |
| `steps.<STEPNAME>.status` | Phase status of any previous step |
| `steps.<STEPNAME>.exitCode` | Exit code of any previous script or container step |
| `steps.<STEPNAME>.startedAt` | Time-stamp when the step started |
Expand Down
2 changes: 2 additions & 0 deletions docs/walk-through/daemon-containers.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,5 @@ spec:
```
Step templates use the `steps` prefix to refer to another step: for example `{{steps.influx.ip}}`. In DAG templates, the `tasks` prefix is used instead: for example `{{tasks.influx.ip}}`.

If the step or task uses `withSequence`, `withItems`, or `withParam`, then this will instead be set to a JSON list of IP addresses.
162 changes: 162 additions & 0 deletions test/e2e/daemon_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,168 @@ func (s *DaemonPodSuite) TestMarkDaemonedPodSucceeded() {
})
}

func (s *DaemonPodSuite) TestDaemonPodIP() {
s.Given().
Workflow(`
metadata:
generateName: curl-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: nginx
template: nginx
- name: curl
dependencies: [nginx]
template: curl
arguments:
parameters: [{name: ip, value: '{{tasks.nginx.ip}}'}]
- name: nginx
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
port: 80
- name: curl
inputs:
parameters:
- name: ip
container:
image: appropriate/curl
command: [curl, -vf, '{{inputs.parameters.ip}}']
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded)

}

func (s *DaemonPodSuite) TestDaemonPodSequenceIPs() {
s.Given().
Workflow(`
metadata:
generateName: curl-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: nginxen
template: nginx
withSequence:
count: 3
- name: curl
dependencies: [nginxen]
template: curl
withParam: '{{tasks.nginxen.ip}}'
arguments:
parameters: [{name: ip, value: '{{item}}'}]
- name: nginx
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
port: 80
- name: curl
inputs:
parameters:
- name: ip
container:
image: appropriate/curl
command: [curl, -vf, '{{inputs.parameters.ip}}']
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded)
}

func (s *DaemonPodSuite) TestDaemonPodItemsIPs() {
s.Given().
Workflow(`
metadata:
generateName: curl-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: nginxen
template: nginx
withItems: [a, b, c]
- name: curl
dependencies: [nginxen]
template: curl
withParam: '{{tasks.nginxen.ip}}'
arguments:
parameters: [{name: ip, value: '{{item}}'}]
- name: nginx
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
port: 80
- name: curl
inputs:
parameters:
- name: ip
container:
image: appropriate/curl
command: [curl, -vf, '{{inputs.parameters.ip}}']
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded)
}

func (s *DaemonPodSuite) TestDaemonPodParamsIPs() {
s.Given().
Workflow(`
metadata:
generateName: curl-
spec:
entrypoint: main
arguments:
parameters: [{name: items, value: '["a","b","c"]'}]
templates:
- name: main
dag:
tasks:
- name: nginxen
template: nginx
withParam: '{{workflow.parameters.items}}'
- name: curl
dependencies: [nginxen]
template: curl
withParam: '{{tasks.nginxen.ip}}'
arguments:
parameters: [{name: ip, value: '{{item}}'}]
- name: nginx
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
port: 80
- name: curl
inputs:
parameters:
- name: ip
container:
image: appropriate/curl
command: [curl, -vf, '{{inputs.parameters.ip}}']
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded)
}

func TestDaemonPodSuite(t *testing.T) {
suite.Run(t, new(DaemonPodSuite))
}
49 changes: 31 additions & 18 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2835,7 +2835,6 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})

if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down Expand Up @@ -3191,28 +3190,34 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
paramList := make([]map[string]string, 0)
outputParamValueLists := make(map[string][]string)
resultsList := make([]wfv1.Item, 0)
ipList := make([]string, 0)
for _, node := range childNodes {
if node.Outputs == nil || node.Phase != wfv1.NodeSucceeded || node.Type == wfv1.NodeTypeRetry {
if node.Type == wfv1.NodeTypeRetry {
continue
}
if len(node.Outputs.Parameters) > 0 {
param := make(map[string]string)
for _, p := range node.Outputs.Parameters {
param[p.Name] = p.Value.String()
outputParamValueList := outputParamValueLists[p.Name]
outputParamValueList = append(outputParamValueList, p.Value.String())
outputParamValueLists[p.Name] = outputParamValueList
if node.Outputs != nil && node.Phase == wfv1.NodeSucceeded {
if len(node.Outputs.Parameters) > 0 {
param := make(map[string]string)
for _, p := range node.Outputs.Parameters {
param[p.Name] = p.Value.String()
outputParamValueList := outputParamValueLists[p.Name]
outputParamValueList = append(outputParamValueList, p.Value.String())
outputParamValueLists[p.Name] = outputParamValueList
}
paramList = append(paramList, param)
}
paramList = append(paramList, param)
}
if node.Outputs.Result != nil {
// Support the case where item may be a map
var item wfv1.Item
err := json.Unmarshal([]byte(*node.Outputs.Result), &item)
if err != nil {
return err
if node.Outputs.Result != nil {
// Support the case where item may be a map
var item wfv1.Item
err := json.Unmarshal([]byte(*node.Outputs.Result), &item)
if err != nil {
return err
}
resultsList = append(resultsList, item)
}
resultsList = append(resultsList, item)
}
if node.Daemoned != nil && *node.Daemoned {
ipList = append(ipList, node.PodIP)
}
}
{
Expand All @@ -3238,6 +3243,14 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
}
scope.addParamToScope(key, valueListJson)
}
{
ipListJSON, err := json.Marshal(ipList)
if err != nil {
return err
}
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, string(ipListJSON))
}
return nil
}

Expand Down

0 comments on commit 8ed1252

Please sign in to comment.