Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental table hints and incremental in resource decorator #2033

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dlt/common/destination/typing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from typing import Optional

from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam
from dlt.common.schema.typing import (
_TTableSchemaBase,
TWriteDisposition,
TTableReferenceParam,
IncrementalArgs,
)


class PreparedTableSchema(_TTableSchemaBase, total=False):
    """A table schema in which every hint has been prepared for loading."""

    write_disposition: TWriteDisposition
    references: Optional[TTableReferenceParam]
    incremental: Optional[IncrementalArgs]
    # this key is needed for the type checker
    _x_prepared: bool
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union

from dlt.common.schema.typing import TColumnNames
from dlt.common.typing import TSortOrder
from dlt.extract.items import TTableHintTemplate
from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames

# Type variable for a cursor value; bound to Any, so any type is accepted.
TCursorValue = TypeVar("TCursorValue", bound=Any)
# Callable that receives a sequence of cursor values and returns a value of any type.
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
Expand All @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict):

class IncrementalArgs(TypedDict, total=False):
    """Typed dictionary of incremental settings.

    The class is declared with ``total=False``, so every key may be omitted.
    Each key is declared exactly once; a repeated annotation for the same key
    would be silently overridden by the later one.
    """

    cursor_path: str
    initial_value: Optional[Any]
    last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]]
    """Last value callable or name of built in function"""
    primary_key: Optional[TTableHintTemplate[TColumnNames]]
    end_value: Optional[Any]
    row_order: Optional[TSortOrder]
    allow_external_schedulers: Optional[bool]
    lag: Optional[Union[float, int]]
    on_cursor_value_missing: Optional[OnCursorValueMissing]
3 changes: 1 addition & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,14 @@
)
from dlt.common.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TColumnSchema,
TWriteDispositionConfig,
TSchemaContract,
)
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat
from dlt.common.incremental.typing import IncrementalArgs

