Aggregating the output artifacts of parallel steps (fan-in) #934

Closed
jessesuen opened this issue Aug 3, 2018 · 30 comments · Fixed by #4618 or #4948
Labels
type/feature Feature request
Comments

@jessesuen
Member

Is this a BUG REPORT or FEATURE REQUEST?: FEATURE REQUEST

What happened:

Separating this issue from #861 to handle aggregation of output artifacts. Similar to output parameters from steps expanded using loops, we need some mechanism to aggregate artifacts from parallel steps. With parameters, the solution was to introduce a new variable, steps.XXXX.outputs.parameters, exposed as a JSON list. For artifacts we need something similar. The trick is how we would place the aggregated artifacts into a subsequent pod.
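For comparison, the existing parameter fan-in looks roughly like this; a minimal sketch, assuming hypothetical templates named gen and collect:

    # steps-template sketch: gen runs in parallel, collect receives the aggregated JSON list
    - name: fan-out-and-collect
      steps:
      - - name: gen
          template: gen                        # each iteration writes an output parameter
          withItems: [1, 2, 3]
          arguments:
            parameters:
            - name: item
              value: "{{item}}"
      - - name: collect
          template: collect
          arguments:
            parameters:
            - name: results
              # steps.gen.outputs.parameters expands to a JSON list of the parallel outputs
              value: "{{steps.gen.outputs.parameters}}"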

@edlee2121
Contributor

edlee2121 commented Aug 30, 2018

When aggregating and importing the artifacts generated by steps in a loop, I think the most useful option may be to allow subsequent steps to reference a composite artifact that merges the contents of all the output artifacts into one.

Each instance of the loop could use the iteration index or other parameter passed in to appropriately name the output file/directory so that the merge does not overwrite content. This gives the user control over how to name/structure the layout of the output artifact.

Example:
If the output artifact directory in each step is output-artifact-dir, each step would generate a file or artifact named something like output-artifact-dir/<input-param>-artifact.

The composite artifact could be referenced as steps.my-loop-step.outputs.artifacts.name1, where name1 is the name of the artifact generated by the loop steps. This would create a composite artifact by merging all output artifacts with the name ‘name1’. Note that one could create multiple output artifacts in a loop step using different names.
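For illustration, a rough sketch of what this proposal might look like in a workflow spec; the composite reference at the end is the syntax being proposed here, not an existing feature, and the image, paths, and names are assumptions:

    # loop step: each iteration writes its file under a per-iteration name
    - name: my-loop-step
      template: produce
      withItems: [a, b, c]
      arguments:
        parameters:
        - name: input-param
          value: "{{item}}"

    # produce template: output directory is shared, file name is keyed by input-param
    - name: produce
      inputs:
        parameters:
        - name: input-param
      container:
        image: alpine:3
        command: [sh, -c]
        args: ["mkdir -p /output-artifact-dir && echo {{inputs.parameters.input-param}} > /output-artifact-dir/{{inputs.parameters.input-param}}-artifact"]
      outputs:
        artifacts:
        - name: name1
          path: /output-artifact-dir

    # proposed composite reference (merge of all name1 outputs across the loop):
    #   {{steps.my-loop-step.outputs.artifacts.name1}}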

What do you think?

@jessesuen
Member Author

Each instance of the loop could use the iteration index or other parameter passed in to appropriately name the output file/directory so that the merge does not overwrite content. This gives the user control over how to name/structure the layout of the output artifact.

What if the user does not have control over the location of the output artifact? For example, what if the container being run is some off-the-shelf docker image, and the user wants something like /var/run/mysql.db to be the output artifact?

@jessesuen jessesuen modified the milestones: v2.2, V2.3 Aug 30, 2018
@edlee2121
Contributor

Even with an off-the-shelf image, I think the user could override the command parameter to customize the name of the output file/directory.

If desired, we could support a second form for accessing the composite output artifact. E.g. if the input artifact is specified as, for example, steps.my-loop-step.output.artifacts.* the unpacked input artifacts could be placed at input-artifact-dir/{0,1,2,3,...}/. That is, each input artifact is unpacked into a subdirectory corresponding to an index number.
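For illustration only, the consuming side of that second form might look something like this; the wildcard reference is the proposed syntax, not something Argo supports today:

    # proposal sketch only - this syntax does not exist in Argo
    - name: aggregate
      template: aggregate
      arguments:
        artifacts:
        - name: all-outputs
          # the wildcard would expand to every iteration's artifact, each unpacked
          # into input-artifact-dir/{0,1,2,3,...}/ inside the aggregate template
          from: "{{steps.my-loop-step.output.artifacts.*}}"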

One drawback with this approach is that it only works well with loops. I think the first approach is more general.

@alexmt alexmt modified the milestones: v2.3, v2.4 Jan 25, 2019
@mamoit

mamoit commented Apr 12, 2019

This seems to be a fairly stale issue, but it would be extremely handy for easily parallelizable tasks such as scraping multiple sources and generating large datasets that need to be merged in the end and analysed as a whole.
Just spent the past couple of hours trying to achieve this 😄

To be more specific, my use case is something like this:

  1. Get a list of names
  2. For each name generate a dataset (this may take some time and hence it would be ideal to have it parallelized)
  3. Gather the datasets that were created in parallel in the previous step and create a unified one.

This last step is exactly the use case for the feature described in this issue; everything else I was able to do.
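For context, the fan-out half of that pipeline (steps 1 and 2) is expressible today; a rough sketch, with the template names (get-names, build-dataset, merge-datasets) being assumptions. Step 3 is the fan-in this issue asks for:

    - name: main
      steps:
      - - name: get-names
          template: get-names              # step 1: emits a JSON list of names as an output parameter
      - - name: build-dataset
          template: build-dataset          # step 2: one pod per name, all in parallel
          withParam: "{{steps.get-names.outputs.parameters.names}}"
          arguments:
            parameters:
            - name: name
              value: "{{item}}"
      - - name: merge-datasets
          template: merge-datasets         # step 3: needs every dataset from step 2 - the missing fan-in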

@jessesuen
Member Author

Since this issue was filed, we now support artifacts with archive: none: {}, and a pattern has developed where parallel steps output to a common S3 "directory". The following step (which needs the aggregated outputs) then downloads the S3 "directory" as an artifact.

@jessesuen jessesuen removed this from the v2.4 milestone Apr 19, 2019
@fj-sanchez

@jessesuen can you point me to that pattern where multiple steps can output to a common s3 directory?

@jessesuen
Member Author

jessesuen commented Apr 19, 2019

Need to write a proper example, but the idea is that you would disable .tgz archiving on a file like so:
https://github.com/argoproj/argo/blob/master/examples/artifact-disable-archive.yaml#L38

And then the subsequent step would recursively download the parent S3 key as a "directory." The enhancement made in v2.2 is that if the S3 location appears to be a "directory" instead of a file, it performs a recursive download of all of the contents of that "directory." Directory is in quotes because S3 is really just a key/value store.
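Roughly, the pattern looks like this; a hedged sketch, where the endpoint, bucket, secret names, and key layout are all illustrative assumptions:

    # Fan-out side: each parallel pod uploads its file unarchived under a shared key prefix
    outputs:
      artifacts:
      - name: part
        path: /tmp/part.txt
        archive:
          none: {}                           # disable .tgz archiving, as in the linked example
        s3:
          endpoint: s3.amazonaws.com         # endpoint, bucket, key layout, and secrets are illustrative
          bucket: my-bucket
          key: "{{workflow.name}}/parts/{{inputs.parameters.i}}.txt"
          accessKeySecret: {name: my-s3-credentials, key: accessKey}
          secretKeySecret: {name: my-s3-credentials, key: secretKey}

    # Fan-in side: the next step points its input artifact at the key prefix; since v2.2
    # a key that looks like a "directory" is downloaded recursively into the path
    inputs:
      artifacts:
      - name: parts
        path: /tmp/parts
        s3:
          endpoint: s3.amazonaws.com
          bucket: my-bucket
          key: "{{workflow.name}}/parts"
          accessKeySecret: {name: my-s3-credentials, key: accessKey}
          secretKeySecret: {name: my-s3-credentials, key: secretKey}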

@fj-sanchez

So, does your container job need to be aware of its "index", or does the template use it to save the artifacts with unique names (e.g. using the item index)?

@mostaphaRoudsari
Contributor

@jessesuen, did you ever get a chance to add an example which uses archive: none: {} to pass outputs to a common S3 directory for parallel tasks?

@laubosslink

laubosslink commented Jul 15, 2019

@jessesuen this would also be extremely interesting for running experiments, FYI: https://www.ovh.com/blog/simplify-your-research-experiments-with-kubernetes/

@Downchuck

This seems potentially broken for GCS per #1351

@Downchuck

Downchuck commented Aug 1, 2019

@jessesuen: Is there a particular concern in trying to support tasks.X.outputs.artifacts when using loops, and simply extracting all matches? It's up to the workflow designer to ensure the overlay is correct.

@edlee2121 @fj-sanchez I'm suggesting we skip any complexity around indexes and just use unique names.

For example, this is working just fine in a loop to get the artifact onto storage:

    outputs:
      artifacts:
      - name: split
        path: "/tmp/split-{{inputs.parameters.offset}}"

All I'd need to pull them out of storage is to loop through the list, much as parameter aggregation does, and unpack the archives into the target folder.

Unfortunately, due to the GCS folders bug (#1351), I can't use the folder-clone technique to just download the entire {{workspace.name}} key. I did go ahead and write a template that uses the cloud SDK and gsutil cp to do that work, as mentioned in my comment on that report.

Parameter aggregation is a little funky: {{pod.name}} is evaluated prior to the loop and so is not the same pod that actually writes out the artifact. This PR may address that: #1336

@sebinsua

sebinsua commented Nov 12, 2019

Does anybody have a working example of how to do this?

If not, I'll work something out myself.


Edit 1: One thing I tried was using {inputs,outputs}.artifacts.s3 to access the same S3 bucket and key from different pods; however, I would need them to use a key specific to the current workspace.name, and currently it doesn't seem that {{workspace.name}} is interpolated within inputs.artifacts.s3.key.


Edit 2: I got something to work! Here is my attempt: sebinsua/k8s-argo-parallel-aggregate-workflow

@TekTimmy

TekTimmy commented Feb 11, 2020

Does anybody have a working example of how to do this?

If not, I'll work something out myself.

Edit 1: One thing I tried was using {inputs,outputs}.artifacts.s3 to access the same S3 bucket and key from different pods; however, I would need them to use a key specific to the current workspace.name, and currently it doesn't seem that {{workspace.name}} is interpolated within inputs.artifacts.s3.key.

Edit 2: I got something to work! Here is my attempt: sebinsua/k8s-argo-parallel-aggregate-workflow

I initially used that approach as well; it works as long as all steps run on the same node or the volume supports ReadWriteMany mode. Minikube provides ReadWriteMany volumes, but EKS (AWS Kubernetes) EBS volumes cannot be mounted on several nodes at once.
To implement this feature with ReadWriteOnce volumes, the parallel withParam steps must each mount their own volume; when they finish, a "pre-aggregation" step has to mount all of those volumes and copy the data into a single volume. The "aggregation" step can then mount that single volume containing all the results.
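For readers who do have ReadWriteMany storage available, a minimal sketch of the shared-volume variant (claim name, storage class, and paths are assumptions):

    # One RWX volume shared by the whole workflow: each parallel pod writes to its own
    # subdirectory, and the aggregation step mounts the same claim and reads everything.
    volumeClaimTemplates:
    - metadata:
        name: workdir
      spec:
        accessModes: ["ReadWriteMany"]       # needs an RWX-capable storage class (e.g. NFS)
        storageClassName: nfs                # illustrative; depends on the cluster
        resources:
          requests:
            storage: 1Gi

    # Inside both the fan-out and the aggregation templates:
    #   volumeMounts:
    #   - name: workdir
    #     mountPath: /work
    # Fan-out pods write to /work/{{inputs.parameters.name}}/; the aggregator reads /work/.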

@yoshua0x

yoshua0x commented Mar 29, 2020

@sebinsua This example rocks, thanks for sharing!

I was able to get this running with ReadWriteMany volumes using nfs-server-provisioner on multiple cloud providers, backed by volumes that are usually constrained to ReadWriteOnce.

@TekTimmy if you are still blocked on this, I provided a link to the helm chart below.

https://github.com/helm/charts/tree/master/stable/nfs-server-provisioner

@alexec
Contributor

alexec commented Mar 30, 2020

Document the pattern. #2549

@alexec alexec added this to the v2.8 milestone Mar 30, 2020
@alexec alexec added the docs label Mar 30, 2020
@alexec alexec modified the milestones: v2.8, v2.9 Apr 27, 2020
@foobarbecue
Contributor

foobarbecue commented May 20, 2020

I'm having a little trouble figuring out the state of this. Looks like there's a solution that works on a volume mount but not on an artifact store?

EDIT: I see now, you can use the "hard-wired" s3 approach, specifying endpoint, bucket, etc. along with a directory key. Doesn't quite work for me because transferring the whole directory to the container is too much. I need to be able to use a parameter as part of the key name, or something like a withArtifact that would work similar to withParam.

EDIT2: I just searched for withArtifact and found this: #2758

@alexec alexec removed this from the v2.9 milestone Jul 24, 2020
@stale stale bot closed this as completed Aug 3, 2020
@alexec alexec removed the wontfix label Sep 30, 2020
@alexec
Contributor

alexec commented Sep 30, 2020

Hmm. Should not have been closed.

@alexec alexec reopened this Sep 30, 2020
@alexec alexec added the type/feature Feature request label Sep 30, 2020
@alexec
Contributor

alexec commented Sep 30, 2020

I've created an example of a map-reduce job in Argo Workflows that aggregates outputs. Please take a look

#4175

@Ark-kun
Member

Ark-kun commented Oct 26, 2020

If desired, we could support a second form for accessing the composite output artifact. E.g. if the input artifact is specified as, for example, steps.my-loop-step.output.artifacts.* the unpacked input artifacts could be placed at input-artifact-dir/{0,1,2,3,...}/. That is, each input artifact is unpacked into a subdirectory corresponding to an index number.

I like this form, but I think there needs to be a way to access artifacts per-output, for example tasks.my-loop.output.artifacts.something. Then the artifacts are downloaded into input-artifact-dir/{0,1,2,3,...}/ subdirectories as you've proposed.

@Ark-kun
Member

Ark-kun commented Nov 1, 2020

I've created an example of a map-reduce job in Argo Workflows that aggregates outputs. Please take a look
#4175

Interesting example.
The problem I see is that it requires explicit manipulation of the artifact location, which makes the workflows less reusable.

@Ark-kun
Member

Ark-kun commented Nov 1, 2020

The map-reduce example gave me an idea of how Argo could solve artifact aggregation with minimal effort.
But then I realized that this is a bad idea and won't support DAGs or artifacts with custom URIs.

<bad_idea>
Suppose there is a normal task. It outputs all its artifacts to <run-id>/<task-id>/<output-name>.
Now we make this task a loop by adding withItems, withParam or withSequence.
What if the resulting sub-nodes would output their artifacts to <run-id>/<task-id>/<output-name>/<loop-idx>?
In this case the loop node's output artifact URI is still <run-id>/<task-id>/<output-name>, and when downloaded by a downstream component it will automatically contain artifacts from all loop iterations.
</bad_idea>

@Ark-kun
Member

Ark-kun commented Nov 1, 2020

Perhaps we could implement artifact aggregation the same way as for parameters: the loop node should collect per-output artifact lists, and the init executor should be able to download multiple artifacts under a single path.

We could make it possible to consume a list of artifacts (just for illustration; most users won't use this directly, only the init executor would see it):

    name: aggregate
    inputs:
      artifacts:
      - name: in-art-1
        path: /tmp/inputs/in-art-1/  # Artifacts end up in /tmp/inputs/in-art-1/{1,2,3,4}
        uris:
        - s3: ...
        - s3: ...

The artifact lists can be produced by loop nodes and passed to the aggregators:

    name: dag-aggregate-task
    template: aggregate
    arguments:
    - name: in-art-1
      from: "{{tasks.loop-task.outputs.artifacts.out1}}"

@Downchuck

It's simply that map-reduce patterns need some enthusiastic support.

I found the easiest way to explore the space was to actually mock out the YAML flows as objects. In one experiment, I just used JavaScript with a DAG library.

Looking forward to seeing what the community comes up with (my experiment was nuked by the client); I'm just saying: yes, map-reduce needs to happen, but you can experiment with it by just thinking about an in-memory DAG.

@alexec
Contributor

alexec commented Dec 3, 2020

This can be achieved for bucket-based artifacts (S3/GCP/OSS) using key-only artifacts. See #4618
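A condensed sketch of the key-only form, assuming a default artifact repository is configured so only the key needs to be given; the key layout and parameter names are illustrative:

    # Map step: each parallel pod writes its part under a shared key prefix;
    # bucket and credentials come from the configured artifact repository
    outputs:
      artifacts:
      - name: part
        path: /tmp/part.json
        archive:
          none: {}
        s3:
          key: "{{workflow.name}}/parts/{{inputs.parameters.part-id}}.json"

    # Reduce step: the whole prefix is pulled down as one input "directory"
    inputs:
      artifacts:
      - name: parts
        path: /tmp/parts
        s3:
          key: "{{workflow.name}}/parts"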

@alexec alexec linked a pull request Jan 19, 2021 that will close this issue
@alexec alexec changed the title Aggregating the output artifacts of parallel steps Aggregating the output artifacts of parallel steps (fan-in) Jan 19, 2021
@alexec alexec added this to the v3.1 milestone Jan 19, 2021
@alexec alexec linked a pull request Jan 26, 2021 that will close this issue
@sarabala1979 sarabala1979 self-assigned this Jan 26, 2021
@sarabala1979 sarabala1979 reopened this Jan 26, 2021
@sarabala1979 sarabala1979 removed their assignment Jan 27, 2021
@alexec alexec closed this as completed Feb 27, 2021