Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input through file and pipe #2552

Merged
merged 102 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
6597884
feat: support input by file (--flyte-inputs) and through pipe
mao3267 Jul 2, 2024
8f66a57
Ensure that PythonAutoContainerTask has a container_image attribute (…
pingsutw Jun 26, 2024
0f4eb4b
Catch silent subproc_execute failures (#2404)
pryce-turner Jun 27, 2024
4a230a4
fix: Prevent local files from being moved when using FlyteFile on loc…
ggydush Jun 27, 2024
9513e87
[fix] Validate interface variable names are python-valid (#2538)
ddl-rliu Jun 28, 2024
2e5532b
[Feature] Support positional arguments (#2522)
MortalHappiness Jun 28, 2024
321cf67
Adds a Default ImageSpec image builder (#2346)
thomasjpfan Jul 1, 2024
94124e6
Fix pandera plugin for 0.20 (#2545)
thomasjpfan Jul 1, 2024
df958a9
Fix `Flytefile` Remote Path Error (#2544)
Future-Outlier Jul 1, 2024
66d9a76
Core/enable deck (#2314)
novahow Jul 2, 2024
b636493
Add thonmas and future outlier to CODEOWNERS (#2548)
Future-Outlier Jul 2, 2024
7824e9c
Improve error message for ImageSpec (#2498)
pingsutw Jul 2, 2024
c9d040e
Improve error message for pyflyte run (#2472)
pingsutw Jul 2, 2024
3b7076d
fix: enhance error msg of missing params and prevent non-ideal type c…
mao3267 Jul 3, 2024
7f4987f
Support s3 path for click Dir param type (#2547)
Future-Outlier Jul 2, 2024
bec0ee5
remove upper bound of plugin dependencies for flytekit-greatexpectati…
Mecoli1219 Jul 3, 2024
4c548e9
Add myself to code owners of flytekit-kf-pytorch (#2556)
fg91 Jul 3, 2024
d06b085
Add run policy (#2555)
bgedik Jul 3, 2024
fa95e56
Added entrypoint to imagespec and default builder (#2553)
pryce-turner Jul 3, 2024
8a69750
Set default width if fail to get terminal size (#2558)
pingsutw Jul 4, 2024
8496794
Disable rich traceback for papermill plugin (#2559)
pingsutw Jul 4, 2024
6969c7f
fix datetime in eager workflow (#2541)
novahow Jul 4, 2024
97f3a16
Fix CSS with Flyte decks (#2565)
thomasjpfan Jul 5, 2024
2ab7186
Fix `FlyteDirectory` on Azure (#2564)
Tom-Newton Jul 5, 2024
6086648
Makes home dir if it does not exists (#2562)
thomasjpfan Jul 5, 2024
573a8c3
Removes isodate requirement (#2568)
thomasjpfan Jul 8, 2024
4e6c5c3
Read FLYTE_SDK_DEV_LOGGING_LEVEL from env (#2571)
pingsutw Jul 8, 2024
d5b3f3a
print native input/output when cache hit (#2567)
pingsutw Jul 8, 2024
81c9edc
Fix: Set OMP_NUM_THREADS by default in Elastic (#2569)
fellhorn Jul 8, 2024
1cd2870
Use logging level in env for default dev logger (#2572)
pingsutw Jul 8, 2024
fd0f84d
Bump certifi (#2566)
dependabot[bot] Jul 8, 2024
1085450
Allow for flytekit version to be specified in default image builder (…
thomasjpfan Jul 9, 2024
0656926
Removes jinja2 dependency (#2570)
thomasjpfan Jul 9, 2024
5b10828
Improve error when missing type annotations in task/workflow (#2549)
pingsutw Jul 9, 2024
3b95a51
Rename databricks task type (#2574)
pingsutw Jul 12, 2024
de3a137
fix: wrong type conversion
mao3267 Jul 12, 2024
ce385f0
fix: enhance error and help msg
mao3267 Jul 12, 2024
72e704c
fix: skip following param options when input from file/pipe
mao3267 Jul 12, 2024
43b4752
remove upper bound of plugin dependencies for flytekit-sqlalchemy (#2…
Mecoli1219 Jul 12, 2024
5e5a161
Remove flytekitplugins-deck-standard in Dockerfile (#2582)
Future-Outlier Jul 13, 2024
5d7b03c
Bump certifi from 2024.2.2 to 2024.7.4 (#2581)
dependabot[bot] Jul 13, 2024
23b1864
handle existing sagemaker deployments gracefully (#2400)
samhita-alla Jul 15, 2024
d744584
Use /opt/micromamba for default image builder (#2578)
thomasjpfan Jul 15, 2024
766bc58
Improve error message for missing return (#2551)
pingsutw Jul 15, 2024
dff79dd
Override Dataclass Serialization/Deserialization Behavior for `FlyteT…
Future-Outlier Jul 15, 2024
7883c97
add flytedirectory to custom docs template (#2584)
ppiegaze Jul 16, 2024
e358e11
Make `openai_organization` Optional in OpenAI plugins (#2585)
Future-Outlier Jul 17, 2024
c6cb0ea
Adds comet-ml plugin (#2550)
thomasjpfan Jul 17, 2024
2afea04
Replace super(AsyncAgentBase) with super(DatabricksAgent) (#2590)
pingsutw Jul 19, 2024
faf1bf0
add kubernetes package to official docker image again (#2589)
flixr Jul 19, 2024
72b0c96
test: support types and replace if options specified
mao3267 Jul 20, 2024
bfa4273
fix: support input from YAML/JSON file and pipe by click options (#2583)
mao3267 Jul 20, 2024
df3ba4f
fix: change command to --inputs-file
mao3267 Jul 26, 2024
dd6ef80
fix: raise click.BadParameter instead of click.secho
mao3267 Jul 26, 2024
2a54e48
test: add test for inputs from yaml file and pipe
mao3267 Jul 26, 2024
0c70e9e
test: add yaml input file
mao3267 Jul 26, 2024
fad3f1a
Feat: Improve UX of pytorch-elastic plugin by configuring reasonable …
fg91 Jul 21, 2024
4bb172f
Improved date parsing in cli (#2595)
kumare3 Jul 22, 2024
bba4e97
truncate sagemaker agent outputs and automate idempotence token handl…
samhita-alla Jul 22, 2024
089dd75
[BUG] support setting extended resources for array node map tasks (#2…
pvditt Jul 22, 2024
0c695a8
Fix DataClass Json Schema Error for `get literal type` method (#2587)
Future-Outlier Jul 22, 2024
acfb05d
Sagemaker dict determinism (#2597)
samhita-alla Jul 23, 2024
38da47d
refactor(core): Enhance return type extraction logic (#2598)
pingsutw Jul 23, 2024
755b726
Feat: Make exception raised by external command authenticator more ac…
fg91 Jul 24, 2024
8ad7f55
Fix: Properly re-raise non-grpc exceptions during refreshing of proxy…
fg91 Jul 25, 2024
860cbbf
fix: remove duplicated code loading new arguments
mao3267 Jul 29, 2024
c775fac
fix: lint input files
mao3267 Jul 29, 2024
5259b5c
validate idempotence token length in subsequent tasks (#2604)
samhita-alla Jul 26, 2024
72deccb
Add nvidia-l4 gpu accelerator (#2608)
eapolinario Jul 26, 2024
8880194
eliminate redundant literal conversion for `Iterator[JSON]` type (#2602)
samhita-alla Jul 26, 2024
d82f2cf
[FlyteSchema] Fix numpy problems (#2619)
Future-Outlier Jul 29, 2024
13af4ab
add nim plugin (#2475)
samhita-alla Jul 29, 2024
77ae497
[Elastic/Artifacts] Pass through model card (#2575)
wild-endeavor Jul 29, 2024
0730f56
Remove pyarrow as a direct dependency (#2228)
thomasjpfan Jul 29, 2024
6cdb3f3
Boolean flag to show local container logs to the terminal (#2521)
aditya7302 Jul 29, 2024
ae26404
Enable Ray Fast Register (#2606)
fiedlerNr9 Jul 29, 2024
3676ca8
[Artifacts/Elastic] Skip partitions (#2620)
wild-endeavor Jul 29, 2024
ee664f1
Install flyteidl from master in plugins tests (#2621)
eapolinario Jul 30, 2024
d90b6ec
Using ParamSpec to show underlying typehinting (#2617)
JackUrb Jul 30, 2024
e2e1a81
Support ArrayNode mapping over Launch Plans (#2480)
pvditt Jul 31, 2024
21a6c8c
Richer printing for some artifact objects (#2624)
wild-endeavor Jul 31, 2024
86977ac
ci: Add Python 3.9 to build matrix (#2622)
pingsutw Jul 31, 2024
48ee6a7
bump (#2627)
wild-endeavor Jul 31, 2024
40f6bc1
Added alt prefix head to FlyteFile.new_remote (#2601)
pryce-turner Jul 31, 2024
0e9a409
Feature gate for FlyteMissingReturnValueException (#2623)
pingsutw Jul 31, 2024
1a7496f
Remove use of multiprocessing from the OAuth client (#2626)
rdeaton-freenome Jul 31, 2024
f50bbe7
Update codespell in precommit to version 2.3.0 (#2630)
eapolinario Jul 31, 2024
ab63ddc
Fix Snowflake Agent Bug (#2605)
Future-Outlier Jul 31, 2024
f439ad4
fix: skip following param options when input from file/pipe
mao3267 Jul 12, 2024
2444d74
test: support types and replace if options specified
mao3267 Jul 20, 2024
a39dc5e
fix: support input from YAML/JSON file and pipe by click options (#2583)
mao3267 Jul 20, 2024
314ee12
fix: common function parsing inputs from file and pipe
mao3267 Aug 1, 2024
df9b350
fix: remove parentheses and lint
mao3267 Aug 1, 2024
2d9499a
Merge remote-tracking branch 'origin' into input-through-file-and-pipe
mao3267 Aug 3, 2024
b2cb7b1
fix: recover merge lint errors
mao3267 Aug 3, 2024
e63194b
Merge branch 'master' of https://github.com/mao3267/flytekit into inp…
mao3267 Aug 9, 2024
ff92807
Merge branch 'master' of https://github.com/mao3267/flytekit into inp…
mao3267 Aug 13, 2024
4d7d6a7
fix: change file path to relative path
mao3267 Aug 16, 2024
422c0a3
fix: make fmt && make lint
mao3267 Aug 16, 2024
b67929e
Merge branch 'master' of https://github.com/mao3267/flytekit into inp…
mao3267 Aug 16, 2024
ef0ca44
fix: remove parametrize for tests with a single test case
mao3267 Aug 17, 2024
28e6298
Merge branch 'master' of https://github.com/mao3267/flytekit into inp…
mao3267 Aug 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 99 additions & 8 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import sys
import tempfile
import typing
import typing as t
from dataclasses import dataclass, field, fields
from typing import Iterator, get_args

import rich_click as click
import yaml
from click import Context
from mashumaro.codecs.json import JSONEncoder
from rich.progress import Progress
from typing_extensions import get_origin
Expand All @@ -25,7 +28,12 @@
pretty_print_exception,
project_option,
)
from flytekit.configuration import DefaultImages, FastSerializationSettings, ImageConfig, SerializationSettings
from flytekit.configuration import (
DefaultImages,
FastSerializationSettings,
ImageConfig,
SerializationSettings,
)
from flytekit.configuration.plugin import get_plugin
from flytekit.core import context_manager
from flytekit.core.artifact import ArtifactQuery
Expand All @@ -34,14 +42,24 @@
from flytekit.core.type_engine import TypeEngine
from flytekit.core.workflow import PythonFunctionWorkflow, WorkflowBase
from flytekit.exceptions.system import FlyteSystemException
from flytekit.interaction.click_types import FlyteLiteralConverter, key_value_callback, labels_callback
from flytekit.interaction.click_types import (
FlyteLiteralConverter,
key_value_callback,
labels_callback,
)
from flytekit.interaction.string_literals import literal_string_repr
from flytekit.loggers import logger
from flytekit.models import security
from flytekit.models.common import RawOutputDataConfig
from flytekit.models.interface import Parameter, Variable
from flytekit.models.types import SimpleType
from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow, remote_fs
from flytekit.remote import (
FlyteLaunchPlan,
FlyteRemote,
FlyteTask,
FlyteWorkflow,
remote_fs,
)
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader
from flytekit.tools.script_mode import _find_project_root, compress_scripts, get_all_modules
Expand Down Expand Up @@ -489,7 +507,8 @@ def _update_flyte_context(params: RunLevelParams) -> FlyteContext.Builder:
return ctx.current_context().new_builder()

file_access = FileAccessProvider(
local_sandbox_dir=tempfile.mkdtemp(prefix="flyte"), raw_output_prefix=output_prefix
local_sandbox_dir=tempfile.mkdtemp(prefix="flyte"),
raw_output_prefix=output_prefix,
)

# The task might run on a remote machine if raw_output_prefix is a remote path,
Expand Down Expand Up @@ -539,7 +558,10 @@ def _run(*args, **kwargs):
entity_type = "workflow" if isinstance(entity, PythonFunctionWorkflow) else "task"
logger.debug(f"Running {entity_type} {entity.name} with input {kwargs}")

click.secho(f"Running Execution on {'Remote' if run_level_params.is_remote else 'local'}.", fg="cyan")
click.secho(
f"Running Execution on {'Remote' if run_level_params.is_remote else 'local'}.",
fg="cyan",
)
try:
inputs = {}
for input_name, v in entity.python_interface.inputs_with_defaults.items():
Expand Down Expand Up @@ -576,6 +598,8 @@ def _run(*args, **kwargs):
)
if processed_click_value is not None or optional_v:
inputs[input_name] = processed_click_value
if processed_click_value is None and v[0] == bool:
inputs[input_name] = False

if not run_level_params.is_remote:
with FlyteContextManager.with_context(_update_flyte_context(run_level_params)):
Expand Down Expand Up @@ -755,7 +779,10 @@ def list_commands(self, ctx):
run_level_params: RunLevelParams = ctx.obj
r = run_level_params.remote_instance()
progress = Progress(transient=True)
task = progress.add_task(f"[cyan]Gathering [{run_level_params.limit}] remote LaunchPlans...", total=None)
task = progress.add_task(
f"[cyan]Gathering [{run_level_params.limit}] remote LaunchPlans...",
total=None,
)
with progress:
progress.start_task(task)
try:
Expand Down Expand Up @@ -783,6 +810,70 @@ def get_command(self, ctx, name):
)


class YamlFileReadingCommand(click.RichCommand):
def __init__(
self,
name: str,
params: typing.List[click.Option],
help: str,
callback: typing.Callable = None,
):
params.append(
click.Option(
["--inputs-file"],
required=False,
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
help="Path to a YAML | JSON file containing inputs for the workflow.",
)
)
super().__init__(name=name, params=params, callback=callback, help=help)

def parse_args(self, ctx: Context, args: t.List[str]) -> t.List[str]:
def load_inputs(f: str) -> t.Dict[str, str]:
try:
inputs = yaml.safe_load(f)
except yaml.YAMLError as e:
yaml_e = e
try:
inputs = json.loads(f)
except json.JSONDecodeError as e:
raise click.BadParameter(
message=f"Could not load the inputs file. Please make sure it is a valid JSON or YAML file."
f"\n json error: {e},"
f"\n yaml error: {yaml_e}",
param_hint="--inputs-file",
)

return inputs

inputs = {}
if "--inputs-file" in args:
idx = args.index("--inputs-file")
args.pop(idx)
f = args.pop(idx)
with open(f, "r") as f:
inputs = load_inputs(f.read())
elif not sys.stdin.isatty():
f = sys.stdin.read()
if f != "":
inputs = load_inputs(f)

new_args = []
for k, v in inputs.items():
if isinstance(v, str):
new_args.extend([f"--{k}", v])
elif isinstance(v, bool):
if v:
new_args.append(f"--{k}")
else:
v = json.dumps(v)
new_args.extend([f"--{k}", v])
new_args.extend(args)
args = new_args

return super().parse_args(ctx, args)


class WorkflowCommand(click.RichGroup):
"""
click multicommand at the python file layer, subcommands should be all the workflows in the file.
Expand Down Expand Up @@ -837,11 +928,11 @@ def _create_command(
h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})"
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
cmd = click.RichCommand(
cmd = YamlFileReadingCommand(
name=entity_name,
params=params,
callback=run_command(ctx, loaded_entity),
help=h,
callback=run_command(ctx, loaded_entity),
)
return cmd

Expand Down
21 changes: 18 additions & 3 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
import sys
import typing
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, TypeVar, Union, cast
from typing import (
Any,
Dict,
Generator,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from flyteidl.core import artifact_id_pb2 as art_id
from typing_extensions import get_args, get_type_hints
Expand Down Expand Up @@ -370,7 +381,9 @@ def transform_interface_to_list_interface(


def transform_function_to_interface(
fn: typing.Callable, docstring: Optional[Docstring] = None, is_reference_entity: bool = False
fn: typing.Callable,
docstring: Optional[Docstring] = None,
is_reference_entity: bool = False,
) -> Interface:
"""
From the annotations on a task function that the user should have provided, and the output names they want to use
Expand Down Expand Up @@ -463,7 +476,9 @@ def transform_type(x: type, description: Optional[str] = None) -> _interface_mod
if artifact_id:
logger.debug(f"Found artifact id spec: {artifact_id}")
return _interface_models.Variable(
type=TypeEngine.to_literal_type(x), description=description, artifact_partial_id=artifact_id
type=TypeEngine.to_literal_type(x),
description=description,
artifact_partial_id=artifact_id,
)


Expand Down
18 changes: 8 additions & 10 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@
)
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template("""\
UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
--mount=from=uv,source=/uv,target=/usr/bin/uv \
--mount=type=bind,target=requirements_uv.txt,src=requirements_uv.txt \
/usr/bin/uv \
pip install --python /opt/micromamba/envs/runtime/bin/python $PIP_EXTRA \
--requirement requirements_uv.txt
""")
"""
)

APT_INSTALL_COMMAND_TEMPLATE = Template(
"""\
APT_INSTALL_COMMAND_TEMPLATE = Template("""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/var/cache/apt,id=apt \
apt-get update && apt-get install -y --no-install-recommends \
$APT_PACKAGES
"""
)
""")

DOCKER_FILE_TEMPLATE = Template(
"""\
DOCKER_FILE_TEMPLATE = Template("""\
#syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.2.37 as uv
FROM mambaorg/micromamba:1.5.8-bookworm-slim as micromamba
Expand Down Expand Up @@ -84,8 +83,7 @@
USER flytekit
RUN mkdir -p $$HOME && \
echo "export PATH=$$PATH" >> $$HOME/.profile
"""
)
""")


def get_flytekit_for_pypi():
Expand Down
22 changes: 17 additions & 5 deletions plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def _convert_replica_spec(
replicas=replicas,
image=replica_config.image,
resources=resources.to_flyte_idl() if resources else None,
restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None,
restart_policy=(replica_config.restart_policy.value if replica_config.restart_policy else None),
)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
Expand Down Expand Up @@ -289,9 +289,11 @@ def spawn_helper(
return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)


def _convert_run_policy_to_flyte_idl(run_policy: RunPolicy) -> kubeflow_common.RunPolicy:
def _convert_run_policy_to_flyte_idl(
run_policy: RunPolicy,
) -> kubeflow_common.RunPolicy:
return kubeflow_common.RunPolicy(
clean_pod_policy=run_policy.clean_pod_policy.value if run_policy.clean_pod_policy else None,
clean_pod_policy=(run_policy.clean_pod_policy.value if run_policy.clean_pod_policy else None),
ttl_seconds_after_finished=run_policy.ttl_seconds_after_finished,
active_deadline_seconds=run_policy.active_deadline_seconds,
backoff_limit=run_policy.backoff_limit,
Expand Down Expand Up @@ -416,7 +418,13 @@ def _execute(self, **kwargs) -> Any:
checkpoint_dest = None
checkpoint_src = None

launcher_args = (dumped_target_function, ctx.raw_output_prefix, checkpoint_dest, checkpoint_src, kwargs)
launcher_args = (
dumped_target_function,
ctx.raw_output_prefix,
checkpoint_dest,
checkpoint_src,
kwargs,
)
elif self.task_config.start_method == "fork":
"""
The torch elastic launcher doesn't support passing kwargs to the target function,
Expand All @@ -440,7 +448,11 @@ def fn_partial():
if isinstance(e, FlyteRecoverableException):
create_recoverable_error_file()
raise
return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)
return ElasticWorkerResult(
return_value=return_val,
decks=flytekit.current_context().decks,
om=om,
)

launcher_target_func = fn_partial
launcher_args = ()
Expand Down
Loading
Loading