try:
from pydantic import BaseModel as _PydanticBaseModel
Expand Down Expand Up @@ -132,8 +133,6 @@ class TColumnPropInfo(NamedTuple):
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A single column name or a sequence of column names"""


class TColumnType(TypedDict, total=False):
Expand Down Expand Up @@ -279,6 +278,7 @@ class TTableSchema(_TTableSchemaBase, total=False):

write_disposition: Optional[TWriteDisposition]
references: Optional[TTableReferenceParam]
incremental: Optional[IncrementalArgs]


class TPartialTableSchema(TTableSchema):
Expand Down
7 changes: 7 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Iterator,
Generator,
NamedTuple,
Sequence,
)

from typing_extensions import (
Expand Down Expand Up @@ -112,6 +113,8 @@ class SecretSentinel:

TSecretStrValue = Annotated[str, SecretSentinel]

TColumnNames = Union[str, Sequence[str]]
"""A single column name or a sequence of column names"""
TDataItem: TypeAlias = Any
"""A single data item as extracted from data source"""
TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]]
Expand All @@ -126,6 +129,10 @@ class SecretSentinel:
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"]
"""known loader file formats"""

# Type variable for the value type of a hint.
TDynHintType = TypeVar("TDynHintType")
# Callable that takes a data item and returns a hint value of type TDynHintType.
TFunHintTemplate = Callable[[TDataItem], TDynHintType]
# Either the hint value itself or a callable that produces it from a data item.
TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]]


class ConfigValueSentinel(NamedTuple):
"""Class to create singleton sentinel for config and secret injected value"""
Expand Down
6 changes: 2 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

from dlt.common.destination import PreparedTableSchema
from dlt.common.pendulum import timezone
from dlt.common.schema.typing import (
TColumnNames,
TTableSchemaColumns,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/lancedb/lancedb_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Dict

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/qdrant/qdrant_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.extract import DltResource
from dlt.destinations.utils import get_resource_for_adapter

Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/weaviate/weaviate_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any, Literal, Set, get_args

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.extract import DltResource, resource as make_resource
from dlt.destinations.utils import get_resource_for_adapter

Expand Down
12 changes: 10 additions & 2 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TFileFormat,
TWriteDisposition,
TWriteDispositionConfig,
Expand All @@ -43,7 +42,8 @@
)
from dlt.common.storages.exceptions import SchemaNotFoundError
from dlt.common.storages.schema_storage import SchemaStorage
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems, TColumnNames

from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable

from dlt.extract.hints import make_hints
Expand All @@ -70,6 +70,7 @@
TSourceFunParams,
)
from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl
from dlt.extract.incremental import TIncrementalConfig


@configspec
Expand Down Expand Up @@ -446,6 +447,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...

Expand All @@ -468,6 +470,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ...

Expand All @@ -490,6 +493,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: Literal[True] = True,
) -> Callable[
Expand All @@ -515,6 +519,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...

Expand All @@ -536,6 +541,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: bool = False,
data_from: TUnboundDltResource = None,
Expand Down Expand Up @@ -642,7 +648,9 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl:
selected,
cast(DltResource, data_from),
True,
incremental=incremental,
)

# If custom nesting level was specified then
# we need to add it to table hints so that
# later in normalizer dlt/common/normalizers/json/relational.py
Expand Down
12 changes: 7 additions & 5 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Sequence as C_Sequence
from copy import copy
import itertools
from typing import Iterator, List, Dict, Any, Optional
from typing import Iterator, List, Dict, Any, Optional, Mapping
import yaml

from dlt.common.configuration.container import Container
Expand All @@ -17,13 +17,12 @@
WithStepInfo,
reset_resource_state,
)
from dlt.common.typing import DictStrAny
from dlt.common.typing import DictStrAny, TColumnNames
from dlt.common.runtime import signals
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.schema import Schema, utils
from dlt.common.schema.typing import (
TAnySchemaColumns,
TColumnNames,
TSchemaContract,
TTableFormat,
TWriteDispositionConfig,
Expand All @@ -39,7 +38,7 @@

from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
from dlt.extract.incremental import IncrementalResourceWrapper
from dlt.extract.incremental import IncrementalResourceWrapper, Incremental
from dlt.extract.pipe_iterator import PipeIterator
from dlt.extract.source import DltSource
from dlt.extract.resource import DltResource
Expand Down Expand Up @@ -238,7 +237,10 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
hint = hint.incremental
# sometimes internal incremental is not bound
if hint:
hints[name] = dict(hint) # type: ignore[call-overload]
if isinstance(hint, Incremental):
hints[name] = hint.to_table_hint()
else:
hints[name] = dict(hint) # type: ignore[call-overload]
continue
if name == "original_columns":
# this is original type of the columns ie. Pydantic model
Expand Down
10 changes: 8 additions & 2 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from dlt.common import logger
from dlt.common.schema.typing import (
C_DLT_ID,
TColumnNames,
TColumnProp,
TFileFormat,
TPartialTableSchema,
Expand All @@ -28,7 +27,7 @@
new_column,
new_table,
)
from dlt.common.typing import TDataItem
from dlt.common.typing import TDataItem, TColumnNames
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.utils import clone_dict_nested
from dlt.common.normalizers.json.relational import DataItemNormalizer
Expand Down Expand Up @@ -204,6 +203,10 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab
for k, v in table_template.items()
if k not in NATURAL_CALLABLES
} # type: ignore
if "incremental" in table_template:
incremental = table_template["incremental"]
if isinstance(incremental, Incremental) and incremental is not Incremental.EMPTY:
resolved_template["incremental"] = incremental
table_schema = self._create_table_schema(resolved_template, self.name)
migrate_complex_types(table_schema, warn=True)
validate_dict_ignoring_xkeys(
Expand Down Expand Up @@ -518,6 +521,9 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) ->
"disposition": resource_hints["write_disposition"]
} # wrap in dict
DltResourceHints._merge_write_disposition_dict(resource_hints) # type: ignore[arg-type]
if "incremental" in resource_hints:
if isinstance(resource_hints["incremental"], Incremental):
resource_hints["incremental"] = resource_hints["incremental"].to_table_hint() # type: ignore[typeddict-item]
dict_ = cast(TTableSchema, resource_hints)
dict_["resource"] = resource_name
return dict_
Expand Down
Loading
Loading