Skip to content

Commit

Permalink
Improve error message for pyflyte run (flyteorg#2472)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: mao3267 <[email protected]>
  • Loading branch information
pingsutw authored and mao3267 committed Jul 29, 2024
1 parent 7824e9c commit c9d040e
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 77 deletions.
3 changes: 2 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ def _run(*args, **kwargs):
# By the time we get to this function, all the loading has already happened

run_level_params: RunLevelParams = ctx.obj
logger.debug(f"Running {entity.name} with {kwargs} and run_level_params {run_level_params}")
entity_type = "workflow" if isinstance(entity, PythonFunctionWorkflow) else "task"
logger.debug(f"Running {entity_type} {entity.name} with input {kwargs}")

click.secho(f"Running Execution on {'Remote' if run_level_params.is_remote else 'local'}.", fg="cyan")
try:
Expand Down
24 changes: 18 additions & 6 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
add_AsyncAgentServiceServicer_to_server,
add_SyncAgentServiceServicer_to_server,
)
from rich.console import Console
from rich.table import Table


@click.group("serve")
Expand Down Expand Up @@ -55,8 +57,8 @@ def agent(_: click.Context, port, worker, timeout):
async def _start_grpc_server(port: int, worker: int, timeout: int):
from flytekit.extend.backend.agent_service import AgentMetadataService, AsyncAgentService, SyncAgentService

click.secho("🚀 Starting the agent service...")
_start_http_server()
click.secho("Starting the agent service...", fg="blue")
print_agents_metadata()

server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=worker))
Expand All @@ -75,7 +77,7 @@ def _start_http_server():
try:
from prometheus_client import start_http_server

click.secho("Starting up the server to expose the prometheus metrics...", fg="blue")
click.secho("Starting up the server to expose the prometheus metrics...")
start_http_server(9090)
except ImportError as e:
click.secho(f"Failed to start the prometheus server with error {e}", fg="red")
Expand Down Expand Up @@ -104,7 +106,17 @@ def print_agents_metadata():
from flytekit.extend.backend.base_agent import AgentRegistry

agents = AgentRegistry.list_agents()
for agent in agents:
name = agent.name
metadata = [category.name for category in agent.supported_task_categories]
click.secho(f"Starting {name} that supports task categories {metadata}", fg="blue")

table = Table(title="Agent Metadata")
table.add_column("Agent Name", style="cyan", no_wrap=True)
table.add_column("Support Task Types", style="cyan")
table.add_column("Is Sync", style="green")

for a in agents:
categories = ""
for c in a.supported_task_categories:
categories += f"{c.name} (v{c.version}) "
table.add_row(a.name, categories, str(a.is_sync))

console = Console()
console.print(table)
4 changes: 2 additions & 2 deletions flytekit/clis/sdk_in_container/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from flytekit.exceptions.base import FlyteException
from flytekit.exceptions.user import FlyteInvalidInputException
from flytekit.loggers import get_level_from_cli_verbosity, logger, upgrade_to_rich_logging
from flytekit.loggers import get_level_from_cli_verbosity, logger

