Skip to content

Commit

Permalink
[CHORE] Refactor logging (#1489)
Browse files Browse the repository at this point in the history
1. Removes `loguru` as a dependency
2. Removes custom handler that forwards logs to loguru
3. Removes all custom formatting of our logs
4. Warning which runner is being used is only done now for when the
PyRunner is being used with an existing Ray connection

I did (3) because Daft is a library, and custom formatting of logs
should be done by the application. E.g. if a user was building a
webserver with their own custom logging setup, then Daft mangling log
formatting on the root logger would be very annoying.

This is what our logs look like now on a jupyter notebook:
<img width="1074" alt="image"
src="https://github.com/Eventual-Inc/Daft/assets/17691182/c5674a52-b41a-40c5-aebd-9a9ddc965915">

Enabling more verbose logs at a higher level (e.g. INFO logs) is
performed by the user/embedding that uses Daft, e.g.

```python
import logging

logging.basicConfig(
    format='%(asctime)s,%(msecs)03d %(levelname)-8s [%(pathname)s:%(lineno)d] %(message)s',
    datefmt='%Y-%m-%d:%H:%M:%S',
    level=logging.INFO,
)

# Outputs logs that look like:
# 2023-10-16:11:25:46,195 INFO     [/Users/jaychia/.cargo/registry/src/index.crates.io-6f17d22bba15001f/aws-config-0.55.3/src/meta/region.rs:43] load_region; provider=EnvironmentVariableRegionProvider { env: Env(Real) }
```

Daft now respects normal Python logging and does not rely on loguru at
all to do any of this configurations. This lets us play much nicer with
applications that use normal Python logging.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Oct 16, 2023
1 parent bf5b598 commit 9d20890
Show file tree
Hide file tree
Showing 19 changed files with 86 additions and 111 deletions.
4 changes: 3 additions & 1 deletion benchmarking/tpch/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
import contextlib
import csv
import logging
import math
import os
import platform
Expand All @@ -13,14 +14,15 @@
from typing import Any, Callable

import ray
from loguru import logger

import daft
from benchmarking.tpch import answers, data_generation
from daft import DataFrame
from daft.context import get_context
from daft.runners.profiler import profiler

logger = logging.getLogger(__name__)

ALL_TABLES = [
"part",
"supplier",
Expand Down
5 changes: 3 additions & 2 deletions benchmarking/tpch/data_generation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from __future__ import annotations

import argparse
import logging
import math
import os
import shlex
import sqlite3
import subprocess
from glob import glob

from loguru import logger

import daft

logger = logging.getLogger(__name__)

SCHEMA = {
"part": [
"P_PARTKEY",
Expand Down
5 changes: 3 additions & 2 deletions benchmarking/tpch/pipelined_data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@

import argparse
import glob
import logging
import os
import pathlib
import shlex
import shutil
import subprocess
from multiprocessing import Pool

from loguru import logger

from benchmarking.tpch.data_generation import gen_parquet

logger = logging.getLogger(__name__)

STATIC_TABLES = ["nation", "region"]


Expand Down
8 changes: 0 additions & 8 deletions daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import os

from daft.logging import setup_logger

###
# Set up code coverage for when running code coverage with ray
###
Expand All @@ -20,12 +18,6 @@
"Environ: {!r} "
"Exception: {!r}\n".format({k: v for k, v in os.environ.items() if k.startswith("COV_CORE")}, exc)
)
###
# Setup logging
###


setup_logger()

###
# Get build constants from Rust .so
Expand Down
20 changes: 16 additions & 4 deletions daft/context.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from __future__ import annotations

import dataclasses
import logging
import os
import warnings
from typing import TYPE_CHECKING, ClassVar

from loguru import logger

if TYPE_CHECKING:
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.runner import Runner

logger = logging.getLogger(__name__)


class _RunnerConfig:
name = ClassVar[str]
Expand Down Expand Up @@ -75,7 +76,6 @@ def runner(self) -> Runner:
if self.runner_config.name == "ray":
from daft.runners.ray_runner import RayRunner

logger.info("Using RayRunner")
assert isinstance(self.runner_config, _RayRunnerConfig)
_RUNNER = RayRunner(
address=self.runner_config.address,
Expand All @@ -84,7 +84,19 @@ def runner(self) -> Runner:
elif self.runner_config.name == "py":
from daft.runners.pyrunner import PyRunner

logger.info("Using PyRunner")
try:
import ray

if ray.is_initialized():
logger.warning(
"WARNING: Daft is NOT using Ray for execution!\n"
"Daft is using the PyRunner but we detected an active Ray connection. "
"If you intended to use the Daft RayRunner, please first run `daft.context.set_runner_ray()` "
"before executing Daft queries."
)
except ImportError:
pass

assert isinstance(self.runner_config, _PyRunnerConfig)
_RUNNER = PyRunner(use_thread_pool=self.runner_config.use_thread_pool)

Expand Down
3 changes: 2 additions & 1 deletion daft/dataframe/to_torch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import logging
from typing import Any, Iterable, Iterator

from loguru import logger
logger = logging.getLogger(__name__)

try:
# When available, subclass from the newer torchdata DataPipes instead of torch Datasets.
Expand Down
37 changes: 16 additions & 21 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

from __future__ import annotations

import logging
import math
import pathlib
from collections import deque
from typing import Generator, Iterator, TypeVar, Union

from loguru import logger

from daft.daft import (
FileFormat,
FileFormatConfig,
Expand All @@ -40,6 +39,8 @@
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata

logger = logging.getLogger(__name__)

PartitionT = TypeVar("PartitionT")
T = TypeVar("T")

Expand Down Expand Up @@ -123,7 +124,7 @@ def file_read(

except StopIteration:
if len(materializations) > 0:
logger.debug("file_read blocked on completion of first source in: {sources}", sources=materializations)
logger.debug(f"file_read blocked on completion of first source in: {materializations}")
yield None
else:
return
Expand Down Expand Up @@ -231,10 +232,8 @@ def join(
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
"Left sources: {left_requests}\n"
"Right sources: {right_requests}",
left_requests=left_requests,
right_requests=right_requests,
f"Left sources: {left_requests}\n"
f"Right sources: {right_requests}",
)
yield None

Expand Down Expand Up @@ -339,7 +338,7 @@ def global_limit(

# (Optimization. If we are doing limit(0) and already have a partition executing to use for it, just wait.)
if remaining_rows == 0 and len(materializations) > 0:
logger.debug("global_limit blocked on completion of: {source}", source=materializations[0])
logger.debug(f"global_limit blocked on completion of: {materializations[0]}")
yield None
continue

Expand All @@ -364,9 +363,7 @@ def global_limit(

except StopIteration:
if len(materializations) > 0:
logger.debug(
"global_limit blocked on completion of first source in: {sources}", sources=materializations
)
logger.debug(f"global_limit blocked on completion of first source in: {materializations}")
yield None
else:
return
Expand Down Expand Up @@ -396,9 +393,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh

except StopIteration:
if len(materializations) > 0:
logger.debug(
"flatten_plan blocked on completion of first source in: {sources}", sources=materializations
)
logger.debug(f"flatten_plan blocked on completion of first source in: {materializations}")
yield None
else:
return
Expand Down Expand Up @@ -427,7 +422,7 @@ def split(
yield step

while any(not _.done() for _ in materializations):
logger.debug("split_to blocked on completion of all sources: {sources}", sources=materializations)
logger.debug(f"split_to blocked on completion of all sources: {materializations}")
yield None

splits_per_partition = deque([1 for _ in materializations])
Expand Down Expand Up @@ -517,7 +512,7 @@ def coalesce(

except StopIteration:
if len(materializations) > 0:
logger.debug("coalesce blocked on completion of a task in: {sources}", sources=materializations)
logger.debug(f"coalesce blocked on completion of a task in: {materializations}")
yield None
else:
return
Expand Down Expand Up @@ -547,7 +542,7 @@ def reduce(
# All fanouts dispatched. Wait for all of them to materialize
# (since we need all of them to emit even a single reduce).
while any(not _.done() for _ in materializations):
logger.debug("reduce blocked on completion of all sources in: {sources}", sources=materializations)
logger.debug(f"reduce blocked on completion of all sources in: {materializations}")
yield None

inputs_to_reduce = [deque(_.partitions()) for _ in materializations]
Expand Down Expand Up @@ -587,7 +582,7 @@ def sort(
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: {source}", source=source)
logger.debug(f"sort blocked on completion of source: {source}")
yield None

sample = (
Expand All @@ -606,7 +601,7 @@ def sort(

# Wait for samples to materialize.
while any(not _.done() for _ in sample_materializations):
logger.debug("sort blocked on completion of all samples: {samples}", samples=sample_materializations)
logger.debug(f"sort blocked on completion of all samples: {sample_materializations}")
yield None

# Reduce the samples to get sort boundaries.
Expand All @@ -628,7 +623,7 @@ def sort(

# Wait for boundaries to materialize.
while not boundaries.done():
logger.debug("sort blocked on completion of boundary partition: {boundaries}", boundaries=boundaries)
logger.debug(f"sort blocked on completion of boundary partition: {boundaries}")
yield None

# Create a range fanout plan.
Expand Down Expand Up @@ -699,7 +694,7 @@ def materialize(

except StopIteration:
if len(materializations) > 0:
logger.debug("materialize blocked on completion of all sources: {sources}", sources=materializations)
logger.debug(f"materialize blocked on completion of all sources: {materializations}")
yield None
else:
return
Expand Down
4 changes: 3 additions & 1 deletion daft/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
else:
from typing import Literal

import logging
from typing import Any

import fsspec
import pyarrow as pa
from fsspec.registry import get_filesystem_class
from loguru import logger
from pyarrow.fs import (
FileSystem,
FSSpecHandler,
Expand All @@ -28,6 +28,8 @@
from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
from daft.table import Table

logger = logging.getLogger(__name__)

_CACHED_FSES: dict[str, FileSystem] = {}


Expand Down
5 changes: 3 additions & 2 deletions daft/internal/rule_runner.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Generic, TypeVar

from loguru import logger

from daft.internal.rule import Rule
from daft.internal.treenode import TreeNode

logger = logging.getLogger(__name__)

TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")


Expand Down
5 changes: 3 additions & 2 deletions daft/internal/treenode.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations

import logging
import os
import typing
from typing import TYPE_CHECKING, Generic, List, TypeVar, cast

from loguru import logger

if TYPE_CHECKING:
from daft.internal.rule import Rule

logger = logging.getLogger(__name__)

TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")


Expand Down
39 changes: 0 additions & 39 deletions daft/logging.py

This file was deleted.

Loading

0 comments on commit 9d20890

Please sign in to comment.