From fff6f5bf61a69edf67a1d19106dc5f42ef71cab4 Mon Sep 17 00:00:00 2001
From: Austin Brogle
Date: Mon, 9 Nov 2020 12:10:23 -0800
Subject: [PATCH] Removing ERRORS INFORMED sections from tests. Adding
 additional test cases, including ones starting from an existing schema.

---
 tests/data_reader.py          |  13 +-
 tests/test_generate_schema.py |  68 ++-----
 tests/testdata.txt            | 374 +++++++++++++++++++++++++++++++++-
 3 files changed, 381 insertions(+), 74 deletions(-)

diff --git a/tests/data_reader.py b/tests/data_reader.py
index 8843302..5c99fc8 100755
--- a/tests/data_reader.py
+++ b/tests/data_reader.py
@@ -49,9 +49,6 @@ class DataReader:
         ERRORS
         line: msg
         ...
-        ERRORS INFORMED
-        line: msg
-        ...
         SCHEMA
         bigquery_schema
         END
@@ -65,7 +62,6 @@ class DataReader:
     * an optional EXISTING_SCHEMA section contains the existing base BigQuery
       schema to build off of
     * an optional ERRORS section containing the expected error messages
-    * an optional ERRORS INFORMED section containing the expected error messages when the schema is known to schema decoder in advance
     * a SCHEMA section containing the expected BigQuery schema
     * comment lines start with a '#' character.

@@ -136,11 +132,7 @@ def read_chunk(self):
         error_flags, errors = self.read_errors_section()
         if errors and error_flags:
             raise Exception("Unexpected error flags in the first ERRORS section")
-        informed_error_flags, informed_errors = self.read_errors_section()
-        if informed_errors and "INFORMED" not in informed_error_flags:
-            raise Exception("Expected INFORMED flag in the second ERRORS section")
         error_map = self.process_errors(errors or [])
-        informed_error_map = self.process_errors(informed_errors or [])
         schema = self.read_schema_section()
         self.read_end_marker()
         self.chunk_count += 1
@@ -153,8 +145,6 @@ def read_chunk(self):
             'existing_schema': existing_schema,
             'errors': errors or [],
             'error_map': error_map,
-            'informed_errors': informed_errors,
-            'informed_error_map': informed_error_map,
             'schema': schema
         }

@@ -191,7 +181,7 @@ def read_data_section(self):
         return (data_flags, records, lineno)

     def read_existing_schema_section(self):
