
Keyword-only arguments in pipeline function signature break the compilation #4543

Closed
Udiknedormin opened this issue Sep 25, 2020 · 9 comments
Assignees
Labels
area/backend kind/bug lifecycle/stale The issue / pull request is stale, any activities remove this label.

Comments

@Udiknedormin
Contributor

What steps did you take:

  1. Create a pipeline function with keyword-only argument(s).
  2. Pass it to compile.
import kfp

def some_pipeline(*, keyword_only_argument: str):
  pass

kfp.compiler.Compiler().compile(some_pipeline, "some_pipeline.yaml")  # any output path

What happened:

Compilation fails with exception:

TypeError: missing a required argument: 'keyword_only_argument'
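The same TypeError can be reproduced without KFP: `inspect.Signature.bind` fails identically when a required keyword-only parameter cannot be supplied positionally, which is a minimal simulation of what the compiler does with `*args`:

```python
import inspect

def some_pipeline(*, keyword_only_argument: str):
    pass

sig = inspect.signature(some_pipeline)
try:
    # Positional binding cannot supply a keyword-only parameter:
    sig.bind()
except TypeError as e:
    print(e)  # -> missing a required argument: 'keyword_only_argument'
```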

What did you expect to happen:

Should compile just fine, just like it does for non-keyword-only arguments:

import kfp

def some_pipeline(normal_argument: str):
  pass

kfp.compiler.Compiler().compile(some_pipeline, "some_pipeline.yaml")  # any output path

Environment:

KFP SDK version: 1.0.0

Anything else you would like to add:

Looks like it's due to passing all arguments positionally (as *args) in compiler.py (at the current master HEAD, d8e82602b0):

args_list = []
signature = inspect.signature(pipeline_func)
for arg_name in signature.parameters:
  arg_type = None
  for input in pipeline_meta.inputs or []:
    if arg_name == input.name:
      arg_type = input.type
      break
  args_list.append(dsl.PipelineParam(sanitize_k8s_name(arg_name, True), param_type=arg_type))

with dsl.Pipeline(pipeline_name) as dsl_pipeline:
  pipeline_func(*args_list)  # <-- here, all arguments are passed as *args, will fail for kwargs-only

Since signature.parameters is a Mapping[str, inspect.Parameter], it can be used to retrieve each parameter's metadata. One of its fields, kind, indicates what kind of parameter it is, including whether it is keyword-only (Parameter.KEYWORD_ONLY).
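For illustration, a quick way to see the kind of each parameter (the function name here is just an example):

```python
import inspect

def some_pipeline(normal_argument: str, *, keyword_only_argument: str):
    pass

sig = inspect.signature(some_pipeline)
for name, param in sig.parameters.items():
    print(name, param.kind.name)
# normal_argument POSITIONAL_OR_KEYWORD
# keyword_only_argument KEYWORD_ONLY
```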

Suggested solution:
It seems enough to test each parameter's kind and pass keyword-only parameters as kwargs:

args_list = []
kwargs_dict = dict()
signature = inspect.signature(pipeline_func)
for arg_name, arg in signature.parameters.items():
  arg_type = None
  for input in pipeline_meta.inputs or []:
    if arg_name == input.name:
      arg_type = input.type
      break
  param = dsl.PipelineParam(sanitize_k8s_name(arg_name, True), param_type=arg_type)
  if arg.kind == inspect.Parameter.KEYWORD_ONLY:  # if it's keyword-only, it cannot be passed as *args...
    kwargs_dict[arg_name] = param                 #  ...so it will be passed as kwargs
  else:
    args_list.append(param)                       # others can use args

with dsl.Pipeline(pipeline_name) as dsl_pipeline:
  pipeline_func(*args_list, **kwargs_dict)        # pass kwargs, so keyword-only params can work
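The split logic above can be exercised in isolation, with plain strings standing in for dsl.PipelineParam objects (a sketch, not the actual compiler code):

```python
import inspect

def some_pipeline(normal_argument: str, *, keyword_only_argument: str):
    return normal_argument, keyword_only_argument

args_list, kwargs_dict = [], {}
for name, param in inspect.signature(some_pipeline).parameters.items():
    value = f"param-{name}"  # stand-in for dsl.PipelineParam(...)
    if param.kind == inspect.Parameter.KEYWORD_ONLY:
        kwargs_dict[name] = value  # keyword-only: must go through **kwargs
    else:
        args_list.append(value)    # everything else can stay positional

print(some_pipeline(*args_list, **kwargs_dict))
# -> ('param-normal_argument', 'param-keyword_only_argument')
```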

/kind bug
/area backend

@Ark-kun
Contributor

Ark-kun commented Sep 26, 2020

Thank you for researching this issue and finding the solution.

Just out of curiosity, what scenarios call for pipeline functions with keyword-only parameters?

@Udiknedormin
Contributor Author

@Ark-kun

Just out of curiosity, what scenarios call for pipeline functions with keyword-only parameters?

Keyword-only arguments can have default values without being reordered (as they have no order, by definition). That is very useful for generated functions. Of course one could reorder the arguments so that it becomes "legal" for some of them to have default values, but that is very misleading at the call site:

# illegal generated signature:
def foo(a: str, b: Optional[str] = None, c: str): ...

# legal generated signature
def foo(*, a: str, b: Optional[str] = None, c: str): ...

# also legal...
def foo(a: str, c: str, b: Optional[str] = None): ...
# ...but misleading:
foo("A", "B", "C")  # a="A", c="B", b="C"; probably not what was intended 

Sorry for late reply.

@stale

stale bot commented Dec 31, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the lifecycle/stale label on Dec 31, 2020
@stale

stale bot commented Jul 8, 2021

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

stale bot closed this as completed on Jul 8, 2021
@ra312

ra312 commented Apr 12, 2022

/reopen

@google-oss-prow

@ra312: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@ra312

ra312 commented Apr 12, 2022

Hello everyone,
If you have many ML artifacts of different types, you might want to pass them all as keyword arguments to the pipeline, to be consumed in the components.
It would be great if we could introduce a change allowing **kwargs to be passed to the pipeline function.

@ra312

ra312 commented Apr 15, 2022

I looked into it, and the google.cloud.aiplatform project allows passing pipeline_params as a dictionary. So I am closing the issue.

from google.cloud import aiplatform as aip
from kfp.v2 import compiler  # noqa: F811

def my_custom_pipeline(
    message: str,
    tf_model_gcs_file: str,
):
    predict_data = input_data()  # input_data: a component defined elsewhere
    # (the call consuming these outputs was truncated in the original comment)
    #     message=message,
    #     tf_model_gcs_file=tf_model_gcs_file,
    #     input_dataset_path=predict_data.outputs["output_dataset_path"]

package_path = "my_custom_pipeline.json"
compiler.Compiler().compile(
    pipeline_func=my_custom_pipeline, package_path=package_path
)

pipeline_params = dict(
    message="let us do machine learning",
    tf_model_gcs_file=(
        "gs://my-bucket/"
        "artifacts/main_model/eng_model.h5"
    ),
)

PROJECT_ID = "my-project"
REGION = "europe-west1"
BUCKET_NAME = "gs://my-bucket"
DISPLAY_NAME = "my_custom_pipeline_run"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root"  # assumed; not defined in the original snippet

aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="my_custom_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_params,
    enable_caching=True,
)
job.run()

@ra312

ra312 commented Apr 15, 2022

However, when I try to declare my_custom_pipeline with **kwargs, it breaks the compilation. This seems to be due to how the compiler expects *args in the pipeline invocation in kfp.v2.compiler._create_pipeline_v2 (lines 1192-1206).
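For context, a **kwargs parameter appears in the signature as a single VAR_KEYWORD entry with no enumerable argument names, so a compiler that builds one parameter object per named argument has nothing to iterate over:

```python
import inspect

def my_custom_pipeline(**kwargs):
    pass

# The whole **kwargs shows up as one VAR_KEYWORD parameter:
for name, param in inspect.signature(my_custom_pipeline).parameters.items():
    print(name, param.kind.name)
# kwargs VAR_KEYWORD
```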
