diff --git a/dlt/common/destination/typing.py b/dlt/common/destination/typing.py index 8cc08756cd..972b88a457 100644 --- a/dlt/common/destination/typing.py +++ b/dlt/common/destination/typing.py @@ -1,6 +1,11 @@ from typing import Optional -from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam +from dlt.common.schema.typing import ( + _TTableSchemaBase, + TWriteDisposition, + TTableReferenceParam, + IncrementalArgs, +) class PreparedTableSchema(_TTableSchemaBase, total=False): @@ -8,4 +13,5 @@ class PreparedTableSchema(_TTableSchemaBase, total=False): write_disposition: TWriteDisposition references: Optional[TTableReferenceParam] + incremental: Optional[IncrementalArgs] _x_prepared: bool # needed for the type checker diff --git a/dlt/common/incremental/__init__.py b/dlt/common/incremental/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/extract/incremental/typing.py b/dlt/common/incremental/typing.py similarity index 66% rename from dlt/extract/incremental/typing.py rename to dlt/common/incremental/typing.py index 7b7786b529..460e2f234b 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/common/incremental/typing.py @@ -2,9 +2,7 @@ from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union -from dlt.common.schema.typing import TColumnNames -from dlt.common.typing import TSortOrder -from dlt.extract.items import TTableHintTemplate +from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict): class IncrementalArgs(TypedDict, total=False): cursor_path: str - initial_value: Optional[str] - last_value_func: Optional[LastValueFunc[str]] + initial_value: Optional[Any] + last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]] + """Last value callable or name of built in function""" primary_key: Optional[TTableHintTemplate[TColumnNames]] - end_value: Optional[str] + end_value: Optional[Any] row_order: Optional[TSortOrder] allow_external_schedulers: Optional[bool] lag: Optional[Union[float, int]] + on_cursor_value_missing: Optional[OnCursorValueMissing] diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index dba1036f85..9d3d5792ea 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -48,7 +48,6 @@ ) from dlt.common.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TColumnSchema, TWriteDispositionConfig, TSchemaContract, @@ -56,7 +55,7 @@ from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.storages.load_storage import LoadPackageInfo from dlt.common.time import ensure_pendulum_datetime, precise_time -from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize +from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames from dlt.common.jsonpath import delete_matches, TAnyJsonPath from dlt.common.data_writers.writers import TLoaderFileFormat from dlt.common.utils import RowCounts, merge_row_counts diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ed6c1c6d78..41fbd5840b 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -20,6 +20,7 @@ from dlt.common.data_types import TDataType from dlt.common.normalizers.typing import TNormalizersConfig from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat +from dlt.common.incremental.typing import IncrementalArgs try: from pydantic import BaseModel as _PydanticBaseModel @@ -132,8 +133,6 @@ class TColumnPropInfo(NamedTuple): "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double" ] TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] -TColumnNames = Union[str, Sequence[str]] -"""A string representing a column name or a list of""" class TColumnType(TypedDict, total=False): @@ -279,6 +278,7 @@ class TTableSchema(_TTableSchemaBase, total=False): write_disposition: Optional[TWriteDisposition] references: Optional[TTableReferenceParam] + incremental: Optional[IncrementalArgs] class TPartialTableSchema(TTableSchema): diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 94edb57194..a3364d1b07 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -29,6 +29,7 @@ Iterator, Generator, NamedTuple, + Sequence, ) from typing_extensions import ( @@ -112,6 +113,8 @@ class SecretSentinel: TSecretStrValue = Annotated[str, SecretSentinel] +TColumnNames = Union[str, Sequence[str]] +"""A string representing a column name or a list of""" TDataItem: TypeAlias = Any """A single data item as extracted from data source""" TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]] @@ -126,6 +129,10 @@ class SecretSentinel: TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"] """known loader file formats""" +TDynHintType = TypeVar("TDynHintType") +TFunHintTemplate = Callable[[TDataItem], TDynHintType] +TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] + class ConfigValueSentinel(NamedTuple): """Class to create singleton sentinel for config and secret injected value""" diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 5f6a1fab85..05b26530d9 100644 --- a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -4,10 +4,8 @@ from dlt.common.destination import PreparedTableSchema from dlt.common.pendulum import timezone -from dlt.common.schema.typing import ( - TColumnNames, - TTableSchemaColumns, -) +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/lancedb/lancedb_adapter.py b/dlt/destinations/impl/lancedb/lancedb_adapter.py index 4314dd703f..d192168d0a 100644 --- a/dlt/destinations/impl/lancedb/lancedb_adapter.py +++ b/dlt/destinations/impl/lancedb/lancedb_adapter.py @@ -1,6 +1,7 @@ from typing import Any, Dict -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py index bbc2d719a8..5a5a44965c 100644 --- a/dlt/destinations/impl/qdrant/qdrant_adapter.py +++ b/dlt/destinations/impl/qdrant/qdrant_adapter.py @@ -1,6 +1,7 @@ from typing import Any -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/destinations/impl/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py index 0ca9047528..329d13c493 100644 --- a/dlt/destinations/impl/weaviate/weaviate_adapter.py +++ b/dlt/destinations/impl/weaviate/weaviate_adapter.py @@ -1,6 +1,7 @@ from typing import Dict, Any, Literal, Set, get_args -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource, resource as make_resource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 63140e8f78..ee2c427f98 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -32,7 +32,6 @@ from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION from dlt.common.schema.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TFileFormat, TWriteDisposition, TWriteDispositionConfig, @@ -43,7 +42,8 @@ ) from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage -from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems +from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems, TColumnNames + from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable from dlt.extract.hints import make_hints @@ -70,6 +70,7 @@ TSourceFunParams, ) from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl +from dlt.extract.incremental import TIncrementalConfig @configspec @@ -446,6 +447,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... @@ -468,6 +470,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ... @@ -490,6 +493,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: Literal[True] = True, ) -> Callable[ @@ -515,6 +519,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... @@ -536,6 +541,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: bool = False, data_from: TUnboundDltResource = None, @@ -642,7 +648,9 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl: selected, cast(DltResource, data_from), True, + incremental=incremental, ) + # If custom nesting level was specified then # we need to add it to table hints so that # later in normalizer dlt/common/normalizers/json/relational.py diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index e65f6cf0d0..9067079672 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -2,7 +2,7 @@ from collections.abc import Sequence as C_Sequence from copy import copy import itertools -from typing import Iterator, List, Dict, Any, Optional +from typing import Iterator, List, Dict, Any, Optional, Mapping import yaml from dlt.common.configuration.container import Container @@ -17,13 +17,12 @@ WithStepInfo, reset_resource_state, ) -from dlt.common.typing import DictStrAny +from dlt.common.typing import DictStrAny, TColumnNames from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema import Schema, utils from dlt.common.schema.typing import ( TAnySchemaColumns, - TColumnNames, TSchemaContract, TTableFormat, TWriteDispositionConfig, @@ -39,7 +38,7 @@ from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints -from dlt.extract.incremental import IncrementalResourceWrapper +from dlt.extract.incremental import IncrementalResourceWrapper, Incremental from dlt.extract.pipe_iterator import PipeIterator from dlt.extract.source import DltSource from dlt.extract.resource import DltResource @@ -238,7 +237,10 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics: hint = hint.incremental # sometimes internal incremental is not bound if hint: - hints[name] = dict(hint) # type: ignore[call-overload] + if isinstance(hint, Incremental): + hints[name] = hint.to_table_hint() + else: + hints[name] = dict(hint) # type: ignore[call-overload] continue if name == "original_columns": # this is original type of the columns ie. Pydantic model diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 5daabd0c6a..a8acb98f2c 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -4,7 +4,6 @@ from dlt.common import logger from dlt.common.schema.typing import ( C_DLT_ID, - TColumnNames, TColumnProp, TFileFormat, TPartialTableSchema, @@ -28,7 +27,7 @@ new_column, new_table, ) -from dlt.common.typing import TDataItem +from dlt.common.typing import TDataItem, TColumnNames from dlt.common.time import ensure_pendulum_datetime from dlt.common.utils import clone_dict_nested from dlt.common.normalizers.json.relational import DataItemNormalizer @@ -204,6 +203,10 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab for k, v in table_template.items() if k not in NATURAL_CALLABLES } # type: ignore + if "incremental" in table_template: + incremental = table_template["incremental"] + if isinstance(incremental, Incremental) and incremental is not Incremental.EMPTY: + resolved_template["incremental"] = incremental table_schema = self._create_table_schema(resolved_template, self.name) migrate_complex_types(table_schema, warn=True) validate_dict_ignoring_xkeys( @@ -518,6 +521,9 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> "disposition": resource_hints["write_disposition"] } # wrap in dict DltResourceHints._merge_write_disposition_dict(resource_hints) # type: ignore[arg-type] + if "incremental" in resource_hints: + if isinstance(resource_hints["incremental"], Incremental): + resource_hints["incremental"] = resource_hints["incremental"].to_table_hint() # type: ignore[typeddict-item] dict_ = cast(TTableSchema, resource_hints) dict_["resource"] = resource_name return dict_ diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 9d73269a20..942c225951 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -1,6 +1,6 @@ import os from datetime import datetime # noqa: I251 -from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union +from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple from typing_extensions import get_args import inspect @@ -19,8 +19,8 @@ get_generic_type_argument_from_instance, is_optional_type, is_subclass, + TColumnNames, ) -from dlt.common.schema.typing import TColumnNames from dlt.common.configuration import configspec, ConfigurationValueError from dlt.common.configuration.specs import BaseConfiguration from dlt.common.pipeline import resource_state @@ -29,17 +29,19 @@ coerce_value, py_type_to_sc_type, ) +from dlt.common.utils import without_none from dlt.extract.exceptions import IncrementalUnboundError from dlt.extract.incremental.exceptions import ( IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) -from dlt.extract.incremental.typing import ( +from dlt.common.incremental.typing import ( IncrementalColumnState, TCursorValue, LastValueFunc, OnCursorValueMissing, + IncrementalArgs, ) from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -122,7 +124,7 @@ def __init__( self, cursor_path: str = None, initial_value: Optional[TCursorValue] = None, - last_value_func: Optional[LastValueFunc[TCursorValue]] = max, + last_value_func: Optional[Union[LastValueFunc[TCursorValue], Literal["min", "max"]]] = max, primary_key: Optional[TTableHintTemplate[TColumnNames]] = None, end_value: Optional[TCursorValue] = None, row_order: Optional[TSortOrder] = None, @@ -134,6 +136,16 @@ def __init__( if cursor_path: compile_path(cursor_path) self.cursor_path = cursor_path + if isinstance(last_value_func, str): + if last_value_func == "min": + last_value_func = min + elif last_value_func == "max": + last_value_func = max + else: + raise ValueError( + f"Unknown last_value_func '{last_value_func}' passed as string. Provide a" + " callable to use a custom function." + ) self.last_value_func = last_value_func self.initial_value = initial_value """Initial value of last_value""" @@ -165,6 +177,17 @@ def __init__( self._bound_pipe: SupportsPipe = None """Bound pipe""" + def to_table_hint(self) -> Optional[IncrementalArgs]: + """Table hint is only returned when all properties are serializable""" + if self.last_value_func not in (min, max): + logger.warning( + "Custom last_value_func %s is not serializable. Incremental hint will not be saved" + " in schema.", + self.last_value_func, + ) + return None + return without_none(dict(self, last_value_func=self.last_value_func.__name__)) # type: ignore[return-value] + @property def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]: return self._primary_key @@ -490,6 +513,12 @@ def can_close(self) -> bool: and self.start_out_of_range ) + @classmethod + def ensure_instance(cls, value: "TIncrementalConfig") -> "Incremental[TCursorValue]": + if isinstance(value, Incremental): + return value + return cls(**value) + def __str__(self) -> str: return ( f"Incremental at 0x{id(self):x} for resource {self.resource_name} with cursor path:" @@ -539,6 +568,8 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]: Incremental.EMPTY = Incremental[Any]() Incremental.EMPTY.__is_resolved__ = True +TIncrementalConfig = Union[Incremental[Any], IncrementalArgs] + class IncrementalResourceWrapper(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1 # stick to end @@ -578,6 +609,34 @@ def get_incremental_arg(sig: inspect.Signature) -> Optional[inspect.Parameter]: break return incremental_param + @staticmethod + def inject_implicit_incremental_arg( + incremental: Optional[Union[Incremental[Any], "IncrementalResourceWrapper"]], + sig: inspect.Signature, + func_args: Tuple[Any], + func_kwargs: Dict[str, Any], + fallback: Optional[Incremental[Any]] = None, + ) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]: + """Inject the incremental instance into function arguments + if the function has an incremental argument without default in its signature and it is not already set in the arguments. + + Returns: + Tuple of the new args, kwargs and the incremental instance that was injected (if any) + """ + if isinstance(incremental, IncrementalResourceWrapper): + incremental = incremental.incremental + if not incremental: + if not fallback: + return func_args, func_kwargs, None + incremental = fallback + incremental_param = IncrementalResourceWrapper.get_incremental_arg(sig) + if incremental_param: + bound_args = sig.bind_partial(*func_args, **func_kwargs) + if not bound_args.arguments.get(incremental_param.name): + bound_args.arguments[incremental_param.name] = incremental + return bound_args.args, bound_args.kwargs, incremental + return func_args, func_kwargs, None + def wrap(self, sig: inspect.Signature, func: TFun) -> TFun: """Wrap the callable to inject an `Incremental` object configured for the resource.""" incremental_param = self.get_incremental_arg(sig) @@ -649,12 +708,14 @@ def incremental(self) -> Optional[Incremental[Any]]: return self._incremental def set_incremental( - self, incremental: Optional[Incremental[Any]], from_hints: bool = False + self, incremental: Optional[TIncrementalConfig], from_hints: bool = False ) -> None: """Sets the incremental. If incremental was set from_hints, it can only be changed in the same manner""" if self._from_hints and not from_hints: # do not accept incremental if apply hints were used return + if incremental is not None: + incremental = Incremental.ensure_instance(incremental) self._from_hints = from_hints self._incremental = incremental @@ -693,6 +754,12 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: return self._incremental(item, meta) +def incremental_config_to_instance(cfg: TIncrementalConfig) -> Incremental[Any]: + if isinstance(cfg, Incremental): + return cfg + return Incremental(**cfg) + + __all__ = [ "Incremental", "IncrementalResourceWrapper", @@ -700,6 +767,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: "IncrementalCursorPathMissing", "IncrementalPrimaryKeyMissing", "IncrementalUnboundError", + "TIncrementalconfig", "LastValueFunc", "TCursorValue", ] diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index fe15571e41..6844fff9e4 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -5,7 +5,7 @@ from dlt.common.utils import digest128 from dlt.common.json import json from dlt.common.pendulum import pendulum -from dlt.common.typing import TDataItem +from dlt.common.typing import TDataItem, TColumnNames from dlt.common.jsonpath import find_values, JSONPathFields, compile_path from dlt.extract.incremental.exceptions import ( IncrementalCursorInvalidCoercion, @@ -13,10 +13,9 @@ IncrementalPrimaryKeyMissing, IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing +from dlt.common.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate -from dlt.common.schema.typing import TColumnNames try: from dlt.common.libs import pyarrow diff --git a/dlt/extract/items.py b/dlt/extract/items.py index d721e8094e..888787e6b7 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -19,7 +19,14 @@ ) from concurrent.futures import Future -from dlt.common.typing import TAny, TDataItem, TDataItems +from dlt.common.typing import ( + TAny, + TDataItem, + TDataItems, + TTableHintTemplate, + TFunHintTemplate, + TDynHintType, +) TDecompositionStrategy = Literal["none", "scc"] @@ -27,9 +34,6 @@ TAwaitableDataItems = Awaitable[TDataItems] TPipedDataItems = Union[TDataItems, TDeferredDataItems, TAwaitableDataItems] -TDynHintType = TypeVar("TDynHintType") -TFunHintTemplate = Callable[[TDataItem], TDynHintType] -TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] if TYPE_CHECKING: TItemFuture = Future[TPipedDataItems] diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index c6ca1660f4..1f06925b1f 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -28,6 +28,7 @@ pipeline_state, ) from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id +from dlt.common.schema.typing import TTableSchema from dlt.extract.utils import wrap_async_iterator, wrap_parallel_iterator from dlt.extract.items import ( @@ -43,7 +44,7 @@ from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep from dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints -from dlt.extract.incremental import Incremental, IncrementalResourceWrapper +from dlt.extract.incremental import Incremental, IncrementalResourceWrapper, TIncrementalConfig from dlt.extract.exceptions import ( InvalidTransformerDataTypeGeneratorFunctionRequired, InvalidParentResourceDataType, @@ -102,6 +103,7 @@ def __init__( section: str = None, args_bound: bool = False, SPEC: Type[BaseConfiguration] = None, + incremental: Optional[TIncrementalConfig] = None, ) -> None: self.section = section self.selected = selected @@ -110,6 +112,9 @@ def __init__( self._explicit_args: DictStrAny = None self.SPEC = SPEC self.source_name = None + self._initial_incremental: Optional[Incremental[Any]] = ( + Incremental.ensure_instance(incremental) if incremental else None + ) super().__init__(hints) @classmethod @@ -122,6 +127,7 @@ def from_data( selected: bool = True, data_from: Union["DltResource", Pipe] = None, inject_config: bool = False, + incremental: Optional[TIncrementalConfig] = None, ) -> Self: """Creates an instance of DltResource from compatible `data` with a given `name` and `section`. @@ -139,7 +145,7 @@ def from_data( if isinstance(data, Pipe): SPEC_ = None if data.is_empty else get_fun_spec(data.gen) # type: ignore[arg-type] - r_ = cls(data, hints, selected, section=section, SPEC=SPEC_) + r_ = cls(data, hints, selected, section=section, SPEC=SPEC_, incremental=incremental) if inject_config: r_._inject_config() return r_ @@ -178,6 +184,7 @@ def from_data( section=section, args_bound=not callable(data), SPEC=get_fun_spec(data), + incremental=incremental, ) if inject_config: r_._inject_config() @@ -442,35 +449,51 @@ def add_step( self._pipe.insert_step(item_transform, insert_at) return self + def set_incremental( + self, new_incremental: Incremental[Any], from_hints: bool = False + ) -> Optional[Incremental[Any]]: + """Set/replace the incremental transform for the resource. + + Args: + new_incremental: The Incremental instance/hint to set or replace + from_hints: If the incremental is set from hints. Defaults to False. + fixed: The Incremental is fixed on the resource (I.e. set from `@resource` decorator) + Ensures it will be persisted when cloning the resource. + """ + if new_incremental is Incremental.EMPTY: + new_incremental = None + incremental = self.incremental + if incremental is not None: + if isinstance(incremental, IncrementalResourceWrapper): + incremental.set_incremental(new_incremental, from_hints=from_hints) + else: + step_no = self._pipe.find(Incremental) + self._pipe.remove_step(step_no) + # re-add the step + incremental = None + if incremental is None: + # if there's no wrapper add incremental as a transform + if new_incremental: + new_incremental = Incremental.ensure_instance(new_incremental) + self.add_step(new_incremental) + # if fixed: + # # Make sure incremental is added to hints so cloning reosurce will keep it + # self._set_hints(self._hints | {"incremental": new_incremental}) + return new_incremental + def _set_hints( self, table_schema_template: TResourceHints, create_table_variant: bool = False ) -> None: super()._set_hints(table_schema_template, create_table_variant) # validators and incremental apply only to resource hints if not create_table_variant: - incremental = self.incremental # try to late assign incremental if table_schema_template.get("incremental") is not None: - new_incremental = table_schema_template["incremental"] - # remove incremental if empty - if new_incremental is Incremental.EMPTY: - new_incremental = None - - if incremental is not None: - if isinstance(incremental, IncrementalResourceWrapper): - # replace in wrapper - incremental.set_incremental(new_incremental, from_hints=True) - else: - step_no = self._pipe.find(Incremental) - self._pipe.remove_step(step_no) - # re-add the step - incremental = None - - if incremental is None: - # if there's no wrapper add incremental as a transform - incremental = new_incremental # type: ignore - if new_incremental: - self.add_step(new_incremental) + incremental = self.set_incremental( + table_schema_template["incremental"], from_hints=True + ) + else: + incremental = self.incremental # type: ignore[assignment] if incremental: primary_key = table_schema_template.get("primary_key", incremental.primary_key) @@ -480,10 +503,39 @@ def _set_hints( if table_schema_template.get("validator") is not None: self.validator = table_schema_template["validator"] + def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: + table_schema = super().compute_table_schema(item, meta) + if self.incremental and "incremental" not in table_schema: + incremental = self.incremental + if isinstance(incremental, IncrementalResourceWrapper): + incremental = incremental.incremental # type: ignore[assignment] + if incremental: + table_schema["incremental"] = incremental.to_table_hint() # type: ignore[attr-defined] + + return table_schema + def bind(self: TDltResourceImpl, *args: Any, **kwargs: Any) -> TDltResourceImpl: """Binds the parametrized resource to passed arguments. Modifies resource pipe in place. Does not evaluate generators or iterators.""" if self._args_bound: raise TypeError(f"Parametrized resource {self.name} is not callable") + + incremental = self.incremental + + if self._initial_incremental and ( + not incremental + or (isinstance(incremental, IncrementalResourceWrapper) and not incremental.incremental) + ): + # Inject the incremental into function args when needed if it was set in decorator and it was not overriden explicitly + sig = inspect.signature(self._pipe.gen) # type: ignore[arg-type] + args, kwargs, injected = IncrementalResourceWrapper.inject_implicit_incremental_arg( + self._initial_incremental, + sig, + args, + kwargs, + ) + if not injected or injected == self._initial_incremental: + self.set_incremental(self._initial_incremental) + orig_gen = self._pipe.gen gen = self._pipe.bind_gen(*args, **kwargs) if isinstance(gen, DltResource): @@ -618,6 +670,8 @@ def _inject_config(self) -> "DltResource": sig = inspect.signature(gen) if IncrementalResourceWrapper.should_wrap(sig): incremental = IncrementalResourceWrapper(self._hints.get("primary_key")) + # if incr_hint := self._hints.get("incremental"): + # incremental.set_incremental(incr_hint, from_hints=True) incr_f = incremental.wrap(sig, gen) self.add_step(incremental) else: @@ -649,6 +703,7 @@ def _clone( if self._pipe and not self._pipe.is_empty: pipe = pipe._clone(new_name=new_name, with_parent=with_parent) # incremental and parent are already in the pipe (if any) + r_ = self.__class__( pipe, self._clone_hints(self._hints), @@ -656,6 +711,7 @@ def _clone( section=self.section, args_bound=self._args_bound, SPEC=self.SPEC, + incremental=self._initial_incremental, ) # try to eject and then inject configuration and incremental wrapper when resource is cloned # this makes sure that a take config values from a right section and wrapper has a separated diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 55a8b0b8c4..68570d0995 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -22,8 +22,15 @@ from dlt.common.data_writers import TDataItemFormat from dlt.common.exceptions import MissingDependencyException from dlt.common.pipeline import reset_resource_state -from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns -from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems, TAnyFunOrGenerator +from dlt.common.schema.typing import TAnySchemaColumns, TTableSchemaColumns +from dlt.common.typing import ( + AnyFun, + DictStrAny, + TDataItem, + TDataItems, + TAnyFunOrGenerator, + TColumnNames, +) from dlt.common.utils import get_callable_name from dlt.extract.exceptions import ( diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 7c13d76e26..ff4eca8ecb 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -37,7 +37,6 @@ from dlt.common.exceptions import MissingDependencyException from dlt.common.runtime import signals, apply_runtime_config from dlt.common.schema.typing import ( - TColumnNames, TSchemaTables, TTableFormat, TWriteDispositionConfig, @@ -46,7 +45,7 @@ ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound -from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type +from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type, TColumnNames from dlt.common.runners import pool_runner as runner from dlt.common.storages import ( LiveSchemaStorage, diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index d4cea892a3..e728888900 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -15,7 +15,7 @@ from dlt.common.schema.typing import ( TAnySchemaColumns, ) -from dlt.extract.incremental.typing import IncrementalArgs +from dlt.common.incremental.typing import IncrementalArgs from dlt.extract.items import TTableHintTemplate from dlt.extract.hints import TResourceHintsBase from dlt.sources.helpers.rest_client.auth import AuthConfigBase, TApiKeyLocation @@ -23,9 +23,8 @@ from dataclasses import dataclass, field from dlt.common import jsonpath -from dlt.common.typing import TSortOrder +from dlt.common.typing import TSortOrder, TColumnNames from dlt.common.schema.typing import ( - TColumnNames, TTableFormat, TAnySchemaColumns, TWriteDispositionConfig, @@ -33,7 +32,7 @@ ) from dlt.extract.items import TTableHintTemplate -from dlt.extract.incremental.typing import LastValueFunc +from dlt.common.incremental.typing import LastValueFunc from dlt.extract.resource import DltResource from requests import Session diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 3f8ccfc20f..f3ebb02b46 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -19,13 +19,13 @@ from dlt.common import Decimal, jsonpath from dlt.common.exceptions import DictValidationException from dlt.common.schema.typing import ( - TColumnNames, TStoredSchema, TColumnSchema, TWriteDispositionConfig, ) from dlt.common.schema.utils import simple_regex_validator -from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder +from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder, TColumnNames + from dlt.common.validation import validate_dict, validate_dict_ignoring_xkeys diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 683cea51bc..58cfc180ee 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -5,7 +5,7 @@ from datetime import datetime, date # noqa: I251 from itertools import chain, count from time import sleep -from typing import Any, Optional +from typing import Any, Optional, Literal, Sequence, Dict from unittest import mock import duckdb @@ -1468,10 +1468,13 @@ def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None: data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}] source_items = data_to_item_format(item_type, data) + should_have_arg = True + @dlt.resource def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): # make sure that incremental from apply_hints is here - if created_at is not None: + if should_have_arg: + assert created_at is not None assert created_at.cursor_path == "created_at" assert created_at.last_value_func is max yield source_items @@ -1505,6 +1508,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): assert list(r) == [] # remove incremental + should_have_arg = False r.apply_hints(incremental=dlt.sources.incremental.EMPTY) assert r.incremental is not None assert r.incremental.incremental is None @@ -1515,6 +1519,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): # as above but we provide explicit incremental when creating resource p = p.drop() + should_have_arg = True r = some_data(created_at=dlt.sources.incremental("created_at", last_value_func=min)) # hints have precedence, as expected r.apply_hints(incremental=dlt.sources.incremental("created_at", last_value_func=max)) @@ -3539,3 +3544,203 @@ def test_apply_lag() -> None: assert apply_lag(2, 0, 1, max) == 0 assert apply_lag(1, 2, 1, min) == 2 assert apply_lag(2, 2, 1, min) == 2 + + +def _resource_for_table_hint( + hint_type: Literal[ + "default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator" + ], + data: Sequence[Dict[str, Any]], + incremental_arg: dlt.sources.incremental[Any], + incremental_arg_default: dlt.sources.incremental[Any] = None, +) -> DltResource: + if incremental_arg is None and incremental_arg_default is None: + raise ValueError("One of the incremental arguments must be provided.") + + decorator_arg = None + if hint_type == "default_arg": + default_arg = incremental_arg_default + override_arg = None + elif hint_type == "default_arg_override": + default_arg = incremental_arg_default + override_arg = incremental_arg + elif hint_type == "decorator": + default_arg = None + override_arg = None + decorator_arg = incremental_arg_default + else: + default_arg = None + override_arg = incremental_arg + + @dlt.resource(incremental=decorator_arg) + def some_data( + updated_at: dlt.sources.incremental[Any] = default_arg, + ) -> Any: + yield data_to_item_format("object", data) + + if override_arg is None: + return some_data() + + if hint_type == "apply_hints": + rs = some_data() + rs.apply_hints(incremental=override_arg) + return rs + + return some_data(updated_at=override_arg) + + +@pytest.mark.parametrize( + "hint_type", ["default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator"] +) +@pytest.mark.parametrize( + "incremental_settings", + [ + { + "last_value_func": "min", + "row_order": "desc", + "on_cursor_value_missing": "include", + }, + {"last_value_func": "max", "on_cursor_value_missing": "raise"}, + ], +) +def test_incremental_table_hint_datetime_column( + hint_type: Literal[ + "default_arg", + "explicit_arg", + "default_arg_override", + "apply_hints", + "decorator", + ], + incremental_settings: Dict[str, Any], +) -> None: + initial_value_override = pendulum.now() + initial_value_default = pendulum.now().subtract(seconds=10) + initial_value_in_hint = ( + initial_value_default + if hint_type in ("default_arg", "decorator") + else initial_value_override + ).isoformat() + rs = _resource_for_table_hint( + hint_type, + [{"updated_at": pendulum.now().add(seconds=i)} for i in range(1, 12)], + dlt.sources.incremental( + "updated_at", initial_value=initial_value_override, **incremental_settings + ), + dlt.sources.incremental( + "updated_at", initial_value=initial_value_default, **incremental_settings + ), + ) + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + pipeline.extract(rs) + + table_schema = pipeline.default_schema.tables["some_data"] + + expected = { + "cursor_path": "updated_at", + "allow_external_schedulers": False, + "initial_value": initial_value_in_hint, + "last_value_func": incremental_settings["last_value_func"], + "on_cursor_value_missing": incremental_settings["on_cursor_value_missing"], + } + if incremental_settings.get("row_order"): + expected["row_order"] = incremental_settings["row_order"] + + assert table_schema["incremental"] == expected + + +def incremental_instance_or_dict(use_dict: bool, **kwargs): + if use_dict: + return kwargs + return dlt.sources.incremental(**kwargs) + + +@pytest.mark.parametrize("use_dict", [True, False]) +def test_incremental_in_resource_decorator(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_required_incremental_arg(incremental: dlt.sources.incremental[int]): + assert incremental.last_value == 5 + assert incremental.last_value_func == min + yield [{"value": i} for i in range(10)] + + # Don't pass the argument, use the decorator settings + result = list(with_required_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + result = list(with_required_incremental_arg(incremental=None)) + assert result == [{"value": i} for i in range(0, 6)] + + # Incremental set in decorator, without any arguments + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def no_incremental_arg(): + yield [{"value": i} for i in range(10)] + + result = list(no_incremental_arg()) + # filtering is applied + assert result == [{"value": i} for i in range(0, 6)] + + # Apply hints overrides the decorator settings + rs = no_incremental_arg() + rs.apply_hints( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=3, last_value_func=max + ) + ) + result = list(rs) + assert result == [{"value": i} for i in range(3, 10)] + + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_optional_incremental_arg(incremental: Optional[dlt.sources.incremental[int]] = None): + assert incremental is not None + yield [{"value": i} for i in range(10)] + + # Decorator settings are used + result = list(with_optional_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + +@pytest.mark.parametrize("use_dict", [True, False]) +def test_incremental_in_resource_decorator_default_arg(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_default_incremental_arg( + incremental: dlt.sources.incremental[int] = dlt.sources.incremental( + "value", initial_value=3, last_value_func=min + ) + ): + assert incremental.last_value == initial_value + assert incremental.last_value_func == last_value_func + yield [{"value": i} for i in range(10)] + + last_value_func = max + initial_value = 4 + # Explicit argument overrides the default and decorator argument + result = list( + with_default_incremental_arg( + incremental=dlt.sources.incremental( + "value", initial_value=initial_value, last_value_func=last_value_func + ) + ) + ) + assert result == [{"value": i} for i in range(4, 10)] + + # Decorator param overrides function default arg + last_value_func = min + initial_value = 5 + result = list(with_default_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)]