Commit 9db25a9

Merge branch 'main' into feat-expand-schema-variables

edgarrmondragon authored Apr 4, 2023
2 parents: 233ccc6 + de0fe19

Showing 14 changed files with 114 additions and 67 deletions.
9 changes: 8 additions & 1 deletion pyproject.toml
@@ -255,6 +255,10 @@ select = [
     "PTH",  # flake8-use-pathlib
     "ERA",  # eradicate
     "PGH",  # pygrep-hooks
+    "PLC",  # pylint (convention)
+    "PLE",  # pylint (error)
+    "PLR",  # pylint (refactor)
+    "PLW",  # pylint (warning)
     "RUF",  # ruff
 ]
 src = ["samples", "singer_sdk", "tests"]
@@ -267,7 +271,7 @@ target-version = "py37"
     "INP001",  # flake8-no-pep420: implicit-namespace-package
 ]
 "noxfile.py" = ["ANN"]
-"tests/*" = ["ANN", "D1", "D2", "S101"]
+"tests/*" = ["ANN", "D1", "D2", "PLR2004", "S101"]
 # Disabled some checks in samples code
 "samples/*" = ["ANN", "D"]
 # Don't require docstrings conventions or type annotations in private modules
@@ -291,3 +295,6 @@ required-imports = ["from __future__ import annotations"]
 
 [tool.ruff.pydocstyle]
 convention = "google"
+
+[tool.ruff.pylint]
+max-args = 9
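
Note: PLR2004 (magic-value-comparison) is the rule behind most of the new noqa comments in the files below; it flags bare literals in comparisons and is suppressed wholesale for tests. The max-args = 9 setting relaxes PLR0913 (too-many-arguments) from its default of 5. A minimal illustration of what PLR2004 flags and the named-constant fix it prefers (hypothetical function, not from this commit):

    MAX_TABLE_NAME_PARTS = 3  # named constant: what PLR2004 wants to see

    def classify(parts: list[str]) -> str:
        if len(parts) == 2:  # noqa: PLR2004 -- bare magic number, suppressed
            return "schema.table"
        if len(parts) == MAX_TABLE_NAME_PARTS:  # named constant: no violation
            return "db.schema.table"
        return "table"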
4 changes: 3 additions & 1 deletion samples/sample_target_parquet/parquet_target_sink.py
@@ -39,7 +39,9 @@ def _json_schema_to_arrow_fields(schema: dict[str, Any]) -> pa.StructType:
     return fields
 
 
-def _json_type_to_arrow_field(schema_type: dict[str, Any]) -> pa.DataType:
+def _json_type_to_arrow_field(  # noqa: PLR0911
+    schema_type: dict[str, Any],
+) -> pa.DataType:
     """Convert a JSON Schema to an Arrow struct.
 
     Args:
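
Note: PLR0911 (too-many-return-statements) fires when a function has more than six return statements by default; reformatting only the signature, as above, does not silence it, hence the noqa. A hypothetical sketch of the kind of type-dispatch code that trips the rule:

    # Hypothetical dispatch helper with many exits; each branch returns, so
    # the function exceeds the default return-count limit and needs
    # `# noqa: PLR0911` (or a refactor into a lookup table).
    def json_type_name(value: object) -> str:  # noqa: PLR0911
        if value is None:
            return "null"
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, int):
            return "integer"
        if isinstance(value, float):
            return "number"
        if isinstance(value, str):
            return "string"
        if isinstance(value, list):
            return "array"
        return "object"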
5 changes: 3 additions & 2 deletions singer_sdk/_singerlib/catalog.py
@@ -31,7 +31,7 @@ def __missing__(self, breadcrumb: Breadcrumb) -> bool:
         Returns:
             True if the breadcrumb is selected, False otherwise.
         """
-        if len(breadcrumb) >= 2:
+        if len(breadcrumb) >= 2:  # noqa: PLR2004
             parent = breadcrumb[:-2]
             return self[parent]
 
@@ -160,6 +160,7 @@ def root(self) -> StreamMetadata:
     @classmethod
     def get_standard_metadata(
         cls: type[MetadataMapping],
+        *,
         schema: dict[str, t.Any] | None = None,
         schema_name: str | None = None,
         key_properties: list[str] | None = None,
@@ -218,7 +219,7 @@ def resolve_selection(self) -> SelectionMask:
             for breadcrumb in self
         )
 
-    def _breadcrumb_is_selected(self, breadcrumb: Breadcrumb) -> bool:
+    def _breadcrumb_is_selected(self, breadcrumb: Breadcrumb) -> bool:  # noqa: PLR0911
         """Determine if a property breadcrumb is selected based on existing metadata.
 
         An empty breadcrumb (empty tuple) indicates the stream itself. Otherwise, the
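
Note: the bare `*` added to get_standard_metadata makes every parameter after it keyword-only, so callers can no longer rely on positional order. A minimal sketch of the pattern (names are illustrative, not this method's real signature):

    def connect(host: str, *, port: int = 5432, timeout: float = 10.0) -> str:
        """Everything after the bare `*` must be passed by keyword."""
        return f"{host}:{port} (timeout={timeout}s)"

    connect("db.example.com", port=5433)  # OK: keyword argument
    # connect("db.example.com", 5433)    # TypeError: too many positional arguments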
6 changes: 3 additions & 3 deletions singer_sdk/connectors/sql.py
@@ -529,9 +529,9 @@ def parse_full_table_name(
         parts = full_table_name.split(".")
         if len(parts) == 1:
             table_name = full_table_name
-        if len(parts) == 2:
+        if len(parts) == 2:  # noqa: PLR2004
             schema_name, table_name = parts
-        if len(parts) == 3:
+        if len(parts) == 3:  # noqa: PLR2004
             db_name, schema_name, table_name = parts
 
         return db_name, schema_name, table_name
@@ -841,7 +841,7 @@ def merge_sql_types(
         sql_types = self._sort_types(sql_types)
 
         # If greater than two evaluate the first pair then on down the line
-        if len(sql_types) > 2:
+        if len(sql_types) > 2:  # noqa: PLR2004
             return self.merge_sql_types(
                 [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:],
             )
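
Note: parse_full_table_name splits a dotted identifier into up to three parts; the noqa comments only silence the magic-value rule, the logic is unchanged. A standalone sketch of the same splitting behavior (simplified, not the SDK's actual method):

    from __future__ import annotations

    def parse_full_table_name(
        full_table_name: str,
    ) -> tuple[str | None, str | None, str]:
        """Split 'db.schema.table', 'schema.table', or bare 'table'."""
        db_name = schema_name = None
        table_name = full_table_name
        parts = full_table_name.split(".")
        if len(parts) == 2:  # noqa: PLR2004
            schema_name, table_name = parts
        if len(parts) == 3:  # noqa: PLR2004
            db_name, schema_name, table_name = parts
        return db_name, schema_name, table_name

    assert parse_full_table_name("analytics.public.users") == ("analytics", "public", "users")
    assert parse_full_table_name("users") == (None, None, "users")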
30 changes: 16 additions & 14 deletions singer_sdk/helpers/_flattening.py
@@ -59,7 +59,9 @@ def flatten_key(key_name: str, parent_keys: list[str], separator: str = "__") -> str:
     full_key = [*parent_keys, key_name]
     inflected_key = full_key.copy()
     reducer_index = 0
-    while len(separator.join(inflected_key)) >= 255 and reducer_index < len(
+    while len(
+        separator.join(inflected_key),
+    ) >= 255 and reducer_index < len(  # noqa: PLR2004
         inflected_key,
     ):
         reduced_key = re.sub(
@@ -249,17 +251,16 @@ def _flatten_schema(
                 )
             else:
                 items.append((new_key, v))
-        else:
-            if len(v.values()) > 0:
-                if list(v.values())[0][0]["type"] == "string":
-                    list(v.values())[0][0]["type"] = ["null", "string"]
-                    items.append((new_key, list(v.values())[0][0]))
-                elif list(v.values())[0][0]["type"] == "array":
-                    list(v.values())[0][0]["type"] = ["null", "array"]
-                    items.append((new_key, list(v.values())[0][0]))
-                elif list(v.values())[0][0]["type"] == "object":
-                    list(v.values())[0][0]["type"] = ["null", "object"]
-                    items.append((new_key, list(v.values())[0][0]))
+        elif len(v.values()) > 0:
+            if list(v.values())[0][0]["type"] == "string":
+                list(v.values())[0][0]["type"] = ["null", "string"]
+                items.append((new_key, list(v.values())[0][0]))
+            elif list(v.values())[0][0]["type"] == "array":
+                list(v.values())[0][0]["type"] = ["null", "array"]
+                items.append((new_key, list(v.values())[0][0]))
+            elif list(v.values())[0][0]["type"] == "object":
+                list(v.values())[0][0]["type"] = ["null", "object"]
+                items.append((new_key, list(v.values())[0][0]))
 
     # Sort and check for duplicates
     def _key_func(item):
@@ -301,6 +302,7 @@ def flatten_record(
 
 def _flatten_record(
     record_node: MutableMapping[Any, Any],
+    *,
     flattened_schema: dict | None = None,
     parent_key: list[str] | None = None,
     separator: str = "__",
@@ -333,8 +335,8 @@ def _flatten_record(
             items.extend(
                 _flatten_record(
                     v,
-                    flattened_schema,
-                    [*parent_key, k],
+                    flattened_schema=flattened_schema,
+                    parent_key=[*parent_key, k],
                     separator=separator,
                     level=level + 1,
                     max_level=max_level,
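
Note: the recursive call now has to spell out flattened_schema= and parent_key= because of the bare `*` added to _flatten_record above. A self-contained sketch of the same flattening idea with keyword-only options (simplified; not the SDK's actual helper):

    from __future__ import annotations

    from collections.abc import MutableMapping
    from typing import Any

    def flatten(
        node: MutableMapping[str, Any],
        *,
        parent_key: list[str] | None = None,
        separator: str = "__",
    ) -> dict[str, Any]:
        """Flatten nested mappings into one level, joining keys with `separator`."""
        parent_key = parent_key or []
        items: dict[str, Any] = {}
        for key, value in node.items():
            if isinstance(value, MutableMapping):
                # Keyword-only parameters must be spelled out at the call site.
                items.update(
                    flatten(value, parent_key=[*parent_key, key], separator=separator),
                )
            else:
                items[separator.join([*parent_key, key])] = value
        return items

    assert flatten({"a": {"b": 1}, "c": 2}) == {"a__b": 1, "c": 2}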
3 changes: 2 additions & 1 deletion singer_sdk/helpers/_state.py
@@ -18,7 +18,7 @@
 STARTING_MARKER = "starting_replication_value"
 
 
-def get_state_if_exists(
+def get_state_if_exists(  # noqa: PLR0911
     tap_state: dict,
     tap_stream_id: str,
     state_partition_context: dict | None = None,
@@ -268,6 +268,7 @@ def finalize_state_progress_markers(stream_or_partition_state: dict) -> dict | None:
 
 
 def log_sort_error(
+    *,
     ex: Exception,
     log_fn: Callable,
     stream_name: str,
52 changes: 35 additions & 17 deletions singer_sdk/helpers/_typing.py
@@ -249,9 +249,12 @@ def is_boolean_type(property_schema: dict) -> bool | None:
     if "anyOf" not in property_schema and "type" not in property_schema:
         return None  # Could not detect data type
     for property_type in property_schema.get("anyOf", [property_schema.get("type")]):
-        if isinstance(property_type, dict):
-            property_type = property_type.get("type", [])
-        if "boolean" in property_type or property_type == "boolean":
+        schema_type = (
+            property_type.get("type", [])
+            if isinstance(property_type, dict)
+            else property_type
+        )
+        if "boolean" in schema_type or schema_type == "boolean":
             return True
     return False

@@ -261,9 +264,12 @@ def is_integer_type(property_schema: dict) -> bool | None:
     if "anyOf" not in property_schema and "type" not in property_schema:
         return None  # Could not detect data type
     for property_type in property_schema.get("anyOf", [property_schema.get("type")]):
-        if isinstance(property_type, dict):
-            property_type = property_type.get("type", [])
-        if "integer" in property_type or property_type == "integer":
+        schema_type = (
+            property_type.get("type", [])
+            if isinstance(property_type, dict)
+            else property_type
+        )
+        if "integer" in schema_type or schema_type == "integer":
             return True
     return False

@@ -273,9 +279,12 @@ def is_string_type(property_schema: dict) -> bool | None:
     if "anyOf" not in property_schema and "type" not in property_schema:
         return None  # Could not detect data type
     for property_type in property_schema.get("anyOf", [property_schema.get("type")]):
-        if isinstance(property_type, dict):
-            property_type = property_type.get("type", [])
-        if "string" in property_type or property_type == "string":
+        schema_type = (
+            property_type.get("type", [])
+            if isinstance(property_type, dict)
+            else property_type
+        )
+        if "string" in schema_type or schema_type == "string":
             return True
     return False

@@ -285,9 +294,12 @@ def is_null_type(property_schema: dict) -> bool | None:
     if "anyOf" not in property_schema and "type" not in property_schema:
         return None  # Could not detect data type
     for property_type in property_schema.get("anyOf", [property_schema.get("type")]):
-        if isinstance(property_type, dict):
-            property_type = property_type.get("type", [])
-        if "null" in property_type or property_type == "null":
+        schema_type = (
+            property_type.get("type", [])
+            if isinstance(property_type, dict)
+            else property_type
+        )
+        if "null" in schema_type or schema_type == "null":
             return True
     return False

@@ -297,9 +309,12 @@ def is_number_type(property_schema: dict) -> bool | None:
     if "anyOf" not in property_schema and "type" not in property_schema:
         return None  # Could not detect data type
     for property_type in property_schema.get("anyOf", [property_schema.get("type")]):
-        if isinstance(property_type, dict):
-            property_type = property_type.get("type", [])
-        if "number" in property_type or property_type == "number":
+        schema_type = (
+            property_type.get("type", [])
+            if isinstance(property_type, dict)
+            else property_type
+        )
+        if "number" in schema_type or schema_type == "number":
             return True
     return False

@@ -364,7 +379,7 @@ def conform_record_data_types(
     return rec
 
 
-def _conform_record_data_types(
+def _conform_record_data_types(  # noqa: PLR0912
     input_object: dict[str, Any],
     schema: dict,
     level: TypeConformanceLevel,
@@ -446,7 +461,10 @@ def _conform_record_data_types(
     return output_object, unmapped_properties
 
 
-def _conform_primitive_property(elem: Any, property_schema: dict) -> Any:
+def _conform_primitive_property(  # noqa: PLR0911
+    elem: Any,
+    property_schema: dict,
+) -> Any:
     """Converts a primitive (i.e. not object or array) to a json compatible type."""
     if isinstance(elem, (datetime.datetime, pendulum.DateTime)):
         return to_json_compatible(elem)
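
Note: the five is_*_type rewrites all avoid reassigning the loop variable property_type, which the new PLW2901 (redefined-loop-name) rule flags; binding the result to a fresh name like schema_type leaves each iteration's original value intact. A minimal before/after sketch (illustrative data):

    values = [{"type": "string"}, "integer"]

    # Before: reassigning the loop variable is flagged by PLW2901.
    for property_type in values:
        if isinstance(property_type, dict):
            property_type = property_type.get("type", [])  # noqa: PLW2901

    # After: binding to a fresh name leaves the loop variable untouched.
    for property_type in values:
        schema_type = (
            property_type.get("type", [])
            if isinstance(property_type, dict)
            else property_type
        )
        print(schema_type)  # -> string, then integer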
14 changes: 6 additions & 8 deletions singer_sdk/mapper.py
@@ -189,12 +189,8 @@ def transform(self, record: dict) -> None:
         Args:
             record: An individual record dictionary in a stream.
 
-        Returns:
-            None
         """
-        _ = record  # Drop the record
-        return
 
     def get_filter_result(self, record: dict) -> bool:  # noqa: ARG002
         """Exclude all records.
@@ -387,7 +383,7 @@ def _eval_type(
 
         return default
 
-    def _init_functions_and_schema(
+    def _init_functions_and_schema(  # noqa: PLR0912, PLR0915
        self,
        stream_map: dict,
    ) -> tuple[Callable[[dict], bool], Callable[[dict], dict | None], dict]:
@@ -631,7 +627,7 @@ def register_raw_streams_from_catalog(self, catalog: Catalog) -> None:
                 catalog_entry.key_properties,
             )
 
-    def register_raw_stream_schema(
+    def register_raw_stream_schema(  # noqa: PLR0912
         self,
         stream_name: str,
         schema: dict,
@@ -671,9 +667,11 @@ def register_raw_stream_schema(
             ),
         ]
 
-        for stream_map_key, stream_def in self.stream_maps_dict.items():
+        for stream_map_key, stream_map_val in self.stream_maps_dict.items():
             stream_def = (
-                stream_def.copy() if isinstance(stream_def, dict) else stream_def
+                stream_map_val.copy()
+                if isinstance(stream_map_val, dict)
+                else stream_map_val
             )
             stream_alias: str = stream_map_key
             source_stream: str = stream_map_key
12 changes: 9 additions & 3 deletions singer_sdk/sinks/core.py
@@ -492,9 +492,15 @@ def process_batch_files(
                     tail,
                     mode="rb",
                 ) as file:
-                    if encoding.compression == "gzip":
-                        file = gzip_open(file)
-                    context = {"records": [json.loads(line) for line in file]}
+                    open_file = (
+                        gzip_open(file) if encoding.compression == "gzip" else file
+                    )
+                    context = {
+                        "records": [
+                            json.loads(line)
+                            for line in open_file  # type: ignore[attr-defined]
+                        ],
+                    }
                     self.process_batch(context)
         else:
             raise NotImplementedError(
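
Note: this rewrite binds the (possibly gzip-wrapped) handle to a new name instead of reassigning `file`, again for PLW2901. A standalone sketch of reading newline-delimited JSON records from a possibly-gzipped batch file (hypothetical helper, not the SDK's API):

    from __future__ import annotations

    import gzip
    import json
    from pathlib import Path

    def read_batch_records(path: Path) -> list[dict]:
        """Read JSONL records, transparently handling gzip-compressed batches."""
        with path.open("rb") as file:
            # Bind the wrapped handle to a new name; reassigning `file`
            # itself would trip PLW2901.
            open_file = gzip.open(file) if path.suffix == ".gz" else file
            return [json.loads(line) for line in open_file]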
16 changes: 7 additions & 9 deletions singer_sdk/streams/core.py
@@ -989,8 +989,7 @@ def reset_state_progress_markers(self, state: dict | None = None) -> None:
         if state is None or state == {}:
             context: dict | None
             for context in self.partitions or [{}]:
-                context = context or None
-                state = self.get_context_state(context)
+                state = self.get_context_state(context or None)
                 reset_state_progress_markers(state)
             return
 
@@ -1012,8 +1011,7 @@ def finalize_state_progress_markers(self, state: dict | None = None) -> None:
 
         context: dict | None
         for context in self.partitions or [{}]:
-            context = context or None
-            state = self.get_context_state(context)
+            state = self.get_context_state(context or None)
             finalize_state_progress_markers(state)
         return
 
@@ -1070,18 +1068,18 @@ def _sync_records(
         timer = metrics.sync_timer(self.name)
 
         record_index = 0
-        current_context: dict | None
+        context_element: dict | None
         context_list: list[dict] | None
         context_list = [context] if context is not None else self.partitions
         selected = self.selected
 
         with record_counter, timer:
-            for current_context in context_list or [{}]:
-                record_counter.context = current_context
-                timer.context = current_context
+            for context_element in context_list or [{}]:
+                record_counter.context = context_element
+                timer.context = context_element
 
                 partition_record_index = 0
-                current_context = current_context or None
+                current_context = context_element or None
                 state = self.get_context_state(current_context)
                 state_partition_context = self._get_state_partition_context(
                     current_context,
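
Note: both state methods collapse the two-step `context = context or None` into a single call argument, and _sync_records renames the loop variable so `current_context` is only assigned once per iteration. The `or None` idiom normalizes an empty partition dict to None; a tiny illustration:

    partitions: list[dict] = []  # a stream with no partitions

    for context_element in partitions or [{}]:   # falls back to one empty context
        current_context = context_element or None  # {} is falsy, so this is None
        print(current_context)                     # -> None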
(Diffs for the remaining 4 of 14 changed files were not loaded.)
