Skip to content

Commit

Permalink
SDK - Added support for raw artifact values to ContainerOp
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark-kun committed Feb 7, 2019
1 parent cab950c commit f2db334
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 0 deletions.
16 changes: 16 additions & 0 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ def _build_conventional_artifact(name):
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

#Adding input artifacts section
#Only constant arguments are supported for now
#Constant arguments are compiled to Argo's input artifact default values, not as real arguments
if op.input_artifact_paths:
input_artifacts = template.setdefault('inputs', {}).setdefault('artifacts', [])
for input_name, path in op.input_artifact_paths.items():
input_artifact = {'name': input_name, 'path': path}
if op.input_artifact_arguments:
input_artifact_value = op.input_artifact_arguments.get(input_name, None)
if input_artifact_value:
if not isinstance(input_artifact_value, str):
raise TypeError('Currently ContainerOp only supports artifact values that are constant strings.')
input_artifact['raw'] = {'data': input_artifact_value}
input_artifacts.append(input_artifact)
input_artifacts.sort(key=lambda x: x['name']) #Stabilizing the input artifact ordering

template['outputs'] = {}
if output_parameters:
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class ContainerOp(object):
"""Represents an op implemented by a docker container image."""

def __init__(self, name: str, image: str, command: str=None, arguments: str=None,
input_artifact_paths: Dict[str, str] = None,
input_artifact_arguments: Dict[str, str] = None,
file_outputs : Dict[str, str]=None, is_exit_handler=False):
"""Create a new instance of ContainerOp.
Expand All @@ -38,6 +40,10 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
file_outputs: Maps output labels to local file paths. At pipeline run time,
the value of a PipelineParam is saved to its corresponding local file. It's
one way for outside world to receive outputs of the container.
input_artifact_paths: Maps artifact input names to local file paths.
At pipeline run time, the value of the input artifact argument is saved to this local file.
input_artifact_arguments: Maps artifact input names to the artifact values.
At pipeline run time, the value of the input artifact argument is saved to a local file specified in the input_artifact_paths map.
is_exit_handler: Whether it is used as an exit handler.
"""

Expand Down Expand Up @@ -77,6 +83,9 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
self.inputs = []
if self.argument_inputs:
self.inputs += self.argument_inputs

self.input_artifact_paths = input_artifact_paths
self.input_artifact_arguments = input_artifact_arguments

self.outputs = {}
if file_outputs:
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,7 @@ def test_py_retry(self):
def test_py_image_pull_secret(self):
"""Test pipeline imagepullsecret."""
self._test_py_compile('imagepullsecret')

def test_py_input_artifact_raw_value(self):
    """Test pipeline input_artifact_raw_value.

    Renamed from a copy-pasted ``test_py_image_pull_secret``: the duplicate
    name shadowed the real image-pull-secret test above, so one of the two
    tests silently never ran.
    """
    self._test_py_compile('input_artifact_raw_value')
60 changes: 60 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sys
from pathlib import Path

sys.path.insert(0, __file__ + '/../../../../')

import kfp.dsl as dsl

def component_with_input_artifact(text):
    """Return a ContainerOp that receives ``text`` as a raw input artifact.

    The constant string is written by the workflow engine to
    ``/inputs/text/data`` inside the container, and the container simply
    prints that file.
    """
    input_path = '/inputs/text/data'
    op = dsl.ContainerOp(
        name='component_with_input_artifact',
        image='alpine',
        command=['cat', input_path],
        input_artifact_paths={'text': input_path},
        input_artifact_arguments={'text': text},
    )
    return op

def component_with_hardcoded_input_artifact_value():
    """Build the component, supplying a fixed constant string as the artifact."""
    constant_text = 'hard-coded artifact value'
    return component_with_input_artifact(constant_text)


def component_with_input_artifact_value_from_file(file_path):
    """Build the component, supplying the contents of ``file_path`` as the artifact.

    The file is read at pipeline *compile* time, so its text is embedded as a
    constant raw artifact value.
    """
    contents = Path(file_path).read_text()
    return component_with_input_artifact(contents)


file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt'))


# NOTE(review): the pipeline name/description below look copy-pasted from the
# retry sample and don't describe this pipeline. They are, however, baked into
# the golden YAML (generateName/entrypoint), so renaming them requires
# regenerating input_artifact_raw_value.yaml — confirm before changing.
@dsl.pipeline(
    name='pipeline includes two steps which fail randomly.',
    description='shows how to use ContainerOp set_retry().'
)
def retry_sample_pipeline():
    # Exercise raw input artifact values from three sources: an inline
    # constant, a hard-coded helper, and the contents of a file on disk.
    component_with_input_artifact('Constant artifact value')
    component_with_hardcoded_input_artifact_value()
    component_with_input_artifact_value_from_file(file_path)

if __name__ == '__main__':
    # Allow running this testdata script directly to (re)generate the
    # compiled workflow archive next to the source file.
    import kfp.compiler as compiler
    compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz')
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Text from a file with hard-coded artifact value
139 changes: 139 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pipeline-includes-two-steps-which-fail-randomly-
spec:
arguments:
parameters: []
entrypoint: pipeline-includes-two-steps-which-fail-randomly
serviceAccountName: pipeline-runner
templates:
- container:
command:
- cat
- /inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /inputs/text/data
raw:
data: Constant artifact value
name: component-with-input-artifact
outputs:
artifacts:
- name: mlpipeline-ui-metadata
path: /mlpipeline-ui-metadata.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- name: mlpipeline-metrics
path: /mlpipeline-metrics.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- container:
command:
- cat
- /inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /inputs/text/data
raw:
data: hard-coded artifact value
name: component-with-input-artifact-2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
path: /mlpipeline-ui-metadata.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- name: mlpipeline-metrics
path: /mlpipeline-metrics.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- container:
command:
- cat
- /inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /inputs/text/data
raw:
data: Text from a file with hard-coded artifact value
name: component-with-input-artifact-2-3
outputs:
artifacts:
- name: mlpipeline-ui-metadata
path: /mlpipeline-ui-metadata.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- name: mlpipeline-metrics
path: /mlpipeline-metrics.json
s3:
accessKeySecret:
key: accesskey
name: mlpipeline-minio-artifact
bucket: mlpipeline
endpoint: minio-service.kubeflow:9000
insecure: true
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz
secretKeySecret:
key: secretkey
name: mlpipeline-minio-artifact
- dag:
tasks:
- name: component-with-input-artifact
template: component-with-input-artifact
- name: component-with-input-artifact-2
template: component-with-input-artifact-2
- name: component-with-input-artifact-2-3
template: component-with-input-artifact-2-3
name: pipeline-includes-two-steps-which-fail-randomly

0 comments on commit f2db334

Please sign in to comment.