Skip to content

Commit

Permalink
fix: Handle decimal.Decimal instances in flattening (#1939)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Sep 11, 2023
1 parent 6ba1519 commit 0fb0c1b
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 67 deletions.
27 changes: 7 additions & 20 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import collections
import itertools
import json
import re
import typing as t
from copy import deepcopy

import inflection
import simplejson as json

DEFAULT_FLATTENING_SEPARATOR = "__"

Expand Down Expand Up @@ -155,17 +155,7 @@ def flatten_schema(
"type": "string"
},
"foo__bar": {
"type": "object",
"properties": {
"baz": {
"type": "object",
"properties": {
"qux": {
"type": "string"
}
}
}
}
"type": "string"
}
}
}
Expand All @@ -178,12 +168,7 @@ def flatten_schema(
"type": "string"
},
"foo__bar__baz": {
"type": "object",
"properties": {
"qux": {
"type": "string"
}
}
"type": "string"
}
}
}
Expand All @@ -210,7 +195,7 @@ def flatten_schema(
return new_schema


def _flatten_schema( # noqa: C901
def _flatten_schema( # noqa: C901, PLR0912
schema_node: dict,
parent_keys: list[str] | None = None,
separator: str = "__",
Expand Down Expand Up @@ -249,6 +234,8 @@ def _flatten_schema( # noqa: C901
max_level=max_level,
).items(),
)
elif "array" in v["type"] or "object" in v["type"] and max_level > 0:
items.append((new_key, {"type": "string"}))
else:
items.append((new_key, v))
elif len(v.values()) > 0:
Expand Down Expand Up @@ -347,7 +334,7 @@ def _flatten_record(
items.append(
(
new_key,
json.dumps(v)
json.dumps(v, use_decimal=True)
if _should_jsondump_value(k, v, flattened_schema)
else v,
),
Expand Down
29 changes: 26 additions & 3 deletions tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import typing as t
from contextlib import redirect_stdout
from decimal import Decimal

import pytest
from freezegun import freeze_time
Expand All @@ -19,7 +20,9 @@
from singer_sdk.streams.core import Stream
from singer_sdk.tap_base import Tap
from singer_sdk.typing import (
ArrayType,
IntegerType,
NumberType,
ObjectType,
PropertiesList,
Property,
Expand Down Expand Up @@ -415,6 +418,7 @@ class MappedStream(Stream):
ObjectType(
Property("id", IntegerType()),
Property("sub", ObjectType(Property("num", IntegerType()))),
Property("some_numbers", ArrayType(NumberType())),
),
),
).to_dict()
Expand All @@ -423,17 +427,29 @@ def get_records(self, context): # noqa: ARG002
yield {
"email": "[email protected]",
"count": 21,
"user": {"id": 1, "sub": {"num": 1}},
"user": {
"id": 1,
"sub": {"num": 1},
"some_numbers": [Decimal("3.14"), Decimal("2.718")],
},
}
yield {
"email": "[email protected]",
"count": 13,
"user": {"id": 2, "sub": {"num": 2}},
"user": {
"id": 2,
"sub": {"num": 2},
"some_numbers": [Decimal("10.32"), Decimal("1.618")],
},
}
yield {
"email": "[email protected]",
"count": 19,
"user": {"id": 3, "sub": {"num": 3}},
"user": {
"id": 3,
"sub": {"num": 3},
"some_numbers": [Decimal("1.414"), Decimal("1.732")],
},
}


Expand Down Expand Up @@ -545,6 +561,13 @@ def _clear_schema_cache() -> None:
"aliased_stream.jsonl",
id="aliased_stream",
),
pytest.param(
{},
True,
0,
"flatten_depth_0.jsonl",
id="flatten_depth_0",
),
pytest.param(
{},
True,
Expand Down
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/aliased_stream.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "aliased_stream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "aliased_stream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "aliased_stream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/drop_property.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/drop_property_null_string.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19, "user": {"id": 3, "sub": {"num": 3}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/flatten_all.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "user__some_numbers": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1, "user__some_numbers": "[3.14, 2.718]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2, "user__some_numbers": "[10.32, 1.618]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3, "user__some_numbers": "[1.414, 1.732]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
6 changes: 6 additions & 0 deletions tests/snapshots/mapped_stream/flatten_depth_0.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/flatten_depth_1.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub": "{\"num\": 1}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub": "{\"num\": 2}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub": "{\"num\": 3}"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub": {"type": "string"}, "user__some_numbers": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub": "{\"num\": 1}", "user__some_numbers": "[3.14, 2.718]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub": "{\"num\": 2}", "user__some_numbers": "[10.32, 1.618]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub": "{\"num\": 3}", "user__some_numbers": "[1.414, 1.732]"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/keep_all_fields.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}}, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}}, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}}, "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}, "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
8 changes: 4 additions & 4 deletions tests/snapshots/mapped_stream/map_and_flatten.jsonl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1, "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2, "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3, "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "SCHEMA", "stream": "mystream", "schema": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user__id": {"type": ["integer", "null"]}, "user__sub__num": {"type": ["integer", "null"]}, "user__some_numbers": {"type": "string"}, "email_hash": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["email_hash"]}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 21, "user__id": 1, "user__sub__num": 1, "user__some_numbers": "[3.14, 2.718]", "email_hash": "c160f8cc69a4f0bf2b0362752353d060"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 13, "user__id": 2, "user__sub__num": 2, "user__some_numbers": "[10.32, 1.618]", "email_hash": "4b9bb80620f03eb3719e0a061c14283d"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"email": "[email protected]", "count": 19, "user__id": 3, "user__sub__num": 3, "user__some_numbers": "[1.414, 1.732]", "email_hash": "426b189df1e2f359efe6ee90f2d2030f"}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}
Loading

0 comments on commit 0fb0c1b

Please sign in to comment.