-        """Returns the JSON string of the schema section.
+        """Returns the JSON string of the existing_schema section.
         """

         # The next tag must be 'EXISTING_SCHEMA'
@@ -213,7 +203,6 @@ def read_existing_schema_section(self):
                     self.push_back(line)
                     break
                 schema_lines.append(line)
-
             return ''.join(schema_lines)
         else:
             self.push_back(tag_line)
diff --git a/tests/test_generate_schema.py b/tests/test_generate_schema.py
index 0fc51ce..62e8b1a 100755
--- a/tests/test_generate_schema.py
+++ b/tests/test_generate_schema.py
@@ -425,10 +425,10 @@ def chunks(self):
 class TestDataChunksFromFile(unittest.TestCase):

     def test_all_data_chunks(self):
+        self.maxDiff = None
         for chunk in ChunksFromDataFile().chunks():
             try:
                 self.verify_data_chunk(chunk)
-                self.verify_data_chunk_informed(chunk)
             except AssertionError as e:
                 print("\nError when processing chunk on line {}\n".format(chunk['line']))
                 raise e
@@ -446,8 +446,10 @@ def verify_data_chunk(self, chunk):
         expected_errors = chunk['errors']
         expected_error_map = chunk['error_map']
         expected_schema = chunk['schema']
+        existing_schema = chunk['existing_schema']
-        print("Test chunk %s, line %s: First record: %s" % (chunk_count, line, records[0]))
+        print("Test chunk %s, line %s: First record: %s" %
+              (chunk_count, line, records[0]))

         # Generate schema.
         generator = SchemaGenerator(
             input_format=input_format,
@@ -455,7 +457,11 @@ def verify_data_chunk(self, chunk):
             keep_nulls=keep_nulls,
             quoted_values_are_strings=quoted_values_are_strings,
             sanitize_names=sanitize_names)
-        schema_map, error_logs = generator.deduce_schema(records)
+        existing_schema_map = None
+        if existing_schema:
+            existing_schema_map = bq_schema_to_map(json.loads(existing_schema))
+        schema_map, error_logs = generator.deduce_schema(
+            records, schema_map=existing_schema_map)
         schema = generator.flatten_schema(schema_map)

         # Check the schema, preserving order
@@ -466,52 +472,6 @@ def verify_data_chunk(self, chunk):
         self.assertEqual(len(expected_errors), len(error_logs))
         self.assert_error_messages(expected_error_map, error_logs)

-    def verify_data_chunk_informed(self, chunk):
-        chunk_count = chunk['chunk_count']
-        line = chunk['line']
-        data_flags = chunk['data_flags']
-        input_format = 'csv' if ('csv' in data_flags) else 'json'
-        keep_nulls = ('keep_nulls' in data_flags)
-        infer_mode = ('infer_mode' in data_flags)
-        quoted_values_are_strings = ('quoted_values_are_strings' in data_flags)
-        sanitize_names = ('sanitize_names' in data_flags)
-        records = chunk['records']
-        expected_schema = chunk['schema']
-        expected_errors = chunk['informed_errors']
-        expected_error_map = chunk['informed_error_map']
-        if expected_errors is None:
-            expected_errors = chunk['errors']
-            expected_error_map = chunk['error_map']
-
-        # Check the schema, preserving order
-        expected = json.loads(expected_schema, object_pairs_hook=OrderedDict)
-
-        print("Test informed chunk %s, line %s: First record: %s" % (chunk_count, line, records[0]))
-
-        # Test deduction with preloaded schema
-
-        expected_map = bq_schema_to_map(expected)
-        generator = SchemaGenerator(
-            input_format=input_format,
-            infer_mode=infer_mode,
-            keep_nulls=keep_nulls,
-            quoted_values_are_strings=quoted_values_are_strings,
-            sanitize_names=sanitize_names)
-        schema_map, error_logs = generator.deduce_schema(records, schema_map=expected_map)
-        schema = generator.flatten_schema(schema_map)
-
-        # Check the schema, preserving order
-        self.assertEqual(expected, schema)
-
-        print('informed_expected_errors=',expected_errors,'error_logs=',error_logs)
-        self.assertEqual(len(expected_errors), len(error_logs))
-        self.assert_error_messages(expected_error_map, error_logs)
-
-        # Test roundtrip of schema -> schema_map -> schema
-        expected_map = bq_schema_to_map(expected)
-        schema = generator.flatten_schema(expected_map)
-        self.assertEqual(expected, schema)
-
     def assert_error_messages(self, expected_error_map, error_logs):
         # Convert the list of errors into a map
         error_map = {}
@@ -523,19 +483,15 @@ def assert_error_messages(self, expected_error_map, error_logs):
                 error_map[line_number] = messages
             messages.append(error['msg'])

-        # Check that each entry in 'error_logs' is expected. Currently checks
-        # only that the number of errors matches on a per line basis.
-        # TODO: Look deeper and verify that the error message strings match as
-        # well.
         for line_number, messages in sorted(error_map.items()):
             expected_entry = expected_error_map.get(line_number)
             self.assertIsNotNone(expected_entry)
             expected_messages = expected_entry['msgs']
-            self.assertEqual(len(expected_messages), len(messages))
+            self.assertEqual(expected_messages, messages)


 class TestBigQuerySchemaToSchemaMap(unittest.TestCase):
-    def test_bq_schema_to_map_permutations(self):
+    def test_bq_schema_to_map_round_trip_permutations(self):
         ''' This checks that each possible type of consititued schema,
         when generated, then converted to a schema_map, then back to the
         schema, they are equal.
@@ -562,7 +518,7 @@ def test_bq_schema_to_map_permutations(self):
                         quoted_values_are_strings=quote_value_are_strings)
                     flattened = generator.flatten_schema(schema_map)
                     try:
-                        self.assertEquals(schema, flattened)
+                        self.assertEqual(schema, flattened)
                     except AssertionError as e:
                         print("test_bq_schema_to_map_permutations failed for case where: "
                               "bq_entry={}\nschema_generator created with values:"
diff --git a/tests/testdata.txt b/tests/testdata.txt
index f69a454..16427d4 100644
--- a/tests/testdata.txt
+++ b/tests/testdata.txt
@@ -248,8 +248,6 @@ DATA
 { "a": [1, 3], "r": [{ "r0": "r0", "r1": "r1" }] }
 ERRORS
 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD
-ERRORS INFORMED
-# none
 SCHEMA
 [
   {
@@ -409,8 +407,6 @@ DATA
 { "i": 3 }
 ERRORS
 2: Ignoring non-RECORD field with mismatched mode: old=(hard,i,REPEATED,INTEGER); new=(hard,i,NULLABLE,INTEGER)
-ERRORS INFORMED
-2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD
 SCHEMA
 []
 END
@@ -421,8 +417,6 @@ DATA
 { "r" : [{ "i": 4 }] }
 ERRORS
 2: Converting schema for "r" from NULLABLE RECORD into REPEATED RECORD
-ERRORS INFORMED
-1: Leaving schema for "r" as REPEATED RECORD
 SCHEMA
 [
   {
@@ -910,3 +904,371 @@ SCHEMA
   }
 ]
 END
+
+# Empty JSON input file with an existing schema; no modification done, purely testing round-trip schema generation
+DATA keep_nulls
+{}
+EXISTING_SCHEMA
+[
+  {
+    "name": "event_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "event_time",
+    "type": "TIMESTAMP",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "connection_info",
+    "type": "RECORD",
+    "mode": "NULLABLE",
+    "fields": [
+      {
+        "name": "src_ip",
+        "type": "STRING"
+      },
+      {
+        "name": "dest_ip",
+        "type": "STRING"
+      },
+      {
+        "name": "src_port",
+        "type": "INTEGER"
+      },
+      {
+        "name": "dest_port",
+        "type": "INTEGER"
+      },
+      {
+        "name": "flags",
+        "type": "STRING",
+        "mode": "REPEATED"
+      }
+    ]
+  },
+  {
+    "name": "data_sources",
+    "type": "STRING",
+    "mode": "REPEATED"
+  },
+  {
+    "name": "test_float",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "test_boolean",
+    "type": "BOOLEAN",
+    "mode": "NULLABLE"
+  }
+]
+SCHEMA
+[
+  {
+    "fields": [
+      {
+        "mode": "NULLABLE",
+        "name": "dest_ip",
+        "type": "STRING"
+      },
+      {
+        "mode": "NULLABLE",
+        "name": "dest_port",
+        "type": "INTEGER"
+      },
+      {
+        "mode": "REPEATED",
+        "name": "flags",
+        "type": "STRING"
+      },
+      {
+        "mode": "NULLABLE",
+        "name": "src_ip",
+        "type": "STRING"
+      },
+      {
+        "mode": "NULLABLE",
+        "name": "src_port",
+        "type": "INTEGER"
+      }
+    ],
+    "mode": "NULLABLE",
+    "name": "connection_info",
+    "type": "RECORD"
+  },
+  {
+    "mode": "REPEATED",
+    "name": "data_sources",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "event_name",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "event_time",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "test_boolean",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "test_float",
+    "type": "FLOAT"
"FLOAT" + } +] +END + +# JSON existing schema file valid record instead of string, no modification done +DATA keep_nulls +{"event_name": "test event", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", + "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END + +# JSON file field added, schema expanded +DATA keep_nulls +{"event_name": "test event", "extra_field": "testing", "event_time": "2020-06-02 23:57:12.120174 UTC", "connection_info": {"src_ip": "10.0.1.102", "src_port": 53, "dest_ip": "10.0.0.1", "dest_port": 53}, "data_sources": ["dhcp", "bro", "dns"], "test_float": 17.6, "test_boolean": "True"} +EXISTING_SCHEMA +[ + { + "name": "event_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "event_time", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "connection_info", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "src_ip", + "type": "STRING" + }, + { + "name": "dest_ip", + "type": "STRING" + }, + { + "name": "src_port", + "type": "INTEGER" + }, + { + "name": "dest_port", + "type": "INTEGER" + }, + { + "name": "flags", + "type": "STRING", + "mode": "REPEATED" + } + ] + }, + { + "name": "data_sources", + "type": "STRING", + "mode": "REPEATED" + }, + { + "name": "test_float", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "test_boolean", + "type": "BOOLEAN", + "mode": "NULLABLE" + } +] +SCHEMA +[ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "dest_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "dest_port", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "flags", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_ip", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "src_port", + "type": "INTEGER" + } + ], + "mode": "NULLABLE", + "name": "connection_info", 
+ "type": "RECORD" + }, + { + "mode": "REPEATED", + "name": "data_sources", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "event_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "extra_field", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "test_boolean", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "test_float", + "type": "FLOAT" + } +] +END