Skip to content

Commit

Permalink
SAT: check records to comply jsonschema format field (#5661)
Browse files Browse the repository at this point in the history
* SAT: check records to comply jsonschema format field
  • Loading branch information
avida authored Sep 1, 2021
1 parent 1f7edaa commit 4717257
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
1 change: 1 addition & 0 deletions airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"pytest-timeout~=1.4",
"pprintpp~=0.4",
"dpath~=2.0.1",
"jsonschema~=3.2.0",
]

setuptools.setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,41 @@
#

import logging
import re
import pendulum
from collections import defaultdict
from typing import List, Mapping

from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft4Validator, ValidationError
from jsonschema import Draft7Validator, FormatChecker, ValidationError, FormatError

timestamp_regex = re.compile(("^\d{4}-\d?\d-\d?\d" # date
"(\s|T)" # separator
"\d?\d:\d?\d:\d?\d(.\d+)?" # time
".*$" #timezone
))


class CustomFormatChecker(FormatChecker):

@staticmethod
def check_datetime(value: str) -> bool:
valid_format = timestamp_regex.match(value)
try:
pendulum.parse(value, strict=False)
except ValueError:
valid_time = False
else:
valid_time = True
return valid_format and valid_time

def check(self, instance, format):
if format == "date-time":
if not self.check_datetime(instance):
raise FormatError(f"{instance} has invalid datetime format")
else:
return super().check(instance, format)



def verify_records_schema(
Expand All @@ -38,7 +68,7 @@ def verify_records_schema(
"""
validators = {}
for stream in catalog.streams:
validators[stream.stream.name] = Draft4Validator(stream.stream.json_schema)
validators[stream.stream.name] = Draft7Validator(stream.stream.json_schema, format_checker=CustomFormatChecker())

stream_errors = defaultdict(dict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def record_schema_fixture():


@pytest.fixture(name="configured_catalog")
def catalog_fixture(record_schema) -> ConfiguredAirbyteCatalog:
def catalog_fixture(request, record_schema) -> ConfiguredAirbyteCatalog:
record_schema = request.param if hasattr(request, "param") else record_schema
stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="my_stream", json_schema=record_schema),
sync_mode=SyncMode.full_refresh,
Expand Down Expand Up @@ -96,3 +97,61 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
assert len(streams_with_errors) == 1, "only one stream"
assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field"
assert errors == ["123 is not of type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"]


@pytest.mark.parametrize(
"record, configured_catalog, valid",
[
# Send null data
({"a": None}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
# time
({"a": "sdf"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00:22"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, True),
# date
({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False),
({"a": "2020-12-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, True),
({"a": "2020-20-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False),
# date-time
# full date-time format with timezone only valid, according to https://datatracker.ietf.org/doc/html/rfc3339#section-5.6
({"a": "12:11:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2021-08-10T12:43:15"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2021-08-10T12:43:15Z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2018-11-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2018-21-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
# This is valid for postgres sql but not valid for bigquery
({"a": "2014-09-27 9:35z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
# Seconds are obligatory for bigquery timestamp
({"a": "2014-09-27 9:35"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
({"a": "2014-09-27 9:35:0z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
# email
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, False),
({"a": "[email protected]"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
({"a": "Пример@example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
({"a": "写电子邮件@子邮件"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
# hostname
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False),
({"a": "[email protected]"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False),
({"a": "localhost"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True),
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True),
# ipv4
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False),
({"a": "0.0.0.1000"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False),
({"a": "0.0.0.0"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, True),
# ipv6
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, False),
({"a": "1080:0:0:0:8:800:200C:417A"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
({"a": "::1"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
({"a": "::"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
],
indirect=["configured_catalog"],
)
def test_validate_records_format(record, configured_catalog, valid):
records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
streams_with_errors = verify_records_schema(records, configured_catalog)
if valid:
assert not streams_with_errors
else:
assert streams_with_errors, f"Record {record} should produce errors against {configured_catalog.streams[0].stream.json_schema}"
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ Each stream should have some data, if you can't guarantee this for particular st
||x||
|||x|
||||
### Schema format checking

If some field has [format](https://json-schema.org/understanding-json-schema/reference/string.html#format) attribute specified on its catalog json schema, Source Acceptance Testing framework performs checking against format. It support checking of all [builtin](https://json-schema.org/understanding-json-schema/reference/string.html#built-in-formats) jsonschema formats for draft 7 specification: email, hostnames, ip addresses, time, date and date-time formats.

Note: For date-time we are not checking against compliance against ISO8601 (and RFC3339 as subset of it). Since we are using specified format to set database column type on db normalization stage, value should be compliant to bigquery [timestamp](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type) and SQL "timestamp with timezone" formats.
### Example of `expected_records.txt`:
In general, the expected_records.json should contain the subset of output of the records of particular stream you need to test.
The required fields are: `stream, data, emitted_at`
Expand Down

0 comments on commit 4717257

Please sign in to comment.