From 41f24cc2fbf52186716850a0bb76fd6a35ee2461 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Sun, 22 Mar 2020 18:14:29 +0100 Subject: [PATCH 01/38] Add support for starting from a known schema_map --- bigquery_schema_generator/generate_schema.py | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 8f1b0c9..08b9cda 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -123,7 +123,7 @@ def log_error(self, msg): # TODO: BigQuery is case-insensitive with regards to the 'name' of the # field. Verify that the 'name' is unique regardless of the case. - def deduce_schema(self, file): + def deduce_schema(self, file, *, schema_map=None): """Loop through each newlined-delimited line of 'file' and deduce the BigQuery schema. The schema is returned as a recursive map that contains both the database schema and some additional metadata @@ -178,7 +178,9 @@ def deduce_schema(self, file): else: raise Exception("Unknown input_format '%s'" % self.input_format) - schema_map = OrderedDict() + if schema_map is None: + schema_map = OrderedDict() + try: for json_object in reader: self.line_number += 1 @@ -296,11 +298,17 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry): # upgraded to a REPEATED {primitive_type}, but currently 'bq load' does # not support that so we must also follow that rule. if old_mode != new_mode: - self.log_error( - f'Ignoring non-RECORD field with mismatched mode: ' - f'old=({old_status},{old_name},{old_mode},{old_type}); ' - f'new=({new_status},{new_name},{new_mode},{new_type})') - return None + # primitive-types are conservatively deduced NULLABLE. In case we + # know a-priori that a field is REQUIRED, we accept that + new_might_be_required = new_mode == 'NULLABLE' and new_schema_entry['filled'] + if self.infer_mode and old_mode == 'REQUIRED' and new_might_be_required: + new_info['mode'] = old_mode + else: + self.log_error( + f'Ignoring non-RECORD field with mismatched mode: ' + f'old=({old_status},{old_name},{old_mode},{old_type}); ' + f'new=({new_status},{new_name},{new_mode},{new_type})') + return None # Check that the converted types are compatible. candidate_type = convert_type(old_type, new_type) @@ -509,14 +517,14 @@ def flatten_schema(self, schema_map): infer_mode=self.infer_mode, sanitize_names=self.sanitize_names) - def run(self, input_file=sys.stdin, output_file=sys.stdout): + def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): """Read the data records from the input_file and print out the BigQuery schema on the output_file. The error logs are printed on the sys.stderr. 
Args: input_file: a file-like object (default: sys.stdin) output_file: a file-like object (default: sys.stdout) """ - schema_map, error_logs = self.deduce_schema(input_file) + schema_map, error_logs = self.deduce_schema(input_file, schema_map=schema_map) for error in error_logs: logging.info("Problem on line %s: %s", error['line'], error['msg']) From 99c5168a2eeb44bc77d1095e4ce6564489bb76ae Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Tue, 24 Mar 2020 09:34:32 +0100 Subject: [PATCH 02/38] A SCHEMA tag can not be followed by DATA, ERROR or SCHEMA tag --- tests/data_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 8a7f0ac..f412c0e 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -216,8 +216,8 @@ def read_schema_section(self): break (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: - if tag == 'SCHEMA': - raise Exception("Unexpected SCHEMA tag") + if tag in ('DATA', 'ERROR', 'SCHEMA'): + raise Exception("Unexpected {} tag".format(tag)) self.push_back(line) break schema_lines.append(line) From 783e1f130460c471528b7f321645ef3998fc25ba Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Sun, 22 Mar 2020 18:09:57 +0100 Subject: [PATCH 03/38] Convert tests to pytest. Identify data chunks by count and line number --- setup.py | 7 ++++++- tests/data_reader.py | 13 ++++++++++--- tests/test_generate_schema.py | 35 ++++++++++++++++++++++++----------- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/setup.py b/setup.py index c59e42c..e858221 100644 --- a/setup.py +++ b/setup.py @@ -27,5 +27,10 @@ 'console_scripts': [ 'generate-schema = bigquery_schema_generator.generate_schema:main' ] - } + }, + extras_require={ + 'dev': [ + 'pytest', + ], + }, ) diff --git a/tests/data_reader.py b/tests/data_reader.py index f412c0e..2daaf7f 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -103,6 +103,8 @@ class DataReader: def __init__(self, testdata_file): self.testdata_file = testdata_file self.next_line = None + self.lineno = 0 + self.chunk_count = 0 def read_chunk(self): """Returns a dict with the next test chunk from the data file, @@ -115,15 +117,18 @@ def read_chunk(self): } Returns None if there are no more test chunks. 
""" - data_flags, records = self.read_data_section() + data_flags, records, line = self.read_data_section() if data_flags is None: return None errors = self.read_errors_section() error_map = self.process_errors(errors) schema = self.read_schema_section() self.read_end_marker() + self.chunk_count += 1 return { + 'chunk_count': self.chunk_count, + 'line': line, 'data_flags': data_flags, 'records': records, 'errors': errors, @@ -138,8 +143,9 @@ def read_data_section(self): # First tag must be 'DATA [flags]' tag_line = self.read_line() + lineno = self.lineno if tag_line is None: - return (None, None) + return (None, None, lineno) (tag, data_flags) = self.parse_tag_line(tag_line) if tag != 'DATA': raise Exception( @@ -160,7 +166,7 @@ def read_data_section(self): break records.append(line) - return (data_flags, records) + return (data_flags, records, lineno) def read_errors_section(self): """Return a dictionary of errors which are expected from the parsing of @@ -259,6 +265,7 @@ def read_line(self): while True: line = self.testdata_file.readline() + self.lineno += 1 # EOF if line == '': return None diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 894da67..0b163cd 100755 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -23,6 +23,7 @@ from bigquery_schema_generator.generate_schema import is_string_type from bigquery_schema_generator.generate_schema import convert_type from data_reader import DataReader +from pytest import fixture class TestSchemaGenerator(unittest.TestCase): @@ -397,7 +398,7 @@ def test_run_with_input_and_output(self): self.assertEqual(expected, output.getvalue()) -class TestFromDataFile(unittest.TestCase): +class ChunksFromDataFile(object): """Read the test case data from TESTDATA_FILE and verify that the expected schema matches the one produced by SchemaGenerator.deduce_schema(). Multiple test cases are stored in TESTDATA_FILE. The data_reader.py module knows how @@ -406,7 +407,7 @@ class TestFromDataFile(unittest.TestCase): TESTDATA_FILE = 'testdata.txt' - def test(self): + def chunks(self): # Find the TESTDATA_FILE in the same directory as this script file. dir_path = os.path.dirname(os.path.realpath(__file__)) testdata_path = os.path.join(dir_path, self.TESTDATA_FILE) @@ -414,15 +415,25 @@ def test(self): # Read each test case (data chunk) and verify the expected schema. with open(testdata_path) as testdatafile: data_reader = DataReader(testdatafile) - chunk_count = 0 while True: chunk = data_reader.read_chunk() if chunk is None: break - chunk_count += 1 - self.verify_data_chunk(chunk_count, chunk) + yield chunk - def verify_data_chunk(self, chunk_count, chunk): + @classmethod + def chunk_id(cls, chunk): + return "chunk-{chunk_count}-line-{line}".format(**chunk) + +@fixture(params = list(ChunksFromDataFile().chunks()), ids=ChunksFromDataFile.chunk_id) +def chunk(request): + return request.param + + +class TestDataChunks: + def test_data_chunk(self, chunk): + chunk_count = chunk['chunk_count'] + line = chunk['line'] data_flags = chunk['data_flags'] input_format = 'csv' if ('csv' in data_flags) else 'json' keep_nulls = ('keep_nulls' in data_flags) @@ -434,7 +445,7 @@ def verify_data_chunk(self, chunk_count, chunk): expected_error_map = chunk['error_map'] expected_schema = chunk['schema'] - print("Test chunk %s: First record: %s" % (chunk_count, records[0])) + print("Test chunk %s, line %s: First record: %s" % (chunk_count, line, records[0])) # Generate schema. 
generator = SchemaGenerator( input_format=input_format, @@ -447,10 +458,12 @@ def verify_data_chunk(self, chunk_count, chunk): # Check the schema, preserving order expected = json.loads(expected_schema, object_pairs_hook=OrderedDict) - self.assertEqual(expected, schema) + assert expected == schema # Check the error messages - self.assertEqual(len(expected_errors), len(error_logs)) + assert len(expected_errors) == len(error_logs) + self.assert_error_messages(expected_error_map, error_logs) + self.assert_error_messages(expected_error_map, error_logs) def assert_error_messages(self, expected_error_map, error_logs): @@ -470,9 +483,9 @@ def assert_error_messages(self, expected_error_map, error_logs): # well. for line_number, messages in sorted(error_map.items()): expected_entry = expected_error_map.get(line_number) - self.assertIsNotNone(expected_entry) + assert expected_entry is not None expected_messages = expected_entry['msgs'] - self.assertEqual(len(expected_messages), len(messages)) + assert len(expected_messages) == len(messages) if __name__ == '__main__': From bbbb35cd6ee1d9ccb840ab1ec6b50369b1dc8097 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Tue, 24 Mar 2020 09:18:43 +0100 Subject: [PATCH 04/38] Add support for starting from a known BQ schema --- bigquery_schema_generator/generate_schema.py | 76 +++++++++++-- tests/data_reader.py | 29 +++-- tests/test_generate_schema.py | 108 +++++++++++++++++++ tests/testdata.txt | 8 +- 4 files changed, 205 insertions(+), 16 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 08b9cda..dc59874 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -206,11 +206,20 @@ def deduce_schema_for_line(self, json_object, schema_map): then they must be compatible. """ for key, value in json_object.items(): + key = self.sanitize_name(key) schema_entry = schema_map.get(key) new_schema_entry = self.get_schema_entry(key, value) schema_map[key] = self.merge_schema_entry(schema_entry, new_schema_entry) + def sanitize_name(self, value): + if self.sanitize_names: + new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127]) + else: + new_value = value + return new_value + + def merge_schema_entry(self, old_schema_entry, new_schema_entry): """Merges the 'new_schema_entry' into the 'old_schema_entry' and return a merged schema entry. Recursively merges in sub-fields as well. @@ -514,8 +523,7 @@ def flatten_schema(self, schema_map): schema_map=schema_map, keep_nulls=self.keep_nulls, sorted_schema=self.sorted_schema, - infer_mode=self.infer_mode, - sanitize_names=self.sanitize_names) + infer_mode=self.infer_mode) def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): """Read the data records from the input_file and print out the BigQuery @@ -629,8 +637,7 @@ def is_string_type(thetype): def flatten_schema_map(schema_map, keep_nulls=False, sorted_schema=True, - infer_mode=False, - sanitize_names=False): + infer_mode=False): """Converts the 'schema_map' into a more flatten version which is compatible with BigQuery schema. @@ -687,7 +694,7 @@ def flatten_schema_map(schema_map, else: # Recursively flatten the sub-fields of a RECORD entry. 
new_value = flatten_schema_map( - value, keep_nulls, sorted_schema, sanitize_names) + value, keep_nulls, sorted_schema) elif key == 'type' and value in ['QINTEGER', 'QFLOAT', 'QBOOLEAN']: new_value = value[1:] elif key == 'mode': @@ -695,14 +702,69 @@ def flatten_schema_map(schema_map, new_value = 'REQUIRED' else: new_value = value - elif key == 'name' and sanitize_names: - new_value = re.sub('[^a-zA-Z0-9_]', '_', value)[0:127] else: new_value = value new_info[key] = new_value schema.append(new_info) return schema +def bq_schema_to_map(schema): + """ convert BQ JSON table schema representation to SchemaGenerator schema_map representaton """ + if isinstance(schema, dict): + schema = schema['fields'] + return OrderedDict((f['name'], bq_schema_field_to_entry(f)) + for f in schema) + + +BQ_TYPES = frozenset( + '''STRING + BYTES + INTEGER + FLOAT + BOOLEAN + TIMESTAMP + DATE + TIME + DATETIME + RECORD'''.split()) + +BQ_TYPE_ALIASES = { + 'INT64': 'INTEGER', + 'FLOAT64': 'FLOAT', + 'BOOL': 'BOOLEAN', + 'STRUCT': 'RECORD', + } + + +def bq_type_to_entry_type(type): + if type in BQ_TYPES: + return type + if type in BQ_TYPE_ALIASES: + return BQ_TYPE_ALIASES[type] + raise TypeError(f'Unknown BQ type ""{type}"') + + +def bq_schema_field_to_entry(field): + type = bq_type_to_entry_type(field['type']) + # maintain order of info fields + if type == 'RECORD': + info = OrderedDict([ + ('fields', bq_schema_to_map(field['fields'])), + ('mode', field['mode']), + ('name', field['name']), + ('type', type), + ]) + else: + info = OrderedDict([ + ('mode', field['mode']), + ('name', field['name']), + ('type', type), + ]) + return OrderedDict([ + ('status', 'hard'), + ('filled', field['mode'] != 'NULLABLE'), + ('info', info), + ]) def main(): # Configure command line flags. diff --git a/tests/data_reader.py b/tests/data_reader.py index 2daaf7f..87b4876 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -46,6 +46,9 @@ class DataReader: ERRORS line: msg ... + ERRORS INFORMED + line: msg + ... SCHEMA bigquery_schema END @@ -57,6 +60,8 @@ class DataReader: * a DATA section containing the newline-separated JSON data records * an optional ERRORS section containing the expected error messages + * an optional ERRORS INFORMED section containing the expected error + messages when the schema is known to schema decoder in advance * a SCHEMA section containing the expected BigQuery schema * comment lines start with a '#' character. 
@@ -120,8 +125,14 @@ def read_chunk(self): data_flags, records, line = self.read_data_section() if data_flags is None: return None - errors = self.read_errors_section() - error_map = self.process_errors(errors) + error_flags, errors = self.read_errors_section() + if errors and error_flags: + raise Exception("Unexpected error flags in the first ERRORS section") + informed_error_flags, informed_errors = self.read_errors_section() + if informed_errors and "INFORMED" not in informed_error_flags: + raise Exception("Expected INFORMED flag in the second ERRORS section") + error_map = self.process_errors(errors or []) + informed_error_map = self.process_errors(informed_errors or []) schema = self.read_schema_section() self.read_end_marker() self.chunk_count += 1 @@ -131,8 +142,10 @@ def read_chunk(self): 'line': line, 'data_flags': data_flags, 'records': records, - 'errors': errors, + 'errors': errors or [], 'error_map': error_map, + 'informed_errors': informed_errors, + 'informed_error_map': informed_error_map, 'schema': schema } @@ -180,11 +193,11 @@ def read_errors_section(self): # The 'ERRORS' section is optional. tag_line = self.read_line() if tag_line is None: - return [] - (tag, _) = self.parse_tag_line(tag_line) + return None, None + (tag, error_flags) = self.parse_tag_line(tag_line) if tag != 'ERRORS': self.push_back(tag_line) - return [] + return None, None # Read the ERRORS records until the next TAG_TOKEN. errors = [] @@ -194,12 +207,12 @@ def read_errors_section(self): raise Exception("Unexpected EOF, should be SCHEMA tag") (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: - if tag == 'ERRORS': + if tag == 'DATA': raise Exception("Unexpected ERRORS tag") self.push_back(line) break errors.append(line) - return errors + return error_flags, errors def read_schema_section(self): """Returns the JSON string of the schema section. 
diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 0b163cd..e86d4c1 100755 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -22,10 +22,73 @@ from bigquery_schema_generator.generate_schema import SchemaGenerator from bigquery_schema_generator.generate_schema import is_string_type from bigquery_schema_generator.generate_schema import convert_type +from bigquery_schema_generator.generate_schema import bq_schema_to_map +from bigquery_schema_generator.generate_schema import BQ_TYPES from data_reader import DataReader from pytest import fixture +@fixture(params=sorted(BQ_TYPES)) +def f_bq_type(request): + return request.param + + +@fixture(params=['NULLABLE', 'REQUIRED', 'REPEATED']) +def f_bq_mode(request): + return request.param + + +@fixture +def f_bq_entry(f_bq_mode, f_bq_type): + yield make_bq_entry(f_bq_mode, f_bq_type) + + +def make_bq_entry(mode, type): + if type == 'RECORD': + return OrderedDict([ + ('fields', [make_bq_entry('NULLABLE','STRING')]), + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) + else: + return OrderedDict([ + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) + + +@fixture(params=[('csv', True), ('csv', False), ('json', False)], + ids=lambda x: "{}-{}".format(*x)) +def f_input_format_infer_mode(request): + return request.param + + +@fixture(params=[True, False]) +def f_keep_nulls(request): + return request.param + + +@fixture(params=[True, False]) +def f_quoted_values_are_strings(request): + return request.param + + +@fixture +def f_generator(f_input_format_infer_mode, f_keep_nulls, f_quoted_values_are_strings): + yield SchemaGenerator(input_format=f_input_format_infer_mode[0], + infer_mode=f_input_format_infer_mode[1], + keep_nulls=f_keep_nulls, + quoted_values_are_strings=f_quoted_values_are_strings) + + +def test_bq_entry_roundtrip(f_bq_entry, f_generator): + schema = [f_bq_entry] + schema_map = bq_schema_to_map(schema) + flattened = f_generator.flatten_schema(schema_map) + assert schema == flattened + class TestSchemaGenerator(unittest.TestCase): def test_timestamp_matcher_valid(self): self.assertTrue( @@ -430,6 +493,7 @@ def chunk(request): return request.param + class TestDataChunks: def test_data_chunk(self, chunk): chunk_count = chunk['chunk_count'] @@ -464,8 +528,52 @@ def test_data_chunk(self, chunk): assert len(expected_errors) == len(error_logs) self.assert_error_messages(expected_error_map, error_logs) + def test_data_chunk_informed(self, chunk): + chunk_count = chunk['chunk_count'] + line = chunk['line'] + data_flags = chunk['data_flags'] + input_format = 'csv' if ('csv' in data_flags) else 'json' + keep_nulls = ('keep_nulls' in data_flags) + infer_mode = ('infer_mode' in data_flags) + quoted_values_are_strings = ('quoted_values_are_strings' in data_flags) + sanitize_names = ('sanitize_names' in data_flags) + records = chunk['records'] + expected_schema = chunk['schema'] + expected_errors = chunk['informed_errors'] + expected_error_map = chunk['informed_error_map'] + if expected_errors is None: + expected_errors = chunk['errors'] + expected_error_map = chunk['error_map'] + + # Check the schema, preserving order + expected = json.loads(expected_schema, object_pairs_hook=OrderedDict) + + print("Test informed chunk %s, line %s: First record: %s" % (chunk_count, line, records[0])) + + # Test deduction with preloaded schema + + expected_map = bq_schema_to_map(expected) + generator = SchemaGenerator( + input_format=input_format, + infer_mode=infer_mode, + keep_nulls=keep_nulls, + 
quoted_values_are_strings=quoted_values_are_strings, + sanitize_names=sanitize_names) + schema_map, error_logs = generator.deduce_schema(records, schema_map=expected_map) + schema = generator.flatten_schema(schema_map) + + # Check the schema, preserving order + assert expected == schema + + print('informed_expected_errors=',expected_errors,'error_logs=',error_logs) + assert len(expected_errors) == len(error_logs) self.assert_error_messages(expected_error_map, error_logs) + # Test roundtrip of schema -> schema_map -> schema + expected_map = bq_schema_to_map(expected) + schema = generator.flatten_schema(expected_map) + assert expected == schema + def assert_error_messages(self, expected_error_map, error_logs): # Convert the list of errors into a map error_map = {} diff --git a/tests/testdata.txt b/tests/testdata.txt index 8053bc3..d3a92fa 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -248,6 +248,8 @@ DATA { "a": [1, 3], "r": [{ "r0": "r0", "r1": "r1" }] } ERRORS 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD +ERRORS INFORMED +# none SCHEMA [ { @@ -407,6 +409,8 @@ DATA { "i": 3 } ERRORS 2: Ignoring non-RECORD field with mismatched mode: old=(hard,i,REPEATED,INTEGER); new=(hard,i,NULLABLE,INTEGER) +ERRORS INFORMED +2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD SCHEMA [] END @@ -417,6 +421,8 @@ DATA { "r" : [{ "i": 4 }] } ERRORS 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD +ERRORS INFORMED +1: Leaving schema for "r" as REPEATED RECORD SCHEMA [ { @@ -811,7 +817,7 @@ SCHEMA ] END -# Infer 'REQUIRED' mode for a consistently filled in value - simple +# Infer 'REQUIR ED' mode for a consistently filled in value - simple DATA csv infer_mode a,b,c,d,e ,ho,hi,true From 040d8013d6c5c382f97ecb359e6ef58aae7eaa65 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Tue, 24 Mar 2020 09:35:15 +0100 Subject: [PATCH 05/38] Add support for tox to test across python versions --- .gitignore | 1 + pyproject.toml | 5 +++++ setup.py | 7 +++++++ tox.ini | 27 +++++++++++++++++++++++++++ 4 files changed, 40 insertions(+) create mode 100644 pyproject.toml create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore index cb20bb7..a43d27c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ wheels/ .installed.cfg *.egg MANIFEST +.tox/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6ddb34b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,5 @@ +[build-system] +# These are the assumed default build requirements from pip: +# https://pip.pypa.io/en/stable/reference/pip/#pep-517-and-518-support +requires = ["setuptools>=40.8.0", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index e858221..4c3f7fb 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,14 @@ }, extras_require={ 'dev': [ + 'flake8', 'pytest', + 'tox', + 'wheel', + 'setuptools', ], + 'test': [ + 'coverage', + ], }, ) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..80e9134 --- /dev/null +++ b/tox.ini @@ -0,0 +1,27 @@ +[tox] +envlist = py{35,36,37} +skip_missing_interpreters = True + +# Define the minimal tox version required to run; +# if the host tox is less than this the tool with create an environment and +# provision it with a tox that satisfies it under provision_tox_env. +# At least this version is needed for PEP 517/518 support. +minversion = 3.3.0 + +# Activate isolated build environment. tox will use a virtual environment +# to build a source distribution from the source tree. 
For build tools and +# arguments use the pyproject.toml file as specified in PEP-517 and PEP-518. +isolated_build = true + +[testenv] +deps = + flake8 + pytest +commands = + python setup.py check -m -s + # flake8 . + py.test tests {posargs} + +[flake8] +exclude = .tox,*.egg,build,data +select = E,W,F From ea5107e59c80fb127c40b4514697321b1b6671d0 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Tue, 24 Mar 2020 20:36:56 +0100 Subject: [PATCH 06/38] BQ schema is case insensitive, track keys by their lowercase value --- bigquery_schema_generator/generate_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index dc59874..5c564ef 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -217,7 +217,7 @@ def sanitize_name(self, value): new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127]) else: new_value = value - return new_value + return new_value.lower() def merge_schema_entry(self, old_schema_entry, new_schema_entry): From 35d5cec2b45be5c8c9d002bc39169b528c2a5634 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Tue, 7 Apr 2020 10:42:08 +0200 Subject: [PATCH 07/38] lowercase field names when generating schema map from schema --- bigquery_schema_generator/generate_schema.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 5c564ef..8be71a0 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -712,7 +712,7 @@ def bq_schema_to_map(schema): """ convert BQ JSON table schema representation to SchemaGenerator schema_map representaton """ if isinstance(schema, dict): schema = schema['fields'] - return OrderedDict((f['name'], bq_schema_field_to_entry(f)) + return OrderedDict((f['name'].lower(), bq_schema_field_to_entry(f)) for f in schema) @@ -751,13 +751,13 @@ def bq_schema_field_to_entry(field): info = OrderedDict([ ('fields', bq_schema_to_map(field['fields'])), ('mode', field['mode']), - ('name', field['name']), + ('name', field['name'].lower()), ('type', type), ]) else: info = OrderedDict([ ('mode', field['mode']), - ('name', field['name']), + ('name', field['name'].lower()), ('type', type), ]) return OrderedDict([ From 444a5b9aa2decbbf5c7bcc518f2038e651ac17c5 Mon Sep 17 00:00:00 2001 From: Bozo Dragojevic Date: Thu, 23 Apr 2020 13:14:16 +0200 Subject: [PATCH 08/38] Keep schema map index case insensitive, but preserve original field case --- bigquery_schema_generator/generate_schema.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 8be71a0..b4898f8 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -206,11 +206,11 @@ def deduce_schema_for_line(self, json_object, schema_map): then they must be compatible. 
""" for key, value in json_object.items(): - key = self.sanitize_name(key) - schema_entry = schema_map.get(key) + sanitized_key = self.sanitize_name(key) + schema_entry = schema_map.get(sanitized_key) new_schema_entry = self.get_schema_entry(key, value) - schema_map[key] = self.merge_schema_entry(schema_entry, - new_schema_entry) + schema_map[sanitized_key] = self.merge_schema_entry(schema_entry, + new_schema_entry) def sanitize_name(self, value): if self.sanitize_names: @@ -751,13 +751,13 @@ def bq_schema_field_to_entry(field): info = OrderedDict([ ('fields', bq_schema_to_map(field['fields'])), ('mode', field['mode']), - ('name', field['name'].lower()), + ('name', field['name']), ('type', type), ]) else: info = OrderedDict([ ('mode', field['mode']), - ('name', field['name'].lower()), + ('name', field['name']), ('type', type), ]) return OrderedDict([ From 265b3c3529eb85b3fcb34a2aebb054e74af4c067 Mon Sep 17 00:00:00 2001 From: Matevz Bradac Date: Tue, 16 Jun 2020 21:17:26 +0200 Subject: [PATCH 09/38] case sensitivity fix - preserve key name from old/original schema, if key from new schema only differs in case --- bigquery_schema_generator/generate_schema.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index b4898f8..e3fecaf 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -272,9 +272,13 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry): # Defensive check, names should always be the same. if old_name != new_name: - raise Exception( - 'old_name (%s) != new_name(%s), should never happen' % - (old_name, new_name)) + if old_name.lower() != new_name.lower(): + raise Exception( + 'old_name (%s) != new_name(%s), should never happen' % + (old_name, new_name)) + else: + # preserve old name if case is different + new_info['name'] = old_info['name'] # Recursively merge in the subfields of a RECORD, allowing # NULLABLE to become REPEATED (because 'bq load' allows it). From ee47847fda41d35dab1997284321399056df1cc9 Mon Sep 17 00:00:00 2001 From: Matevz Bradac Date: Wed, 19 Aug 2020 13:33:38 +0200 Subject: [PATCH 10/38] add an optional callback to SchemaGenerator - allow the caller to process inconvertible types --- bigquery_schema_generator/generate_schema.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index e3fecaf..f05ad88 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -80,13 +80,15 @@ def __init__(self, quoted_values_are_strings=False, debugging_interval=1000, debugging_map=False, - sanitize_names=False): + sanitize_names=False, + type_mismatch_callback=None): self.input_format = input_format self.infer_mode = infer_mode self.keep_nulls = keep_nulls self.quoted_values_are_strings = quoted_values_are_strings self.debugging_interval = debugging_interval self.debugging_map = debugging_map + self.type_mismatch_callback = type_mismatch_callback # 'infer_mode' is supported for only input_format = 'csv' because # the header line gives us the complete list of fields to be expected in @@ -325,6 +327,9 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry): # Check that the converted types are compatible. 
candidate_type = convert_type(old_type, new_type) + if not candidate_type and self.type_mismatch_callback: + # inconvertible -> check if the caller has additional insight + candidate_type = self.type_mismatch_callback(old_type, new_type) if not candidate_type: self.log_error( f'Ignoring field with mismatched type: ' From 8fbe52d0210867ec319c4a47fb9c9cd0d1ef1026 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Fri, 6 Nov 2020 11:03:33 -0800 Subject: [PATCH 11/38] Modified data_reader class to read in an existing schema --- tests/data_reader.py | 45 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 87b4876..8843302 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -32,6 +32,9 @@ class DataReader: DATA [flags] json_records ... + EXISTING_SCHEMA + existing json schema from bq api + ... ERRORS line: msg ... @@ -59,6 +62,8 @@ class DataReader: the following components: * a DATA section containing the newline-separated JSON data records + * an optional EXISTING_SCHEMA section contains the existing base + BigQuery schema to build off of * an optional ERRORS section containing the expected error messages * an optional ERRORS INFORMED section containing the expected error messages when the schema is known to schema decoder in advance @@ -97,13 +102,14 @@ class DataReader: data_flags = chunk['data_flags'] keep_nulls = ('keep_nulls' in data_flags) records = chunk['records'] + existing_schema = chunk['existing_schema'] schema = chunk['schema'] ... """ # Recognized tags. # TODO: Change to a hash set to speed up the lookup if many are added. - TAG_TOKENS = ['DATA', 'ERRORS', 'SCHEMA', 'END'] + TAG_TOKENS = ['DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA', 'END'] def __init__(self, testdata_file): self.testdata_file = testdata_file @@ -118,6 +124,7 @@ def read_chunk(self): 'data_flags': [data_flags], 'data': [data lines], 'errors': {errors}, + 'existing_schema': schema_string, 'schema': schema_string } Returns None if there are no more test chunks. @@ -125,6 +132,7 @@ def read_chunk(self): data_flags, records, line = self.read_data_section() if data_flags is None: return None + existing_schema = self.read_existing_schema_section() error_flags, errors = self.read_errors_section() if errors and error_flags: raise Exception("Unexpected error flags in the first ERRORS section") @@ -142,6 +150,7 @@ def read_chunk(self): 'line': line, 'data_flags': data_flags, 'records': records, + 'existing_schema': existing_schema, 'errors': errors or [], 'error_map': error_map, 'informed_errors': informed_errors, @@ -181,6 +190,35 @@ def read_data_section(self): return (data_flags, records, lineno) + def read_existing_schema_section(self): + """Returns the JSON string of the schema section. 
+ """ + + # The next tag must be 'EXISTING_SCHEMA' + tag_line = self.read_line() + if tag_line is None: + raise Exception("Unexpected EOF, should be EXISTING_SCHEMA tag") + (tag, _) = self.parse_tag_line(tag_line) + if tag == 'EXISTING_SCHEMA': + # Read the EXISTING_SCHEMA records until the next TAG_TOKEN + schema_lines = [] + while True: + line = self.read_line() + if line is None: + break + (tag, _) = self.parse_tag_line(line) + if tag in self.TAG_TOKENS: + if tag in ('DATA', 'EXISTING_SCHEMA'): + raise Exception("Unexpected {} tag".format(tag)) + self.push_back(line) + break + schema_lines.append(line) + + return ''.join(schema_lines) + else: + self.push_back(tag_line) + return [] + def read_errors_section(self): """Return a dictionary of errors which are expected from the parsing of the DATA section. The dict has the form: @@ -208,7 +246,7 @@ def read_errors_section(self): (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: if tag == 'DATA': - raise Exception("Unexpected ERRORS tag") + raise Exception("Unexpected DATA tag found in ERRORS section") self.push_back(line) break errors.append(line) @@ -235,7 +273,7 @@ def read_schema_section(self): break (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: - if tag in ('DATA', 'ERROR', 'SCHEMA'): + if tag in ('DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA'): raise Exception("Unexpected {} tag".format(tag)) self.push_back(line) break @@ -342,6 +380,7 @@ def main(): break print("DATA_FLAGS: %s" % chunk['data_flags']) print("DATA: %s" % chunk['records']) + print("EXISTING_SCHEMA: %s" % chunk['existing_schema']) print("ERRORS: %s" % chunk['errors']) print("ERROR_MAP: %s" % chunk['error_map']) print("SCHEMA: %s" % chunk['schema']) From e15bfe9ca7af4df8a455e321fde1aefb00a89a2a Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Fri, 6 Nov 2020 14:06:32 -0800 Subject: [PATCH 12/38] Fixed bug related to sanitization within CSV datasets --- bigquery_schema_generator/generate_schema.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index f05ad88..e597345 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -348,6 +348,7 @@ def get_schema_entry(self, key, value): value_mode, value_type = self.infer_bigquery_type(value) if not value_mode or not value_type: return None + sanitized_key = self.sanitize_name(key) # yapf: disable if value_type == 'RECORD': @@ -363,7 +364,7 @@ def get_schema_entry(self, key, value): ('info', OrderedDict([ ('fields', fields), ('mode', value_mode), - ('name', key), + ('name', sanitized_key), ('type', value_type), ]))]) elif value_type == '__null__': @@ -371,7 +372,7 @@ def get_schema_entry(self, key, value): ('filled', False), ('info', OrderedDict([ ('mode', 'NULLABLE'), - ('name', key), + ('name', sanitized_key), ('type', 'STRING'), ]))]) elif value_type == '__empty_array__': @@ -379,7 +380,7 @@ def get_schema_entry(self, key, value): ('filled', False), ('info', OrderedDict([ ('mode', 'REPEATED'), - ('name', key), + ('name', sanitized_key), ('type', 'STRING'), ]))]) elif value_type == '__empty_record__': @@ -388,7 +389,7 @@ def get_schema_entry(self, key, value): ('info', OrderedDict([ ('fields', OrderedDict()), ('mode', value_mode), - ('name', key), + ('name', sanitized_key), ('type', 'RECORD'), ]))]) else: @@ -404,7 +405,7 @@ def get_schema_entry(self, key, value): ('filled', filled), ('info', OrderedDict([ ('mode', 
value_mode), - ('name', key), + ('name', sanitized_key), ('type', value_type), ]))]) # yapf: enable @@ -540,6 +541,7 @@ def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): Args: input_file: a file-like object (default: sys.stdin) output_file: a file-like object (default: sys.stdout) + schema_map: the existing bigquery schema_map we start with """ schema_map, error_logs = self.deduce_schema(input_file, schema_map=schema_map) From 1e286f14269066ce55628e143790a05f0a5a7b38 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Fri, 6 Nov 2020 14:08:45 -0800 Subject: [PATCH 13/38] Migrated off of pytest and back to unittest. Coverted fixtures into for loops to achieve this --- pyproject.toml | 5 -- setup.py | 14 +-- tests/test_generate_schema.py | 162 +++++++++++++++++----------------- tests/testdata.txt | 2 +- tox.ini | 27 ------ 5 files changed, 84 insertions(+), 126 deletions(-) delete mode 100644 pyproject.toml delete mode 100644 tox.ini diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 6ddb34b..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,5 +0,0 @@ -[build-system] -# These are the assumed default build requirements from pip: -# https://pip.pypa.io/en/stable/reference/pip/#pep-517-and-518-support -requires = ["setuptools>=40.8.0", "wheel"] -build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index 4c3f7fb..c59e42c 100644 --- a/setup.py +++ b/setup.py @@ -27,17 +27,5 @@ 'console_scripts': [ 'generate-schema = bigquery_schema_generator.generate_schema:main' ] - }, - extras_require={ - 'dev': [ - 'flake8', - 'pytest', - 'tox', - 'wheel', - 'setuptools', - ], - 'test': [ - 'coverage', - ], - }, + } ) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index e86d4c1..0fc51ce 100755 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -25,70 +25,8 @@ from bigquery_schema_generator.generate_schema import bq_schema_to_map from bigquery_schema_generator.generate_schema import BQ_TYPES from data_reader import DataReader -from pytest import fixture -@fixture(params=sorted(BQ_TYPES)) -def f_bq_type(request): - return request.param - - -@fixture(params=['NULLABLE', 'REQUIRED', 'REPEATED']) -def f_bq_mode(request): - return request.param - - -@fixture -def f_bq_entry(f_bq_mode, f_bq_type): - yield make_bq_entry(f_bq_mode, f_bq_type) - - -def make_bq_entry(mode, type): - if type == 'RECORD': - return OrderedDict([ - ('fields', [make_bq_entry('NULLABLE','STRING')]), - ('mode', mode), - ('name', 'a'), - ('type', type), - ]) - else: - return OrderedDict([ - ('mode', mode), - ('name', 'a'), - ('type', type), - ]) - - -@fixture(params=[('csv', True), ('csv', False), ('json', False)], - ids=lambda x: "{}-{}".format(*x)) -def f_input_format_infer_mode(request): - return request.param - - -@fixture(params=[True, False]) -def f_keep_nulls(request): - return request.param - - -@fixture(params=[True, False]) -def f_quoted_values_are_strings(request): - return request.param - - -@fixture -def f_generator(f_input_format_infer_mode, f_keep_nulls, f_quoted_values_are_strings): - yield SchemaGenerator(input_format=f_input_format_infer_mode[0], - infer_mode=f_input_format_infer_mode[1], - keep_nulls=f_keep_nulls, - quoted_values_are_strings=f_quoted_values_are_strings) - - -def test_bq_entry_roundtrip(f_bq_entry, f_generator): - schema = [f_bq_entry] - schema_map = bq_schema_to_map(schema) - flattened = f_generator.flatten_schema(schema_map) - assert schema == flattened - class 
TestSchemaGenerator(unittest.TestCase): def test_timestamp_matcher_valid(self): self.assertTrue( @@ -484,18 +422,18 @@ def chunks(self): break yield chunk - @classmethod - def chunk_id(cls, chunk): - return "chunk-{chunk_count}-line-{line}".format(**chunk) - -@fixture(params = list(ChunksFromDataFile().chunks()), ids=ChunksFromDataFile.chunk_id) -def chunk(request): - return request.param - +class TestDataChunksFromFile(unittest.TestCase): + def test_all_data_chunks(self): + for chunk in ChunksFromDataFile().chunks(): + try: + self.verify_data_chunk(chunk) + self.verify_data_chunk_informed(chunk) + except AssertionError as e: + print("\nError when processing chunk on line {}\n".format(chunk['line'])) + raise e -class TestDataChunks: - def test_data_chunk(self, chunk): + def verify_data_chunk(self, chunk): chunk_count = chunk['chunk_count'] line = chunk['line'] data_flags = chunk['data_flags'] @@ -522,13 +460,13 @@ def test_data_chunk(self, chunk): # Check the schema, preserving order expected = json.loads(expected_schema, object_pairs_hook=OrderedDict) - assert expected == schema + self.assertEqual(expected, schema) # Check the error messages - assert len(expected_errors) == len(error_logs) + self.assertEqual(len(expected_errors), len(error_logs)) self.assert_error_messages(expected_error_map, error_logs) - def test_data_chunk_informed(self, chunk): + def verify_data_chunk_informed(self, chunk): chunk_count = chunk['chunk_count'] line = chunk['line'] data_flags = chunk['data_flags'] @@ -563,16 +501,16 @@ def test_data_chunk_informed(self, chunk): schema = generator.flatten_schema(schema_map) # Check the schema, preserving order - assert expected == schema + self.assertEqual(expected, schema) print('informed_expected_errors=',expected_errors,'error_logs=',error_logs) - assert len(expected_errors) == len(error_logs) + self.assertEqual(len(expected_errors), len(error_logs)) self.assert_error_messages(expected_error_map, error_logs) # Test roundtrip of schema -> schema_map -> schema expected_map = bq_schema_to_map(expected) schema = generator.flatten_schema(expected_map) - assert expected == schema + self.assertEqual(expected, schema) def assert_error_messages(self, expected_error_map, error_logs): # Convert the list of errors into a map @@ -591,9 +529,73 @@ def assert_error_messages(self, expected_error_map, error_logs): # well. for line_number, messages in sorted(error_map.items()): expected_entry = expected_error_map.get(line_number) - assert expected_entry is not None + self.assertIsNotNone(expected_entry) expected_messages = expected_entry['msgs'] - assert len(expected_messages) == len(messages) + self.assertEqual(len(expected_messages), len(messages)) + + +class TestBigQuerySchemaToSchemaMap(unittest.TestCase): + def test_bq_schema_to_map_permutations(self): + ''' This checks that each possible type of consititued schema, when generated, + then converted to a schema_map, then back to the schema, they are equal. + + This function is really ugly but has good coverage. This was migrated + from pytest fixtures which were a bit cleaner but we ideally did not + want to add a new dependency / library that is used for testing. 
+ ''' + valid_types = BQ_TYPES + valid_modes = ['NULLABLE', 'REQUIRED', 'REPEATED'] + valid_input_formats_and_modes = [('csv', True), ('csv', False), ('json', False)] + valid_keep_null_params = [True, False] + valid_quoted_values_are_strings = [True, False] + for valid_type in valid_types: + for valid_mode in valid_modes: + bq_entry = self.make_bq_schema_entry(valid_mode, valid_type) + schema = [bq_entry] + schema_map = bq_schema_to_map(schema) + for input_format_and_mode in valid_input_formats_and_modes: + for keep_null_param in valid_keep_null_params: + for quote_value_are_strings in valid_quoted_values_are_strings: + generator = SchemaGenerator(input_format=input_format_and_mode[0], + infer_mode=input_format_and_mode[1], + keep_nulls=keep_null_param, + quoted_values_are_strings=quote_value_are_strings) + flattened = generator.flatten_schema(schema_map) + try: + self.assertEquals(schema, flattened) + except AssertionError as e: + print("test_bq_schema_to_map_permutations failed for case where: " + "bq_entry={}\nschema_generator created with values:" + "{}-{}-{}-{}" + .format(bq_entry, + input_format_and_mode[0], + input_format_and_mode[1], + keep_null_param, + quote_value_are_strings)) + raise e + + def make_bq_schema_entry(self, mode, type): + ''' Creates a bigquery schema entry + ''' + if type == 'RECORD': + return OrderedDict([ + ('fields', [self.make_bq_schema_entry('NULLABLE','STRING')]), + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) + else: + return OrderedDict([ + ('mode', mode), + ('name', 'a'), + ('type', type), + ]) + + def validate_existing_schema_to_schema_map_entry(self, existing_schema, schema_generator): + schema = [existing_schema] + schema_map = bq_schema_to_map(schema) + flattened = schema_generator.flatten_schema(schema_map) + self.assertEqual(schema, flattened) if __name__ == '__main__': diff --git a/tests/testdata.txt b/tests/testdata.txt index d3a92fa..f69a454 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -817,7 +817,7 @@ SCHEMA ] END -# Infer 'REQUIR ED' mode for a consistently filled in value - simple +# Infer 'REQUIRED' mode for a consistently filled in value - simple DATA csv infer_mode a,b,c,d,e ,ho,hi,true diff --git a/tox.ini b/tox.ini deleted file mode 100644 index 80e9134..0000000 --- a/tox.ini +++ /dev/null @@ -1,27 +0,0 @@ -[tox] -envlist = py{35,36,37} -skip_missing_interpreters = True - -# Define the minimal tox version required to run; -# if the host tox is less than this the tool with create an environment and -# provision it with a tox that satisfies it under provision_tox_env. -# At least this version is needed for PEP 517/518 support. -minversion = 3.3.0 - -# Activate isolated build environment. tox will use a virtual environment -# to build a source distribution from the source tree. For build tools and -# arguments use the pyproject.toml file as specified in PEP-517 and PEP-518. -isolated_build = true - -[testenv] -deps = - flake8 - pytest -commands = - python setup.py check -m -s - # flake8 . 
- py.test tests {posargs} - -[flake8] -exclude = .tox,*.egg,build,data -select = E,W,F From 7afdf419a2c4a61c3b356d1b59ad5baaed8cc17c Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Fri, 6 Nov 2020 15:19:31 -0800 Subject: [PATCH 14/38] Adding command-line flag for starting from existing bigquery schema --- bigquery_schema_generator/generate_schema.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index e597345..597b799 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -777,6 +777,13 @@ def bq_schema_field_to_entry(field): ('info', info), ]) +def read_existing_schema_from_file(existing_schema_path): + if existing_schema_path: + with open(existing_schema_path, 'r') as f: + existing_json_schema = json.load(f) + return bq_schema_to_map(existing_json_schema) + return None + def main(): # Configure command line flags. parser = argparse.ArgumentParser( @@ -810,6 +817,12 @@ def main(): '--sanitize_names', help='Forces schema name to comply with BigQuery naming standard', action="store_true") + parser.add_argument( + '--existing_schema_path', + help='File that contains the existing BigQuery schema for a table.' + ' This can be fetched with:' + ' `bq show --schema ::', + default=None) args = parser.parse_args() # Configure logging. @@ -823,7 +836,8 @@ def main(): debugging_interval=args.debugging_interval, debugging_map=args.debugging_map, sanitize_names=args.sanitize_names) - generator.run() + existing_schema_map = read_existing_schema_from_file(args.existing_schema_path) + generator.run(schema_map=existing_schema_map) if __name__ == '__main__': From 96ca4ae148f6aa899665222d1c0e5a96968ecaae Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 12:08:44 -0800 Subject: [PATCH 15/38] Added default NULLABLE mode when bigquery does not provide one --- bigquery_schema_generator/generate_schema.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 597b799..3fb8226 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -757,23 +757,26 @@ def bq_type_to_entry_type(type): def bq_schema_field_to_entry(field): type = bq_type_to_entry_type(field['type']) + # In some cases with nested fields within a record, bigquery does not + # populate a mode field. We will assume this is NULLABLE in this case + mode = field.get('mode', 'NULLABLE') # maintain order of info fields if type == 'RECORD': info = OrderedDict([ ('fields', bq_schema_to_map(field['fields'])), - ('mode', field['mode']), + ('mode', mode), ('name', field['name']), ('type', type), ]) else: info = OrderedDict([ - ('mode', field['mode']), + ('mode', mode), ('name', field['name']), ('type', type), ]) return OrderedDict([ ('status', 'hard'), - ('filled', field['mode'] != 'NULLABLE'), + ('filled', mode != 'NULLABLE'), ('info', info), ]) From fff6f5bf61a69edf67a1d19106dc5f42ef71cab4 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 12:10:23 -0800 Subject: [PATCH 16/38] Removing errors informed from tests. Adding additional test cases including ones starting from an existing schema. 
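
The ERRORS INFORMED section and the second "informed" verification pass are
removed. A test chunk that should start from a known schema now declares it in
an EXISTING_SCHEMA section; the test converts that JSON with bq_schema_to_map()
and passes the result to deduce_schema(schema_map=...), so the "start from an
existing schema" path is exercised by the same chunks as the normal path.

A minimal illustrative chunk in the new layout (field names invented for this
example, not taken from testdata.txt; the expected SCHEMA is the usual
alphabetically sorted flattened schema):

    DATA
    { "event_name": "login", "port": 22 }
    EXISTING_SCHEMA
    [ { "name": "event_name", "type": "STRING", "mode": "NULLABLE" } ]
    SCHEMA
    [
      { "mode": "NULLABLE", "name": "event_name", "type": "STRING" },
      { "mode": "NULLABLE", "name": "port", "type": "INTEGER" }
    ]
    END
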
--- tests/data_reader.py | 13 +- tests/test_generate_schema.py | 68 ++----- tests/testdata.txt | 374 +++++++++++++++++++++++++++++++++- 3 files changed, 381 insertions(+), 74 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 8843302..5c99fc8 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -49,9 +49,6 @@ class DataReader: ERRORS line: msg ... - ERRORS INFORMED - line: msg - ... SCHEMA bigquery_schema END @@ -65,7 +62,6 @@ class DataReader: * an optional EXISTING_SCHEMA section contains the existing base BigQuery schema to build off of * an optional ERRORS section containing the expected error messages - * an optional ERRORS INFORMED section containing the expected error messages when the schema is known to schema decoder in advance * a SCHEMA section containing the expected BigQuery schema * comment lines start with a '#' character. @@ -136,11 +132,7 @@ def read_chunk(self): error_flags, errors = self.read_errors_section() if errors and error_flags: raise Exception("Unexpected error flags in the first ERRORS section") - informed_error_flags, informed_errors = self.read_errors_section() - if informed_errors and "INFORMED" not in informed_error_flags: - raise Exception("Expected INFORMED flag in the second ERRORS section") error_map = self.process_errors(errors or []) - informed_error_map = self.process_errors(informed_errors or []) schema = self.read_schema_section() self.read_end_marker() self.chunk_count += 1 @@ -153,8 +145,6 @@ def read_chunk(self): 'existing_schema': existing_schema, 'errors': errors or [], 'error_map': error_map, - 'informed_errors': informed_errors, - 'informed_error_map': informed_error_map, 'schema': schema } @@ -191,7 +181,7 @@ def read_data_section(self): return (data_flags, records, lineno) def read_existing_schema_section(self): - """Returns the JSON string of the schema section. + """Returns the JSON string of the existing_schema section. """ # The next tag must be 'EXISTING_SCHEMA' @@ -213,7 +203,6 @@ def read_existing_schema_section(self): self.push_back(line) break schema_lines.append(line) - return ''.join(schema_lines) else: self.push_back(tag_line) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 0fc51ce..62e8b1a 100755 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -425,10 +425,10 @@ def chunks(self): class TestDataChunksFromFile(unittest.TestCase): def test_all_data_chunks(self): + self.maxDiff = None for chunk in ChunksFromDataFile().chunks(): try: self.verify_data_chunk(chunk) - self.verify_data_chunk_informed(chunk) except AssertionError as e: print("\nError when processing chunk on line {}\n".format(chunk['line'])) raise e @@ -446,8 +446,10 @@ def verify_data_chunk(self, chunk): expected_errors = chunk['errors'] expected_error_map = chunk['error_map'] expected_schema = chunk['schema'] + existing_schema = chunk['existing_schema'] - print("Test chunk %s, line %s: First record: %s" % (chunk_count, line, records[0])) + print("Test chunk %s, line %s: First record: %s" % + (chunk_count, line, records[0])) # Generate schema. 
generator = SchemaGenerator( input_format=input_format, @@ -455,7 +457,11 @@ def verify_data_chunk(self, chunk): keep_nulls=keep_nulls, quoted_values_are_strings=quoted_values_are_strings, sanitize_names=sanitize_names) - schema_map, error_logs = generator.deduce_schema(records) + existing_schema_map = None + if existing_schema: + existing_schema_map = bq_schema_to_map(json.loads(existing_schema)) + schema_map, error_logs = generator.deduce_schema( + records, schema_map=existing_schema_map) schema = generator.flatten_schema(schema_map) # Check the schema, preserving order @@ -466,52 +472,6 @@ def verify_data_chunk(self, chunk): self.assertEqual(len(expected_errors), len(error_logs)) self.assert_error_messages(expected_error_map, error_logs) - def verify_data_chunk_informed(self, chunk): - chunk_count = chunk['chunk_count'] - line = chunk['line'] - data_flags = chunk['data_flags'] - input_format = 'csv' if ('csv' in data_flags) else 'json' - keep_nulls = ('keep_nulls' in data_flags) - infer_mode = ('infer_mode' in data_flags) - quoted_values_are_strings = ('quoted_values_are_strings' in data_flags) - sanitize_names = ('sanitize_names' in data_flags) - records = chunk['records'] - expected_schema = chunk['schema'] - expected_errors = chunk['informed_errors'] - expected_error_map = chunk['informed_error_map'] - if expected_errors is None: - expected_errors = chunk['errors'] - expected_error_map = chunk['error_map'] - - # Check the schema, preserving order - expected = json.loads(expected_schema, object_pairs_hook=OrderedDict) - - print("Test informed chunk %s, line %s: First record: %s" % (chunk_count, line, records[0])) - - # Test deduction with preloaded schema - - expected_map = bq_schema_to_map(expected) - generator = SchemaGenerator( - input_format=input_format, - infer_mode=infer_mode, - keep_nulls=keep_nulls, - quoted_values_are_strings=quoted_values_are_strings, - sanitize_names=sanitize_names) - schema_map, error_logs = generator.deduce_schema(records, schema_map=expected_map) - schema = generator.flatten_schema(schema_map) - - # Check the schema, preserving order - self.assertEqual(expected, schema) - - print('informed_expected_errors=',expected_errors,'error_logs=',error_logs) - self.assertEqual(len(expected_errors), len(error_logs)) - self.assert_error_messages(expected_error_map, error_logs) - - # Test roundtrip of schema -> schema_map -> schema - expected_map = bq_schema_to_map(expected) - schema = generator.flatten_schema(expected_map) - self.assertEqual(expected, schema) - def assert_error_messages(self, expected_error_map, error_logs): # Convert the list of errors into a map error_map = {} @@ -523,19 +483,15 @@ def assert_error_messages(self, expected_error_map, error_logs): error_map[line_number] = messages messages.append(error['msg']) - # Check that each entry in 'error_logs' is expected. Currently checks - # only that the number of errors matches on a per line basis. - # TODO: Look deeper and verify that the error message strings match as - # well. 
for line_number, messages in sorted(error_map.items()): expected_entry = expected_error_map.get(line_number) self.assertIsNotNone(expected_entry) expected_messages = expected_entry['msgs'] - self.assertEqual(len(expected_messages), len(messages)) + self.assertEqual(expected_messages, messages) class TestBigQuerySchemaToSchemaMap(unittest.TestCase): - def test_bq_schema_to_map_permutations(self): + def test_bq_schema_to_map_round_trip_permutations(self): ''' This checks that each possible type of consititued schema, when generated, then converted to a schema_map, then back to the schema, they are equal. @@ -562,7 +518,7 @@ def test_bq_schema_to_map_permutations(self): quoted_values_are_strings=quote_value_are_strings) flattened = generator.flatten_schema(schema_map) try: - self.assertEquals(schema, flattened) + self.assertEqual(schema, flattened) except AssertionError as e: print("test_bq_schema_to_map_permutations failed for case where: " "bq_entry={}\nschema_generator created with values:" diff --git a/tests/testdata.txt b/tests/testdata.txt index f69a454..16427d4 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -248,8 +248,6 @@ DATA { "a": [1, 3], "r": [{ "r0": "r0", "r1": "r1" }] } ERRORS 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD -ERRORS INFORMED -# none SCHEMA [ { @@ -409,8 +407,6 @@ DATA { "i": 3 } ERRORS 2: Ignoring non-RECORD field with mismatched mode: old=(hard,i,REPEATED,INTEGER); new=(hard,i,NULLABLE,INTEGER) -ERRORS INFORMED -2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD SCHEMA [] END @@ -421,8 +417,6 @@ DATA { "r" : [{ "i": 4 }] } ERRORS 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD -ERRORS INFORMED -1: Leaving schema for "r" as REPEATED RECORD SCHEMA [ { @@ -910,3 +904,371 @@ SCHEMA } ] END + +# Empty JSON input file, existing schema no modification done, purely testing round trip schema generation +DATA keep_nulls +{} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": 
"FLOAT" + } +] +END + +# JSON existing schema file valid record instead of string, no modification done +DATA keep_nulls +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON file field added, schema expanded +DATA keep_nulls +{"event_name": "test event", "extra_field": "testing", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", 
+ "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "extra_field", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END From c5a57de54515e01de649f2705000a358f9688355 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 12:10:46 -0800 Subject: [PATCH 17/38] Removing type_mismatch_callback as this was untested --- bigquery_schema_generator/generate_schema.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 3fb8226..fbb06b2 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -80,15 +80,13 @@ def __init__(self, quoted_values_are_strings=False, debugging_interval=1000, debugging_map=False, - sanitize_names=False, - type_mismatch_callback=None): + sanitize_names=False): self.input_format = input_format self.infer_mode = infer_mode self.keep_nulls = keep_nulls self.quoted_values_are_strings = quoted_values_are_strings self.debugging_interval = debugging_interval self.debugging_map = debugging_map - self.type_mismatch_callback = type_mismatch_callback # 'infer_mode' is supported for only input_format = 'csv' because # the header line gives us the complete list of fields to be expected in @@ -327,9 +325,6 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry): # Check that the converted types are compatible. 
candidate_type = convert_type(old_type, new_type) - if not candidate_type and self.type_mismatch_callback: - # inconvertible -> check if the caller has additional insight - candidate_type = self.type_mismatch_callback(old_type, new_type) if not candidate_type: self.log_error( f'Ignoring field with mismatched type: ' From 098dbbad9bd23fc4f2224759309dca2e116db5ee Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 12:59:25 -0800 Subject: [PATCH 18/38] Fixing tests post merging from develop --- tests/test_generate_schema.py | 7 +++++-- tests/testdata.txt | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 22248fb..5cdda3f 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -19,9 +19,11 @@ import json from io import StringIO from collections import OrderedDict +from bigquery_schema_generator.generate_schema import BQ_TYPES from bigquery_schema_generator.generate_schema import SchemaGenerator -from bigquery_schema_generator.generate_schema import is_string_type +from bigquery_schema_generator.generate_schema import bq_schema_to_map from bigquery_schema_generator.generate_schema import convert_type +from bigquery_schema_generator.generate_schema import is_string_type from bigquery_schema_generator.generate_schema import json_full_path from .data_reader import DataReader @@ -483,7 +485,8 @@ def verify_data_chunk(self, chunk): infer_mode=infer_mode, keep_nulls=keep_nulls, quoted_values_are_strings=quoted_values_are_strings, - sanitize_names=sanitize_names) + sanitize_names=sanitize_names, + ignore_invalid_lines=ignore_invalid_lines) existing_schema_map = None if existing_schema: existing_schema_map = bq_schema_to_map(json.loads(existing_schema)) diff --git a/tests/testdata.txt b/tests/testdata.txt index a13e60f..9f6dd4f 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -86,8 +86,8 @@ DATA ignore_invalid_lines { "x": 3 } this is not a JSON object ERRORS -1: Record should be a JSON object but was a -3: Record cannot be parsed: Exception: Expecting value: line 1 column 1 (char 0) +1: Record should be a JSON Object but was a +3: Record could not be parsed: Exception: Expecting value: line 1 column 1 (char 0) SCHEMA [ { From 608b3fd8f7b63e078db8eccbd369d6cc9fe8274c Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 13:01:23 -0800 Subject: [PATCH 19/38] Removing tox from gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index a43d27c..cb20bb7 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,3 @@ wheels/ .installed.cfg *.egg MANIFEST -.tox/ From dd0db5fa165c302f303ad7341ac13bcb50be315c Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 13:21:23 -0800 Subject: [PATCH 20/38] Updating README to include details on existing_schema_path --- README.md | 53 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index aa8fb7f..4786145 100644 --- a/README.md +++ b/README.md @@ -235,13 +235,14 @@ as shown by the `--help` flag below. 
Print the built-in help strings:

-```
+```bash
 $ generate-schema --help
-usage: generate_schema.py [-h] [--input_format INPUT_FORMAT] [--keep_nulls]
-                          [--quoted_values_are_strings] [--infer_mode]
-                          [--debugging_interval DEBUGGING_INTERVAL]
-                          [--debugging_map] [--sanitize_names]
-                          [--ignore_invalid_lines]
+usage: generate-schema [-h] [--input_format INPUT_FORMAT] [--keep_nulls]
+                       [--quoted_values_are_strings] [--infer_mode]
+                       [--debugging_interval DEBUGGING_INTERVAL]
+                       [--debugging_map] [--sanitize_names]
+                       [--ignore_invalid_lines]
+                       [--existing_schema_path EXISTING_SCHEMA_PATH]

 Generate BigQuery schema from JSON or CSV file.

@@ -261,6 +262,10 @@ optional arguments:
                         standard
   --ignore_invalid_lines
                         Ignore lines that cannot be parsed instead of stopping
+  --existing_schema_path EXISTING_SCHEMA_PATH
+                        File that contains the existing BigQuery schema for a
+                        table. This can be fetched with: `bq show --schema
+                        <project_id>:<dataset>:<table_name>`
 ```

 #### Input Format (`--input_format`)
@@ -282,7 +287,7 @@ array or empty record as its value, the field is suppressed in the schema
 file. This flag enables this field to be included in the schema file. In other
 words, using a data file containing just nulls and empty values:

-```
+```bash
 $ generate_schema
 { "s": null, "a": [], "m": {} }
 ^D
@@ -291,7 +296,7 @@ INFO:root:Processed 1 lines
 ```

 With the `keep_nulls` flag, we get:
-```
+```bash
 $ generate-schema --keep_nulls
 { "s": null, "a": [], "m": {} }
 ^D
@@ -331,7 +336,7 @@ consistent with the algorithm used by `bq load`. However, for the `BOOLEAN`,
 normal strings instead. This flag disables type inference for `BOOLEAN`,
 `INTEGER` and `FLOAT` types inside quoted strings.

-```
+```bash
 $ generate-schema
 { "name": "1" }
 ^D
@@ -374,7 +379,7 @@ By default, the `generate_schema.py` script prints a short progress message
 every 1000 lines of input data. This interval can be changed using the
 `--debugging_interval` flag.

-```
+```bash
 $ generate-schema --debugging_interval 50 < file.data.json > file.schema.json
 ```

@@ -385,7 +390,7 @@ the bookkeeping metadata map which is used internally to keep track of the
 various fields and their types that were inferred using the data file. This
 flag is intended to be used for debugging.

-```
+```bash
 $ generate-schema --debugging_map < file.data.json > file.schema.json
 ```

@@ -435,6 +440,20 @@ deduction logic will handle any missing or extra columns gracefully.
 Fixes
 [Issue #49](https://github.com/bxparks/bigquery-schema-generator/issues/49).

+#### Existing Schema Path (`--existing_schema_path`)
+There are cases where we would like to start from an existing BigQuery table schema
+rather than starting from scratch with a new batch of data we would like to load.
+In this case we can specify the path to a local file on disk that contains our
+existing BigQuery table schema. This can be generated via the following `bq` CLI command:
+```bash
+bq show --schema <project_id>:<dataset>.<table_name>
> existing_table_schema.json +``` + +We can then run generate-schema with the additional option +```bash +--existing_schema_path existing_table_schema.json +``` + ## Schema Types ### Supported Types @@ -534,7 +553,7 @@ compatibility rules implemented by **bq load**: Here is an example of a single JSON data record on the STDIN (the `^D` below means typing Control-D, which indicates "end of file" under Linux and MacOS): -``` +```bash $ generate-schema { "s": "string", "b": true, "i": 1, "x": 3.1, "t": "2017-05-22T17:10:00-07:00" } ^D @@ -569,7 +588,7 @@ INFO:root:Processed 1 lines ``` In most cases, the data file will be stored in a file: -``` +```bash $ cat > file.data.json { "a": [1, 2] } { "i": 3 } @@ -596,7 +615,7 @@ $ cat file.schema.json Here is the schema generated from a CSV input file. The first line is the header containing the names of the columns, and the schema lists the columns in the same order as the header: -``` +```bash $ generate-schema --input_format csv e,b,c,d,a 1,x,true,,2.0 @@ -634,7 +653,7 @@ INFO:root:Processed 3 lines ``` Here is an example of the schema generated with the `--infer_mode` flag: -``` +```bash $ generate-schema --input_format csv --infer_mode name,surname,age John @@ -701,7 +720,7 @@ json.dump(schema, output_file, indent=2) I wrote the `bigquery_schema_generator/anonymize.py` script to create an anonymized data file `tests/testdata/anon1.data.json.gz`: -``` +```bash $ ./bigquery_schema_generator/anonymize.py < original.data.json \ > anon1.data.json $ gzip anon1.data.json @@ -709,7 +728,7 @@ $ gzip anon1.data.json This data file is 290MB (5.6MB compressed) with 103080 data records. Generating the schema using -``` +```bash $ bigquery_schema_generator/generate_schema.py < anon1.data.json \ > anon1.schema.json ``` From de2769495d9b90211886f86d7b7e489afeaba10b Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 9 Nov 2020 13:32:58 -0800 Subject: [PATCH 21/38] Fixing Flake8 errors --- bigquery_schema_generator/generate_schema.py | 27 +++++++----- tests/data_reader.py | 6 ++- tests/test_generate_schema.py | 45 ++++++++++++-------- 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 0c0257f..48a05d1 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -237,8 +237,8 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None): sanitized_key = self.sanitize_name(key) schema_entry = schema_map.get(sanitized_key) new_schema_entry = self.get_schema_entry(key, value) - schema_map[sanitized_key] = self.merge_schema_entry(schema_entry, - new_schema_entry) + schema_map[sanitized_key] = self.merge_schema_entry( + schema_entry, new_schema_entry) def sanitize_name(self, value): if self.sanitize_names: @@ -358,14 +358,17 @@ def merge_schema_entry( if old_mode != new_mode: # primitive-types are conservatively deduced NULLABLE. 
In case we # know a-priori that a field is REQUIRED, we accept that - new_might_be_required = new_mode == 'NULLABLE' and new_schema_entry['filled'] - if self.infer_mode and old_mode == 'REQUIRED' and new_might_be_required: + new_might_be_required = new_mode == 'NULLABLE' and\ + new_schema_entry['filled'] + if self.infer_mode and old_mode == 'REQUIRED' and\ + new_might_be_required: new_info['mode'] = old_mode else: self.log_error( f'Ignoring non-RECORD field with mismatched mode: ' - f'old=({old_status},{full_old_name},{old_mode},{old_type}); ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})') + f'old=({old_status},{full_old_name},{old_mode},{old_type});' + f' new=({new_status},{full_new_name},{new_mode},{new_type})' + ) return None # Check that the converted types are compatible. @@ -598,7 +601,8 @@ def flatten_schema(self, schema_map): sorted_schema=self.sorted_schema, infer_mode=self.infer_mode) - def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): + def run(self, input_file=sys.stdin, + output_file=sys.stdout, schema_map=None): """Read the data records from the input_file and print out the BigQuery schema on the output_file. The error logs are printed on the sys.stderr. Args: @@ -606,7 +610,8 @@ def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): output_file: a file-like object (default: sys.stdout) schema_map: the existing bigquery schema_map we start with """ - schema_map, error_logs = self.deduce_schema(input_file, schema_map=schema_map) + schema_map, error_logs = self.deduce_schema(input_file, + schema_map=schema_map) for error in error_logs: logging.info("Problem on line %s: %s", error['line'], error['msg']) @@ -795,7 +800,8 @@ def flatten_schema_map(schema_map, return schema def bq_schema_to_map(schema): - """ convert BQ JSON table schema representation to SchemaGenerator schema_map representaton """ + """ convert BQ JSON table schema representation to SchemaGenerator + schema_map representaton """ if isinstance(schema, dict): schema = schema['fields'] return OrderedDict((f['name'].lower(), bq_schema_field_to_entry(f)) @@ -931,7 +937,8 @@ def main(): sanitize_names=args.sanitize_names, ignore_invalid_lines=args.ignore_invalid_lines, ) - existing_schema_map = read_existing_schema_from_file(args.existing_schema_path) + existing_schema_map = read_existing_schema_from_file( + args.existing_schema_path) generator.run(schema_map=existing_schema_map) generator.run() diff --git a/tests/data_reader.py b/tests/data_reader.py index 5c99fc8..4a239e2 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -131,7 +131,8 @@ def read_chunk(self): existing_schema = self.read_existing_schema_section() error_flags, errors = self.read_errors_section() if errors and error_flags: - raise Exception("Unexpected error flags in the first ERRORS section") + raise Exception("Unexpected error flags in the first ERRORS" + " section") error_map = self.process_errors(errors or []) schema = self.read_schema_section() self.read_end_marker() @@ -235,7 +236,8 @@ def read_errors_section(self): (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: if tag == 'DATA': - raise Exception("Unexpected DATA tag found in ERRORS section") + raise Exception("Unexpected DATA tag found in ERRORS" + " section") self.push_back(line) break errors.append(line) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 5cdda3f..529ccfb 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -458,7 +458,8 @@ 
def test_all_data_chunks(self): try: self.verify_data_chunk(chunk) except AssertionError as e: - print("\nError when processing chunk on line {}\n".format(chunk['line'])) + print("\nError when processing chunk starting on line {}\n" + .format(chunk['line'])) raise e def verify_data_chunk(self, chunk): @@ -522,16 +523,20 @@ def assert_error_messages(self, expected_error_map, error_logs): class TestBigQuerySchemaToSchemaMap(unittest.TestCase): def test_bq_schema_to_map_round_trip_permutations(self): - ''' This checks that each possible type of consititued schema, when generated, - then converted to a schema_map, then back to the schema, they are equal. - - This function is really ugly but has good coverage. This was migrated - from pytest fixtures which were a bit cleaner but we ideally did not - want to add a new dependency / library that is used for testing. + ''' This checks that each possible type of consititued schema, when + generated, then converted to a schema_map, then back to the schema, + they are equal. + + This function is really ugly but has good coverage. This was + migrated from pytest fixtures which were a bit cleaner but we + ideally did not want to add a new dependency / library that is used + for testing. ''' valid_types = BQ_TYPES valid_modes = ['NULLABLE', 'REQUIRED', 'REPEATED'] - valid_input_formats_and_modes = [('csv', True), ('csv', False), ('json', False)] + valid_input_formats_and_modes = [('csv', True), + ('csv', False), + ('json', False)] valid_keep_null_params = [True, False] valid_quoted_values_are_strings = [True, False] for valid_type in valid_types: @@ -541,17 +546,21 @@ def test_bq_schema_to_map_round_trip_permutations(self): schema_map = bq_schema_to_map(schema) for input_format_and_mode in valid_input_formats_and_modes: for keep_null_param in valid_keep_null_params: - for quote_value_are_strings in valid_quoted_values_are_strings: - generator = SchemaGenerator(input_format=input_format_and_mode[0], - infer_mode=input_format_and_mode[1], - keep_nulls=keep_null_param, - quoted_values_are_strings=quote_value_are_strings) + for quote_value_are_strings in\ + valid_quoted_values_are_strings: + generator = SchemaGenerator( + input_format=input_format_and_mode[0], + infer_mode=input_format_and_mode[1], + keep_nulls=keep_null_param, + quoted_values_are_strings=\ + quote_value_are_strings) flattened = generator.flatten_schema(schema_map) try: self.assertEqual(schema, flattened) except AssertionError as e: - print("test_bq_schema_to_map_permutations failed for case where: " - "bq_entry={}\nschema_generator created with values:" + print("test_bq_schema_to_map_permutations" + " failed for case where: bq_entry={}\n" + "schema_generator created with values:" "{}-{}-{}-{}" .format(bq_entry, input_format_and_mode[0], @@ -565,7 +574,7 @@ def make_bq_schema_entry(self, mode, type): ''' if type == 'RECORD': return OrderedDict([ - ('fields', [self.make_bq_schema_entry('NULLABLE','STRING')]), + ('fields', [self.make_bq_schema_entry('NULLABLE', 'STRING')]), ('mode', mode), ('name', 'a'), ('type', type), @@ -577,7 +586,9 @@ def make_bq_schema_entry(self, mode, type): ('type', type), ]) - def validate_existing_schema_to_schema_map_entry(self, existing_schema, schema_generator): + def validate_existing_schema_to_schema_map_entry(self, + existing_schema, + schema_generator): schema = [existing_schema] schema_map = bq_schema_to_map(schema) flattened = schema_generator.flatten_schema(schema_map) From 7d1b4ce892a174bc9dfaa6b5b3220691919f424f Mon Sep 17 00:00:00 2001 From: Austin 
Brogle Date: Mon, 9 Nov 2020 13:42:29 -0800 Subject: [PATCH 22/38] Actually fully fixing flake8 tests --- bigquery_schema_generator/generate_schema.py | 8 +++++--- tests/test_generate_schema.py | 9 ++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 48a05d1..dfde9c2 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -247,7 +247,6 @@ def sanitize_name(self, value): new_value = value return new_value.lower() - def merge_schema_entry( self, old_schema_entry, @@ -361,7 +360,7 @@ def merge_schema_entry( new_might_be_required = new_mode == 'NULLABLE' and\ new_schema_entry['filled'] if self.infer_mode and old_mode == 'REQUIRED' and\ - new_might_be_required: + new_might_be_required: new_info['mode'] = old_mode else: self.log_error( @@ -799,6 +798,7 @@ def flatten_schema_map(schema_map, schema.append(new_info) return schema + def bq_schema_to_map(schema): """ convert BQ JSON table schema representation to SchemaGenerator schema_map representaton """ @@ -825,7 +825,7 @@ def bq_schema_to_map(schema): 'FLOAT64': 'FLOAT', 'BOOL': 'BOOLEAN', 'STRUCT': 'RECORD', - } +} def bq_type_to_entry_type(type): @@ -861,6 +861,7 @@ def bq_schema_field_to_entry(field): ('info', info), ]) + def read_existing_schema_from_file(existing_schema_path): if existing_schema_path: with open(existing_schema_path, 'r') as f: @@ -868,6 +869,7 @@ def read_existing_schema_from_file(existing_schema_path): return bq_schema_to_map(existing_json_schema) return None + def json_full_path(base_path, key): """Return the dot-separated JSON full path to a particular key. e.g. 'server.config.port'. Column names in CSV files are never nested, diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 529ccfb..4f61e73 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -546,14 +546,13 @@ def test_bq_schema_to_map_round_trip_permutations(self): schema_map = bq_schema_to_map(schema) for input_format_and_mode in valid_input_formats_and_modes: for keep_null_param in valid_keep_null_params: - for quote_value_are_strings in\ - valid_quoted_values_are_strings: + for quotes_are_strings in\ + valid_quoted_values_are_strings: generator = SchemaGenerator( input_format=input_format_and_mode[0], infer_mode=input_format_and_mode[1], keep_nulls=keep_null_param, - quoted_values_are_strings=\ - quote_value_are_strings) + quoted_values_are_strings=quotes_are_strings) flattened = generator.flatten_schema(schema_map) try: self.assertEqual(schema, flattened) @@ -566,7 +565,7 @@ def test_bq_schema_to_map_round_trip_permutations(self): input_format_and_mode[0], input_format_and_mode[1], keep_null_param, - quote_value_are_strings)) + quotes_are_strings)) raise e def make_bq_schema_entry(self, mode, type): From ed60a7f30abaa1c7db76175288c2e4ce77b5be0c Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 16 Nov 2020 10:09:54 -0800 Subject: [PATCH 23/38] Removing old generator.run function call --- bigquery_schema_generator/generate_schema.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index dfde9c2..38f9248 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -942,7 +942,6 @@ def main(): existing_schema_map = read_existing_schema_from_file( args.existing_schema_path) 
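    # Note: read_existing_schema_from_file() returns None when
    # --existing_schema_path is not given, and deduce_schema() then starts
    # from an empty OrderedDict, so this single run() call covers both the
    # "existing schema" and "from scratch" cases.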
generator.run(schema_map=existing_schema_map) - generator.run() if __name__ == '__main__': From bb35faf0cd6cf47a2a1c57a94b65055d18a75d15 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 16 Nov 2020 10:10:41 -0800 Subject: [PATCH 24/38] Removing unused test function --- tests/test_generate_schema.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 4f61e73..71fb631 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -585,14 +585,6 @@ def make_bq_schema_entry(self, mode, type): ('type', type), ]) - def validate_existing_schema_to_schema_map_entry(self, - existing_schema, - schema_generator): - schema = [existing_schema] - schema_map = bq_schema_to_map(schema) - flattened = schema_generator.flatten_schema(schema_map) - self.assertEqual(schema, flattened) - if __name__ == '__main__': unittest.main() From 23f8c406719829e6778214d38514b9d5d517ced1 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 17 Nov 2020 11:37:36 -0800 Subject: [PATCH 25/38] Keeping case sensitivity rather than converting everything to lowercase for new schema items --- bigquery_schema_generator/generate_schema.py | 11 +- tests/testdata.txt | 153 +++++++++++++++++++ 2 files changed, 162 insertions(+), 2 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 38f9248..66930f4 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -234,18 +234,25 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None): nested record that leads to this specific entry. """ for key, value in json_object.items(): - sanitized_key = self.sanitize_name(key) + # We want to use a lowercase version of the key in the schema map so + # that we can aggregate keys with slightly different casing together + sanitized_key = self.sanitize_name(key).lower() schema_entry = schema_map.get(sanitized_key) new_schema_entry = self.get_schema_entry(key, value) schema_map[sanitized_key] = self.merge_schema_entry( schema_entry, new_schema_entry) def sanitize_name(self, value): + ''' Sanitizes a column name within the schema. + + We explicitly choose to not perform the lowercasing here as this + cause us to lose case sensitivity when generating the final schema + ''' if self.sanitize_names: new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127]) else: new_value = value - return new_value.lower() + return new_value def merge_schema_entry( self, diff --git a/tests/testdata.txt b/tests/testdata.txt index 9f6dd4f..512a383 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -36,6 +36,37 @@ SCHEMA ] END +# If 'keep_nulls' flag is given, then the input data with null values produces +# schema which seems to be the best match. This also checks capitalization +DATA keep_nulls +{ "s": null, "aBc": [], "m": {} } +SCHEMA +[ + { + "mode": "REPEATED", + "name": "aBc", + "type": "STRING" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "__unknown__", + "type": "STRING" + } + ], + "mode": "NULLABLE", + "name": "m", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "s", + "type": "STRING" + } +] +END + # Fields with primitive types. BYTES is not supported. 
DATA { "s": "string", "b": true, "d": "2017-01-01", "i": 1, "t": "17:10:00", "ts": "2017-05-22T17:10:00-07:00", "x": 3.1 } @@ -1216,6 +1247,128 @@ SCHEMA ] END +# JSON existing schema changes in casing when generating schema should not result in a change from what existed already +DATA keep_nulls +{"eventname": "test event1", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +{"EVENTNAME": "test event2", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "eventName", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "eventName", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + # JSON file field added, schema expanded DATA keep_nulls {"event_name": "test event", "extra_field": "testing", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} From 39f8222876983d345dc3439e865ed6174e055724 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 17 Nov 2020 18:14:12 -0800 Subject: [PATCH 26/38] Fixing error logging bug related to base_path not being passed to get_schema_entry and merge_schema_entry in deduce_schema_for_line --- bigquery_schema_generator/generate_schema.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 66930f4..15b2efa 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -238,9 +238,11 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None): # that we can aggregate keys with slightly 
different casing together sanitized_key = self.sanitize_name(key).lower() schema_entry = schema_map.get(sanitized_key) - new_schema_entry = self.get_schema_entry(key, value) + new_schema_entry = self.get_schema_entry(key, + value, + base_path=base_path) schema_map[sanitized_key] = self.merge_schema_entry( - schema_entry, new_schema_entry) + schema_entry, new_schema_entry, base_path=base_path) def sanitize_name(self, value): ''' Sanitizes a column name within the schema. From efcb2fa62f53ca49c23b30952cdcc76034f079ac Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 17 Nov 2020 19:18:02 -0800 Subject: [PATCH 27/38] Adding additional json_full_path error locations --- bigquery_schema_generator/generate_schema.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 15b2efa..1ef6685 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -315,12 +315,15 @@ def merge_schema_entry( new_type = new_info['type'] new_mode = new_info['mode'] + full_old_name = json_full_path(base_path, old_name) + full_new_name = json_full_path(base_path, new_name) + # Defensive check, names should always be the same. if old_name != new_name: if old_name.lower() != new_name.lower(): raise Exception( 'old_name (%s) != new_name(%s), should never happen' % - (old_name, new_name)) + (full_old_name, full_new_name)) else: # preserve old name if case is different new_info['name'] = old_info['name'] @@ -334,12 +337,12 @@ def merge_schema_entry( old_info['mode'] = 'REPEATED' self.log_error( ('Converting schema for "%s" from NULLABLE RECORD ' - 'into REPEATED RECORD') % old_name) + 'into REPEATED RECORD') % full_old_name) elif old_mode == 'REPEATED' and new_mode == 'NULLABLE': # TODO: Maybe remove this warning output. It was helpful during # development, but maybe it's just natural. self.log_error( - 'Leaving schema for "%s" as REPEATED RECORD' % old_name) + 'Leaving schema for "%s" as REPEATED RECORD' % full_old_name) # RECORD type needs a recursive merging of sub-fields. We merge into # the 'old_schema_entry' which assumes that the 'old_schema_entry' @@ -356,9 +359,6 @@ def merge_schema_entry( ) return old_schema_entry - full_old_name = json_full_path(base_path, old_name) - full_new_name = json_full_path(base_path, new_name) - # For all other types, the old_mode must be the same as the new_mode. It # might seem reasonable to allow a NULLABLE {primitive_type} to be # upgraded to a REPEATED {primitive_type}, but currently 'bq load' does From 865e270fc647ed134008f2201a82236bbec35489 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 17 Nov 2020 19:19:00 -0800 Subject: [PATCH 28/38] Fixing flake8 error --- bigquery_schema_generator/generate_schema.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 1ef6685..0ce0047 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -342,7 +342,8 @@ def merge_schema_entry( # TODO: Maybe remove this warning output. It was helpful during # development, but maybe it's just natural. self.log_error( - 'Leaving schema for "%s" as REPEATED RECORD' % full_old_name) + 'Leaving schema for "%s" as REPEATED RECORD' % + full_old_name) # RECORD type needs a recursive merging of sub-fields. 
We merge into
        # the 'old_schema_entry' which assumes that the 'old_schema_entry'

From 865e270fc647ed134008f2201a82236bbec35489 Mon Sep 17 00:00:00 2001
From: Austin Brogle
Date: Tue, 17 Nov 2020 19:19:00 -0800
Subject: [PATCH 28/38] Fixing flake8 error

---
 bigquery_schema_generator/generate_schema.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py
index 1ef6685..0ce0047 100755
--- a/bigquery_schema_generator/generate_schema.py
+++ b/bigquery_schema_generator/generate_schema.py
@@ -342,7 +342,8 @@ def merge_schema_entry(
             # TODO: Maybe remove this warning output. It was helpful during
             # development, but maybe it's just natural.
             self.log_error(
-                'Leaving schema for "%s" as REPEATED RECORD' % full_old_name)
+                'Leaving schema for "%s" as REPEATED RECORD' %
+                full_old_name)

         # RECORD type needs a recursive merging of sub-fields. We merge into
         # the 'old_schema_entry' which assumes that the 'old_schema_entry'

From bb5745c8b55e7d4b51a191feab995bc0cb308c81 Mon Sep 17 00:00:00 2001
From: Austin Brogle
Date: Mon, 30 Nov 2020 17:06:20 -0800
Subject: [PATCH 29/38] Allow infer_mode to control relaxing mode when using an
 existing schema_map

---
 README.md                                    |   6 +
 bigquery_schema_generator/generate_schema.py | 119 ++++++---
 tests/test_generate_schema.py                |   9 +-
 tests/testdata.txt                           | 241 +++++++++++++++++++
 4 files changed, 335 insertions(+), 40 deletions(-)

diff --git a/README.md b/README.md
index 4786145..3ee4d3a 100644
--- a/README.md
+++ b/README.md
@@ -370,6 +370,12 @@ feature for JSON files, but too difficult to implement in practice because
 fields are often completely missing from a given JSON record (instead of
 explicitly being defined to be `null`).

+In addition to the above, this option, when used in conjunction with
+--existing_schema_path, will allow fields to be relaxed from REQUIRED to NULLABLE
+if they were REQUIRED in the existing schema and NULL rows are found in the new
+data we are inferring a schema from. In this case it can be used with either
+input format, CSV or JSON.
+
 See
 [Issue #28](https://github.com/bxparks/bigquery-schema-generator/issues/28)
 for implementation details.

diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py
index 0ce0047..dd7164f 100755
--- a/bigquery_schema_generator/generate_schema.py
+++ b/bigquery_schema_generator/generate_schema.py
@@ -105,17 +105,6 @@ def __init__(
         # rest of the file.
         self.ignore_invalid_lines = ignore_invalid_lines

-        # 'infer_mode' is supported for only input_format = 'csv' because
-        # the header line gives us the complete list of fields to be expected in
-        # the CSV file. In JSON data files, certain fields will often be
-        # completely missing instead of being set to 'null' or "". If the field
-        # is not even present, then it becomes incredibly difficult (not
-        # impossible, but more effort than I want to expend right now) to figure
-        # out which fields are missing so that we can mark the appropriate
-        # schema entries with 'filled=False'.
-        if infer_mode and input_format != 'csv':
-            raise Exception("infer_mode requires input_format=='csv'")
-
         # If CSV, force keep_nulls = True
         self.keep_nulls = True if (input_format == 'csv') else keep_nulls

@@ -170,7 +159,8 @@ def deduce_schema(self, file, *, schema_map=None):

         The 'filled' entry indicates whether all input data records contained
         the given field. If the --infer_mode flag is given, the 'filled' entry
-        is used to convert a NULLABLE schema entry to a REQUIRED schema entry.
+        is used to convert a NULLABLE schema entry to a REQUIRED schema entry or
+        to relax an existing field in schema_map from REQUIRED to NULLABLE.
The function returns a tuple of 2 things: * an OrderedDict which is sorted by the 'key' of the column name @@ -294,10 +284,22 @@ def merge_schema_entry( # new 'soft' does not clobber old 'hard' if old_status == 'hard' and new_status == 'soft': + mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if mode is None: + return None + old_schema_entry['info']['mode'] = mode return old_schema_entry # new 'hard' clobbers old 'soft' if old_status == 'soft' and new_status == 'hard': + mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if mode is None: + return None + new_schema_entry['info']['mode'] = mode return new_schema_entry # Verify that it's soft->soft or hard->hard @@ -360,37 +362,75 @@ def merge_schema_entry( ) return old_schema_entry - # For all other types, the old_mode must be the same as the new_mode. It - # might seem reasonable to allow a NULLABLE {primitive_type} to be - # upgraded to a REPEATED {primitive_type}, but currently 'bq load' does - # not support that so we must also follow that rule. - if old_mode != new_mode: - # primitive-types are conservatively deduced NULLABLE. In case we - # know a-priori that a field is REQUIRED, we accept that - new_might_be_required = new_mode == 'NULLABLE' and\ - new_schema_entry['filled'] - if self.infer_mode and old_mode == 'REQUIRED' and\ - new_might_be_required: - new_info['mode'] = old_mode - else: + new_mode = self.merge_mode(old_schema_entry, + new_schema_entry, + base_path) + if new_mode is None: + return None + new_schema_entry['info']['mode'] = new_mode + + # For all other types... + if old_type != new_type: + # Check that the converted types are compatible. + candidate_type = convert_type(old_type, new_type) + if not candidate_type: self.log_error( - f'Ignoring non-RECORD field with mismatched mode: ' + f'Ignoring field with mismatched type: ' f'old=({old_status},{full_old_name},{old_mode},{old_type});' - f' new=({new_status},{full_new_name},{new_mode},{new_type})' - ) + ' ' + f'new=({new_status},{full_new_name},{new_mode},{new_type})') return None - # Check that the converted types are compatible. - candidate_type = convert_type(old_type, new_type) - if not candidate_type: + new_info['type'] = candidate_type + return new_schema_entry + + def merge_mode(self, old_schema_entry, new_schema_entry, base_path): + old_info = old_schema_entry['info'] + new_info = new_schema_entry['info'] + old_mode = old_info['mode'] + old_name = old_info['name'] + old_type = old_info['type'] + old_status = old_schema_entry['status'] + new_mode = new_info['mode'] + new_name = new_info['name'] + new_type = new_info['type'] + new_status = new_schema_entry['status'] + full_old_name = json_full_path(base_path, old_name) + full_new_name = json_full_path(base_path, new_name) + # If the old field is a REQUIRED primitive (which could only have come + # from an existing schema), the new field can be either a + # NULLABLE(filled) or a NULLABLE(unfilled). + if old_mode == 'REQUIRED' and new_mode == 'NULLABLE': + # If the new field is filled, then retain the REQUIRED. + if new_schema_entry['filled']: + return old_mode + else: + # The new field is not filled (i.e. an empty or null field). + # If --infer_mode is active, then we allow the REQUIRED to + # revert back to NULLABLE. + if self.infer_mode: + return new_mode + else: + self.log_error( + f'Ignoring non-RECORD field with mismatched mode.' 
+ ' cannot convert to NULLABLE because infer_schema not' + ' set: ' + f'old=({old_status},{full_old_name},{old_mode},' + f'{old_type});' + f' new=({new_status},{full_new_name},{new_mode},' + f'{new_type})' + ) + return None + elif old_mode != new_mode: self.log_error( - f'Ignoring field with mismatched type: ' - f'old=({old_status},{full_old_name},{old_mode},{old_type}); ' - f'new=({new_status},{full_new_name},{new_mode},{new_type})') + f'Ignoring non-RECORD field with mismatched mode: ' + f'old=({old_status},{full_old_name},{old_mode},' + f'{old_type});' + f' new=({new_status},{full_new_name},{new_mode},' + f'{new_type})' + ) return None - - new_info['type'] = candidate_type - return new_schema_entry + return old_mode def get_schema_entry(self, key, value, base_path=None): """Determines the 'schema_entry' of the (key, value) pair. Calls @@ -795,7 +835,7 @@ def flatten_schema_map(schema_map, infer_mode=infer_mode ) elif key == 'type' and value in ['QINTEGER', 'QFLOAT', 'QBOOLEAN']: - # Convert QINTEGER -> INTEGER, similarly for QFLAT and QBOOLEAN. + # Convert QINTEGER -> INTEGER, similarly for QFLOAT and QBOOLEAN new_value = value[1:] elif key == 'mode': if infer_mode and value == 'NULLABLE' and filled: @@ -909,7 +949,8 @@ def main(): action="store_true") parser.add_argument( '--infer_mode', - help="Determine if mode can be 'NULLABLE' or 'REQUIRED'", + help="Automatically determine if mode can be 'NULLABLE' or 'REQUIRED'" + " instead of the default 'NULLABLE'", action='store_true') parser.add_argument( '--debugging_interval', diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 71fb631..29617ee 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -500,7 +500,14 @@ def verify_data_chunk(self, chunk): self.assertEqual(expected, schema) # Check the error messages - self.assertEqual(len(expected_errors), len(error_logs)) + try: + self.assertEqual(len(expected_errors), len(error_logs)) + except AssertionError as e: + print(f"Number of errors mismatched, expected:" + f" {len(expected_errors)} got: {len(error_logs)}") + print(f"Errors: {error_logs}") + print(f"Expected Errors: {expected_errors}") + raise e self.assert_error_messages(expected_error_map, error_logs) def assert_error_messages(self, expected_error_map, error_logs): diff --git a/tests/testdata.txt b/tests/testdata.txt index 512a383..5ef3a75 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -1494,3 +1494,244 @@ SCHEMA } ] END + +# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with cannot change mode +DATA keep_nulls +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": 
"dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +ERRORS +2: Ignoring non-RECORD field with mismatched mode. cannot convert to NULLABLE because infer_schema not set: old=(hard,test_float,REQUIRED,FLOAT); new=(soft,test_float,NULLABLE,STRING) +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + } +] +END + +# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with cannot change mode +DATA keep_nulls infer_mode +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + 
"type": "FLOAT" + } +] +END From 411f402d2fcfe0da5b22cf0d89b08e12e0d9c90c Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:16:47 -0800 Subject: [PATCH 30/38] Renaming line --> line_number --- bigquery_schema_generator/generate_schema.py | 2 +- tests/data_reader.py | 82 ++++++++++---------- tests/test_generate_schema.py | 12 +-- 3 files changed, 48 insertions(+), 48 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index dd7164f..9b3f2ea 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -119,7 +119,7 @@ def __init__( self.error_logs = [] def log_error(self, msg): - self.error_logs.append({'line': self.line_number, 'msg': msg}) + self.error_logs.append({'line_number': self.line_number, 'msg': msg}) # TODO: BigQuery is case-insensitive with regards to the 'name' of the # field. Verify that the 'name' is unique regardless of the case. diff --git a/tests/data_reader.py b/tests/data_reader.py index 4a239e2..3fec626 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -36,7 +36,7 @@ class DataReader: existing json schema from bq api ... ERRORS - line: msg + line_number: msg ... SCHEMA bigquery_schema @@ -47,7 +47,7 @@ class DataReader: json_records ... ERRORS - line: msg + line_number: msg ... SCHEMA bigquery_schema @@ -110,7 +110,7 @@ class DataReader: def __init__(self, testdata_file): self.testdata_file = testdata_file self.next_line = None - self.lineno = 0 + self.line_number = 0 self.chunk_count = 0 def read_chunk(self): @@ -125,7 +125,7 @@ def read_chunk(self): } Returns None if there are no more test chunks. """ - data_flags, records, line = self.read_data_section() + data_flags, records, line_number = self.read_data_section() if data_flags is None: return None existing_schema = self.read_existing_schema_section() @@ -140,7 +140,7 @@ def read_chunk(self): return { 'chunk_count': self.chunk_count, - 'line': line, + 'line_number': line_number, 'data_flags': data_flags, 'records': records, 'existing_schema': existing_schema, @@ -156,30 +156,30 @@ def read_data_section(self): # First tag must be 'DATA [flags]' tag_line = self.read_line() - lineno = self.lineno + line_number = self.line_number if tag_line is None: - return (None, None, lineno) + return (None, None, line_number) (tag, data_flags) = self.parse_tag_line(tag_line) if tag != 'DATA': raise Exception( - "Unrecoginized tag line '%s', should be DATA" % tag_line) + "Unrecoginized tag line_number '%s', should be DATA" % tag_line) # Read the DATA records until the next TAG_TOKEN. records = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: raise Exception( "Unexpected EOF, should be ERRORS or SCHEMA tag") - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag == 'DATA': raise Exception("Unexpected DATA tag") - self.push_back(line) + self.push_back(line_number) break - records.append(line) + records.append(line_number) - return (data_flags, records, lineno) + return (data_flags, records, line_number) def read_existing_schema_section(self): """Returns the JSON string of the existing_schema section. 
@@ -194,16 +194,16 @@ def read_existing_schema_section(self): # Read the EXISTING_SCHEMA records until the next TAG_TOKEN schema_lines = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: break - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag in ('DATA', 'EXISTING_SCHEMA'): raise Exception("Unexpected {} tag".format(tag)) - self.push_back(line) + self.push_back(line_number) break - schema_lines.append(line) + schema_lines.append(line_number) return ''.join(schema_lines) else: self.push_back(tag_line) @@ -213,7 +213,7 @@ def read_errors_section(self): """Return a dictionary of errors which are expected from the parsing of the DATA section. The dict has the form: { - 'line': line, + 'line_number': line_number, 'msg': [ messages ...] } """ @@ -230,17 +230,17 @@ def read_errors_section(self): # Read the ERRORS records until the next TAG_TOKEN. errors = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: raise Exception("Unexpected EOF, should be SCHEMA tag") - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag == 'DATA': raise Exception("Unexpected DATA tag found in ERRORS" " section") - self.push_back(line) + self.push_back(line_number) break - errors.append(line) + errors.append(line_number) return error_flags, errors def read_schema_section(self): @@ -254,21 +254,21 @@ def read_schema_section(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'SCHEMA': raise Exception( - "Unrecoginized tag line '%s', should be SCHEMA" % tag_line) + "Unrecoginized tag line_number '%s', should be SCHEMA" % tag_line) # Read the SCHEMA records until the next TAG_TOKEN schema_lines = [] while True: - line = self.read_line() - if line is None: + line_number = self.read_line() + if line_number is None: break - (tag, _) = self.parse_tag_line(line) + (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag in ('DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA'): raise Exception("Unexpected {} tag".format(tag)) - self.push_back(line) + self.push_back(line_number) break - schema_lines.append(line) + schema_lines.append(line_number) return ''.join(schema_lines) @@ -280,21 +280,21 @@ def read_end_marker(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'END': raise Exception( - "Unrecoginized tag line '%s', should be END" % tag_line) + "Unrecoginized tag line_number '%s', should be END" % tag_line) - def parse_tag_line(self, line): - """Parses a potential tag line of the form 'TAG [flags...]' where + def parse_tag_line(self, line_number): + """Parses a potential tag line_number of the form 'TAG [flags...]' where 'flags' is a list of strings separated by spaces. Returns the tuple of (tag, [flags]). """ - tokens = line.split() + tokens = line_number.split() if tokens: return (tokens[0], tokens[1:]) else: return (None, []) def read_line(self): - """Return the next line, while supporting a one-line push_back(). + """Return the next line_number, while supporting a one-line_number push_back(). Comment lines begin with a '#' character and are skipped. Blank lines are skipped. Prepending and trailing whitespaces are stripped. 
@@ -307,7 +307,7 @@ def read_line(self): while True: line = self.testdata_file.readline() - self.lineno += 1 + self.line_number += 1 # EOF if line == '': return None @@ -338,20 +338,20 @@ def process_errors(self, error_records): (line_number, message) = self.parse_error_line(error) error_entry = error_map.get(line_number) if error_entry is None: - error_entry = {'line': line_number, 'msgs': []} + error_entry = {'line_number': line_number, 'msgs': []} error_map[line_number] = error_entry messages = error_entry['msgs'] messages.append(message) return error_map def parse_error_line(self, line): - """Parse the error line of the form: - line: msg + """Parse the error line_number of the form: + line_number: msg """ pos = line.find(':') if pos < 0: raise Exception( - "Error line must be of the form 'line: msg': '%s'" % line) + "Error line_number must be of the form 'line_number: msg': '%s'" % line) line_number = int(line[0:pos]) message = line[pos + 1:].strip() return (line_number, message) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 29617ee..9e9d526 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -458,13 +458,13 @@ def test_all_data_chunks(self): try: self.verify_data_chunk(chunk) except AssertionError as e: - print("\nError when processing chunk starting on line {}\n" - .format(chunk['line'])) + print("\nError when processing chunk starting on line_number {}\n" + .format(chunk['line_number'])) raise e def verify_data_chunk(self, chunk): chunk_count = chunk['chunk_count'] - line = chunk['line'] + line_number = chunk['line_number'] data_flags = chunk['data_flags'] input_format = 'csv' if ('csv' in data_flags) else 'json' keep_nulls = ('keep_nulls' in data_flags) @@ -478,8 +478,8 @@ def verify_data_chunk(self, chunk): expected_schema = chunk['schema'] existing_schema = chunk['existing_schema'] - print("Test chunk %s, line %s: First record: %s" % - (chunk_count, line, records[0])) + print("Test chunk %s, line_number %s: First record: %s" % + (chunk_count, line_number, records[0])) # Generate schema. 
generator = SchemaGenerator( input_format=input_format, @@ -514,7 +514,7 @@ def assert_error_messages(self, expected_error_map, error_logs): # Convert the list of errors into a map error_map = {} for error in error_logs: - line_number = error['line'] + line_number = error['line_number'] messages = error_map.get(line_number) if messages is None: messages = [] From 9829bb01f0dba0cd3fca2eef6fb61b6b6c311e02 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:21:57 -0800 Subject: [PATCH 31/38] Updating make flake8 task to also scan tests/ folder since CI/CD does scan that --- Makefile | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7268aff..7707289 100644 --- a/Makefile +++ b/Makefile @@ -6,9 +6,16 @@ tests: python3 -m unittest flake8: - flake8 bigquery_schema_generator \ + flake8 bigquery_schema_generator/ \ --count \ --ignore W503 \ --show-source \ --statistics \ --max-line-length=80 + flake8 tests/ \ + --count \ + --ignore W503 \ + --show-source \ + --statistics \ + --max-line-length=80 + From b556e0bfc3adb85a0d528ca0c13ec4b7abe7d3b0 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:22:07 -0800 Subject: [PATCH 32/38] Fix flake8 on tests/ --- tests/data_reader.py | 6 ++++-- tests/test_generate_schema.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 3fec626..8812f87 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -254,7 +254,8 @@ def read_schema_section(self): (tag, _) = self.parse_tag_line(tag_line) if tag != 'SCHEMA': raise Exception( - "Unrecoginized tag line_number '%s', should be SCHEMA" % tag_line) + "Unrecoginized tag line_number '%s', should be SCHEMA" + % tag_line) # Read the SCHEMA records until the next TAG_TOKEN schema_lines = [] @@ -351,7 +352,8 @@ def parse_error_line(self, line): pos = line.find(':') if pos < 0: raise Exception( - "Error line_number must be of the form 'line_number: msg': '%s'" % line) + "Error line_number must be of the form 'line_number: msg': '%s'" + % line) line_number = int(line[0:pos]) message = line[pos + 1:].strip() return (line_number, message) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 9e9d526..e8e72ce 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -458,8 +458,8 @@ def test_all_data_chunks(self): try: self.verify_data_chunk(chunk) except AssertionError as e: - print("\nError when processing chunk starting on line_number {}\n" - .format(chunk['line_number'])) + print("\nError when processing chunk starting on line_number {}" + "\n".format(chunk['line_number'])) raise e def verify_data_chunk(self, chunk): From be3a37a50b14b70254538c324463e230bd0f4e80 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:27:07 -0800 Subject: [PATCH 33/38] Revert read_errors_section logic to original --- tests/data_reader.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 8812f87..83c7291 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -129,10 +129,7 @@ def read_chunk(self): if data_flags is None: return None existing_schema = self.read_existing_schema_section() - error_flags, errors = self.read_errors_section() - if errors and error_flags: - raise Exception("Unexpected error flags in the first ERRORS" - " section") + errors = self.read_errors_section() error_map = 
self.process_errors(errors or []) schema = self.read_schema_section() self.read_end_marker() @@ -213,7 +210,7 @@ def read_errors_section(self): """Return a dictionary of errors which are expected from the parsing of the DATA section. The dict has the form: { - 'line_number': line_number, + 'line': line, 'msg': [ messages ...] } """ @@ -221,27 +218,26 @@ def read_errors_section(self): # The 'ERRORS' section is optional. tag_line = self.read_line() if tag_line is None: - return None, None - (tag, error_flags) = self.parse_tag_line(tag_line) + return [] + (tag, _) = self.parse_tag_line(tag_line) if tag != 'ERRORS': self.push_back(tag_line) - return None, None + return [] # Read the ERRORS records until the next TAG_TOKEN. errors = [] while True: - line_number = self.read_line() - if line_number is None: + line = self.read_line() + if line is None: raise Exception("Unexpected EOF, should be SCHEMA tag") - (tag, _) = self.parse_tag_line(line_number) + (tag, _) = self.parse_tag_line(line) if tag in self.TAG_TOKENS: - if tag == 'DATA': - raise Exception("Unexpected DATA tag found in ERRORS" - " section") - self.push_back(line_number) + if tag == 'ERRORS': + raise Exception("Unexpected ERRORS tag") + self.push_back(line) break - errors.append(line_number) - return error_flags, errors + errors.append(line) + return errors def read_schema_section(self): """Returns the JSON string of the schema section. From fae9581be522b2428085a6ae5678f109c3e7ab58 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:31:47 -0800 Subject: [PATCH 34/38] Convert .format into f strings --- tests/data_reader.py | 4 ++-- tests/test_generate_schema.py | 17 ++++++++--------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/data_reader.py b/tests/data_reader.py index 83c7291..1a53c8b 100755 --- a/tests/data_reader.py +++ b/tests/data_reader.py @@ -197,7 +197,7 @@ def read_existing_schema_section(self): (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag in ('DATA', 'EXISTING_SCHEMA'): - raise Exception("Unexpected {} tag".format(tag)) + raise Exception(f"Unexpected {tag} tag") self.push_back(line_number) break schema_lines.append(line_number) @@ -262,7 +262,7 @@ def read_schema_section(self): (tag, _) = self.parse_tag_line(line_number) if tag in self.TAG_TOKENS: if tag in ('DATA', 'ERRORS', 'EXISTING_SCHEMA', 'SCHEMA'): - raise Exception("Unexpected {} tag".format(tag)) + raise Exception(f"Unexpected {tag} tag") self.push_back(line_number) break schema_lines.append(line_number) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index e8e72ce..51b6c9f 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -458,8 +458,8 @@ def test_all_data_chunks(self): try: self.verify_data_chunk(chunk) except AssertionError as e: - print("\nError when processing chunk starting on line_number {}" - "\n".format(chunk['line_number'])) + print(f"\nError when processing chunk starting on line_number" + f"{chunk['line_number']}\n") raise e def verify_data_chunk(self, chunk): @@ -565,14 +565,13 @@ def test_bq_schema_to_map_round_trip_permutations(self): self.assertEqual(schema, flattened) except AssertionError as e: print("test_bq_schema_to_map_permutations" - " failed for case where: bq_entry={}\n" + " failed for case where: " + f"bq_entry={bq_entry}\n" "schema_generator created with values:" - "{}-{}-{}-{}" - .format(bq_entry, - input_format_and_mode[0], - input_format_and_mode[1], - keep_null_param, - quotes_are_strings)) + 
f"{input_format_and_mode[0]}" + f"-{input_format_and_mode[1]}" + f"-{keep_null_param}" + f"-{quotes_are_strings}") raise e def make_bq_schema_entry(self, mode, type): From d41556ed4695758bdb5d866a872c6bed1e393d79 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 17:36:30 -0800 Subject: [PATCH 35/38] Revert generator for testcases to original loop method --- tests/test_generate_schema.py | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py index 51b6c9f..a949d27 100644 --- a/tests/test_generate_schema.py +++ b/tests/test_generate_schema.py @@ -427,16 +427,10 @@ def test_json_full_path(self): self.assertEqual('server.port', json_full_path('server', 'port')) -class ChunksFromDataFile(object): - """Read the test case data from TESTDATA_FILE and verify that the expected - schema matches the one produced by SchemaGenerator.deduce_schema(). Multiple - test cases are stored in TESTDATA_FILE. The data_reader.py module knows how - to parse that file. - """ - +class TestDataChunksFromFile(unittest.TestCase): TESTDATA_FILE = 'testdata.txt' - def chunks(self): + def test_all_data_chunks(self): # Find the TESTDATA_FILE in the same directory as this script file. dir_path = os.path.dirname(os.path.realpath(__file__)) testdata_path = os.path.join(dir_path, self.TESTDATA_FILE) @@ -448,19 +442,12 @@ def chunks(self): chunk = data_reader.read_chunk() if chunk is None: break - yield chunk - - -class TestDataChunksFromFile(unittest.TestCase): - def test_all_data_chunks(self): - self.maxDiff = None - for chunk in ChunksFromDataFile().chunks(): - try: - self.verify_data_chunk(chunk) - except AssertionError as e: - print(f"\nError when processing chunk starting on line_number" - f"{chunk['line_number']}\n") - raise e + try: + self.verify_data_chunk(chunk) + except AssertionError as e: + print("Error when processing chunk starting on line_number:" + f" {chunk['line_number']}\n") + raise e def verify_data_chunk(self, chunk): chunk_count = chunk['chunk_count'] From 38523e262a9c2d4b2daa91606bd7b93826d27c0d Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 19:58:11 -0800 Subject: [PATCH 36/38] Added a test for standard sql types to legacy type conversion FLOAT64 --> FLOAT and INT64 --> Integer --- tests/testdata.txt | 124 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/tests/testdata.txt b/tests/testdata.txt index 5ef3a75..707e1d1 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -1614,7 +1614,7 @@ SCHEMA ] END -# JSON existing schema REQUIRED to NULLABLE no infer_mode set, errors with cannot change mode +# JSON existing schema REQUIRED to NULLABLE infer_mode set so this should relax the field mode DATA keep_nulls infer_mode {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} {"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} @@ -1735,3 +1735,125 @@ SCHEMA } ] END + +# Testing standard SQL types to legacy types +DATA keep_nulls infer_mode +{"event_name": "test event", "event_time": "2020-06-02 
23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": 1.3} +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_boolean": "True", "test_float": null} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INT64" + }, + { + "name": "dest_port", + "type": "INT64" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT64", + "mode": "REQUIRED" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END From 989c2f8b148f2648d6f22e1566b4e9201afec6c0 Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Mon, 30 Nov 2020 19:59:04 -0800 Subject: [PATCH 37/38] Added additional 2 standard to legacy type conversions to test --- tests/testdata.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testdata.txt b/tests/testdata.txt index 707e1d1..9c69dd2 100644 --- a/tests/testdata.txt +++ b/tests/testdata.txt @@ -1754,7 +1754,7 @@ EXISTING_SCHEMA }, { "name": "connection_info", - "type": "RECORD", + "type": "STRUCT", "mode": "NULLABLE", "fields": [ { @@ -1792,7 +1792,7 @@ EXISTING_SCHEMA }, { "name": "test_boolean", - "type": "BOOLEAN", + "type": "BOOL", "mode": "NULLABLE" } ] From 82593a4cf3da5ae0a826db00891302979a9f04ab Mon Sep 17 00:00:00 2001 From: Austin Brogle Date: Tue, 1 Dec 2020 15:04:21 -0800 Subject: [PATCH 38/38] Fixed bug where we used infer_mode to set a field as REQUIRED for a json input_format --- bigquery_schema_generator/generate_schema.py | 21 ++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/bigquery_schema_generator/generate_schema.py b/bigquery_schema_generator/generate_schema.py index 9b3f2ea..d64f921 100755 --- a/bigquery_schema_generator/generate_schema.py +++ b/bigquery_schema_generator/generate_schema.py @@ -648,7 +648,8 @@ def flatten_schema(self, schema_map): schema_map=schema_map, keep_nulls=self.keep_nulls, sorted_schema=self.sorted_schema, - infer_mode=self.infer_mode) + 
infer_mode=self.infer_mode, + input_format=self.input_format) def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None): @@ -771,7 +772,8 @@ def is_string_type(thetype): def flatten_schema_map(schema_map, keep_nulls=False, sorted_schema=True, - infer_mode=False): + infer_mode=False, + input_format='json'): """Converts the 'schema_map' into a more flatten version which is compatible with BigQuery schema. @@ -832,13 +834,24 @@ def flatten_schema_map(schema_map, schema_map=value, keep_nulls=keep_nulls, sorted_schema=sorted_schema, - infer_mode=infer_mode + infer_mode=infer_mode, + input_format=input_format ) elif key == 'type' and value in ['QINTEGER', 'QFLOAT', 'QBOOLEAN']: # Convert QINTEGER -> INTEGER, similarly for QFLOAT and QBOOLEAN new_value = value[1:] elif key == 'mode': - if infer_mode and value == 'NULLABLE' and filled: + # 'infer_mode' to set a field as REQUIRED is supported for only + # input_format = 'csv' because the header line gives us the + # complete list of fields to be expected in the CSV file. In + # JSON data files, certain fields will often be completely + # missing instead of being set to 'null' or "". If the field is + # not even present, then it becomes incredibly difficult (not + # impossible, but more effort than I want to expend right now) + # to figure out which fields are missing so that we can mark the + # appropriate schema entries with 'filled=False'. + if (infer_mode and value == 'NULLABLE' and filled + and input_format == 'csv'): new_value = 'REQUIRED' else: new_value = value
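The mode handling added in this last patch reduces to a small decision rule. A minimal sketch restating it outside the flatten loop (illustrative only; resolve_mode is a hypothetical helper, not part of the patch):

# Restatement of the mode rule applied in flatten_schema_map() above.
# resolve_mode() is a hypothetical helper used here for illustration.
def resolve_mode(mode, filled, infer_mode, input_format):
    """Return the output mode for a deduced field.

    NULLABLE is upgraded to REQUIRED only when:
      * infer_mode was requested,
      * the field was filled (never null or empty) in every record, and
      * the input is CSV, whose header enumerates every expected column.
    """
    if infer_mode and mode == 'NULLABLE' and filled and input_format == 'csv':
        return 'REQUIRED'
    return mode

assert resolve_mode('NULLABLE', True, True, 'csv') == 'REQUIRED'
assert resolve_mode('NULLABLE', True, True, 'json') == 'NULLABLE'
assert resolve_mode('NULLABLE', False, True, 'csv') == 'NULLABLE'

The CSV restriction follows from the comment in the patch: a CSV header lists every expected column, so an always-filled column can safely be marked REQUIRED, whereas a JSON record can omit a field entirely and the filled bookkeeping never sees it.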