project_option = click.Option(
param_decls=["-p", "--project"],
Expand Down Expand Up @@ -137,7 +137,7 @@ class ErrorHandlingCommand(click.RichGroup):
def invoke(self, ctx: click.Context) -> typing.Any:
verbose = ctx.params["verbose"]
log_level = get_level_from_cli_verbosity(verbose)
upgrade_to_rich_logging(log_level=log_level)
logger.setLevel(log_level)
try:
return super().invoke(ctx)
except Exception as e:
Expand Down
1 change: 0 additions & 1 deletion flytekit/configuration/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ def _get_from_yaml(self, c: YamlConfigEntry) -> typing.Any:
d = d[k]
return d
except KeyError:
logger.debug(f"Switch {c.switch} could not be found in yaml config")
return None

def get(self, c: typing.Union[LegacyConfigEntry, YamlConfigEntry]) -> typing.Any:
Expand Down
1 change: 0 additions & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,6 @@ def dispatch_execute(

return self._async_execute(native_inputs, native_outputs, ctx, exec_ctx, new_user_params)

logger.debug("Task executed successfully in user level")
# Lets run the post_execute method. This may result in a IgnoreOutputs Exception, which is
# bubbled up to be handled at the callee layer.
native_outputs = self.post_execute(new_user_params, native_outputs)
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flytekit.core.node import Node
from flytekit.interfaces.cli_identifiers import WorkflowExecutionIdentifier
from flytekit.interfaces.stats import taggable
from flytekit.loggers import logger, user_space_logger
from flytekit.loggers import developer_logger, user_space_logger
from flytekit.models.core import identifier as _identifier

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -867,7 +867,7 @@ def push_context(ctx: FlyteContext, f: Optional[traceback.FrameSummary] = None)
context_list.append(ctx)
flyte_context_Var.set(context_list)
t = "\t"
logger.debug(
developer_logger.debug(
f"{t * ctx.level}[{len(flyte_context_Var.get())}] Pushing context - {'compile' if ctx.compilation_state else 'execute'}, branch[{ctx.in_a_condition}], {ctx.get_origin_stackframe_repr()}"
)
return ctx
Expand All @@ -878,7 +878,7 @@ def pop_context() -> FlyteContext:
ctx = context_list.pop()
flyte_context_Var.set(context_list)
t = "\t"
logger.debug(
developer_logger.debug(
f"{t * ctx.level}[{len(flyte_context_Var.get()) + 1}] Popping context - {'compile' if ctx.compilation_state else 'execute'}, branch[{ctx.in_a_condition}], {ctx.get_origin_stackframe_repr()}"
)
if len(flyte_context_Var.get()) == 0:
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from flytekit.core.sentinel import DYNAMIC_INPUT_BINDING
from flytekit.core.type_engine import TypeEngine, UnionTransformer
from flytekit.exceptions.user import FlyteValidationException
from flytekit.loggers import logger
from flytekit.loggers import developer_logger, logger
from flytekit.models import interface as _interface_models
from flytekit.models.literals import Literal, Scalar, Void

Expand Down Expand Up @@ -512,7 +512,7 @@ def t(a: int, b: str) -> Dict[str, int]: ...

else:
# Handle all other single return types
logger.debug(f"Task returns unnamed native tuple {return_annotation}")
developer_logger.debug(f"Task returns unnamed native tuple {return_annotation}")
return {default_output_name(): cast(Type, return_annotation)}


Expand Down
1 change: 0 additions & 1 deletion flytekit/core/reference_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def unwrap_literal_map_and_execute(
except Exception as e:
logger.exception(f"Exception when executing {e}")
raise e
logger.debug("Task executed successfully in user level")

expected_output_names = list(self.python_interface.outputs.keys())
if len(expected_output_names) == 1:
Expand Down
1 change: 0 additions & 1 deletion flytekit/core/shim_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def dispatch_execute(
logger.exception(f"Exception when executing {e}")
raise e

logger.debug("Task executed successfully in user level")
# Lets run the post_execute method. This may result in a IgnoreOutputs Exception, which is
# bubbled up to be handled at the callee layer.
native_outputs = self.post_execute(new_user_params, native_outputs)
Expand Down
6 changes: 3 additions & 3 deletions flytekit/core/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from flytekit.configuration.feature_flags import FeatureFlags
from flytekit.exceptions import system as _system_exceptions
from flytekit.loggers import logger
from flytekit.loggers import developer_logger, logger


def import_module_from_file(module_name, file):
Expand Down Expand Up @@ -125,12 +125,12 @@ def find_lhs(self) -> str:
if self._instantiated_in is None or self._instantiated_in == "":
raise _system_exceptions.FlyteSystemException(f"Object {self} does not have an _instantiated in")

logger.debug(f"Looking for LHS for {self} from {self._instantiated_in}")
developer_logger.debug(f"Looking for LHS for {self} from {self._instantiated_in}")
m = importlib.import_module(self._instantiated_in)
for k in dir(m):
try:
if getattr(m, k) is self:
logger.debug(f"Found LHS for {self}, {k}")
developer_logger.debug(f"Found LHS for {self}, {k}")
self._lhs = k
return k
except ValueError as err:
Expand Down
8 changes: 1 addition & 7 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
)
)

logger.info(
"{}. [Wall Time: {}s, Process Time: {}s]".format(
self._name,
end_wall_time - self._start_wall_time,
end_process_time - self._start_process_time,
)
)
logger.info(f"{self._name}. [Time: {end_wall_time - self._start_wall_time:.6f}s]")


class ClassDecorator(ABC):
Expand Down
8 changes: 3 additions & 5 deletions flytekit/extend/backend/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from rich.logging import RichHandler
from rich.progress import Progress

from flytekit import FlyteContext, PythonFunctionTask, logger
from flytekit import FlyteContext, PythonFunctionTask
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core import utils
from flytekit.core.base_task import PythonTask
Expand Down Expand Up @@ -210,8 +210,6 @@ def register(agent: Union[AsyncAgentBase, SyncAgentBase], override: bool = False
)
AgentRegistry._METADATA[agent.name] = agent_metadata

logger.info(f"Registering {agent.name} for task type: {agent.task_category}")

@staticmethod
def get_agent(task_type_name: str, task_type_version: int = 0) -> Union[SyncAgentBase, AsyncAgentBase]:
task_category = TaskCategory(name=task_type_name, version=task_type_version)
Expand Down Expand Up @@ -272,8 +270,8 @@ async def _do(
return await mirror_async_methods(
agent.do, task_template=template, inputs=literal_map, output_prefix=output_prefix
)
except Exception as error_message:
raise FlyteUserException(f"Failed to run the task {self.name} with error: {error_message}")
except Exception as e:
raise FlyteUserException(f"Failed to run the task {self.name} with error: {e}") from None


class AsyncAgentExecutorMixin:
Expand Down
82 changes: 55 additions & 27 deletions flytekit/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
# For now, assume this is the environment variable whose usage will remain unchanged and controls output for all
# loggers defined in this file.
LOGGING_ENV_VAR = "FLYTE_SDK_LOGGING_LEVEL"
LOGGING_DEV_ENV_VAR = "FLYTE_SDK_DEV_LOGGING_LEVEL"
LOGGING_FMT_ENV_VAR = "FLYTE_SDK_LOGGING_FORMAT"
LOGGING_RICH_FMT_ENV_VAR = "FLYTE_SDK_RICH_TRACEBACKS"

# By default, the root flytekit logger is set to debug so everything is logged, but fine-tuning is still possible
logger = logging.getLogger("flytekit")
user_space_logger = logging.getLogger("user_space")
developer_logger = logging.getLogger("developer")

# Stop propagation so that configuration is isolated to this file (so that it doesn't matter what the
# global Python root logger is set to).
Expand Down Expand Up @@ -71,6 +73,27 @@ def set_user_logger_properties(
user_space_logger.setLevel(level)


def set_developer_properties(
    handler: typing.Optional[logging.Handler] = None,
    filter: typing.Optional[logging.Filter] = None,
    level: typing.Optional[int] = None,
):
    """
    Selectively tune the developer logger, which is only used for debugging.

    Every argument is optional; an argument left as None leaves that aspect of
    the logger untouched.

    :param handler: logging.Handler to add to the developer_logger
    :param filter: logging.Filter to add to the developer_logger
    :param level: logging level to set the developer_logger to
    """
    # No ``global`` statement is needed: the module-level logger is only
    # mutated through its methods, never rebound.
    if handler is not None:
        developer_logger.addHandler(handler)
    if filter is not None:
        developer_logger.addFilter(filter)
    if level is not None:
        developer_logger.setLevel(level)


def _get_env_logging_level(default_level: int = logging.WARNING) -> int:
"""
Returns the logging level set in the environment variable, or logging.WARNING if the environment variable is not
Expand All @@ -83,6 +106,17 @@ def initialize_global_loggers():
"""
Initializes the global loggers to the default configuration.
"""
# Use Rich logging while running in the local execution or jupyter notebook.
if (
os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None) is None or interactive.ipython_check()
) and is_rich_logging_enabled():
try:
upgrade_to_rich_logging()
return
except OSError as e:
logger.warning(f"Failed to initialize rich logging: {e}")
pass

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt="[%(name)s] %(message)s")
Expand All @@ -92,40 +126,34 @@ def initialize_global_loggers():

set_flytekit_log_properties(handler, None, _get_env_logging_level())
set_user_logger_properties(handler, None, logging.INFO)

# Use Rich logging while running in the local execution
if os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None) is None or interactive.ipython_check():
upgrade_to_rich_logging()
set_developer_properties(handler, None, logging.INFO)


def is_rich_logging_enabled() -> bool:
    """Return False only when the env var named by LOGGING_RICH_FMT_ENV_VAR is exactly "0"."""
    rich_flag = os.environ.get(LOGGING_RICH_FMT_ENV_VAR)
    return rich_flag != "0"


def upgrade_to_rich_logging(log_level: typing.Optional[int] = logging.WARNING):
if not is_rich_logging_enabled():
return
try:
import click
from rich.console import Console
from rich.logging import RichHandler

import flytekit

handler = RichHandler(
tracebacks_suppress=[click, flytekit],
rich_tracebacks=True,
omit_repeated_times=False,
log_time_format="%H:%M:%S.%f",
console=Console(width=os.get_terminal_size().columns),
)
formatter = logging.Formatter(fmt="%(message)s")
handler.setFormatter(formatter)
set_flytekit_log_properties(handler, None, _get_env_logging_level(default_level=log_level))
set_user_logger_properties(handler, None, logging.INFO)
except OSError as e:
logger.debug(f"Failed to initialize rich logging: {e}")
pass
import click
from rich.console import Console
from rich.logging import RichHandler

import flytekit

handler = RichHandler(
tracebacks_suppress=[click, flytekit],
rich_tracebacks=True,
omit_repeated_times=False,
show_path=False,
log_time_format="%H:%M:%S.%f",
console=Console(width=os.get_terminal_size().columns),
)

formatter = logging.Formatter(fmt="%(filename)s:%(lineno)d - %(message)s")
handler.setFormatter(formatter)
set_flytekit_log_properties(handler, None, _get_env_logging_level(default_level=log_level))
set_user_logger_properties(handler, None, logging.INFO)
set_developer_properties(handler, None, logging.INFO)


def get_level_from_cli_verbosity(verbosity: int) -> int:
Expand Down
Loading

0 comments on commit c9d040e

Please sign in to comment.