diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a8bc4f..38a2795 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,9 +17,10 @@ jobs: - name: Install Linting Tools run: | python -m pip install --upgrade pip - pip install --user pylint==2.6.0 + pip install --user pylint==2.17.7 pip install --user black~=22.3 pip install --user flake8~=4.0 + pip install --user pytest - name: Install Partition Manager run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0e3382c..ccccb8a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,7 @@ repos: hooks: - id: flake8 - repo: https://github.com/PyCQA/pylint - rev: pylint-2.6.0 + rev: v2.17.7 hooks: - id: pylint args: diff --git a/partitionmanager/cli.py b/partitionmanager/cli.py index 6e319ff..723b650 100644 --- a/partitionmanager/cli.py +++ b/partitionmanager/cli.py @@ -10,6 +10,8 @@ import traceback import yaml +import partitionmanager.database_helpers +import partitionmanager.dropper import partitionmanager.migrate import partitionmanager.sql import partitionmanager.stats @@ -121,10 +123,10 @@ def from_yaml_file(self, file): for key in data["tables"]: tab = partitionmanager.types.Table(key) tabledata = data["tables"][key] - if isinstance(tabledata, dict) and "retention" in tabledata: - tab.set_retention( + if isinstance(tabledata, dict) and "retention_period" in tabledata: + tab.set_retention_period( partitionmanager.types.timedelta_from_dict( - tabledata["retention"] + tabledata["retention_period"] ) ) if isinstance(tabledata, dict) and "partition_period" in tabledata: @@ -318,16 +320,10 @@ def do_partition(conf): duration = table.partition_period log.info(f"Evaluating {table} (duration={duration})") - - positions = pm_tap.get_current_positions( - conf.dbcmd, table, map_data["range_cols"] + cur_pos = partitionmanager.database_helpers.get_position_of_table( + conf.dbcmd, table, map_data ) - log.info(f"{table} (pos={positions})") - - cur_pos = partitionmanager.types.Position() - cur_pos.set_position([positions[col] for col in map_data["range_cols"]]) - sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands( database=conf.dbcmd, table=table, @@ -465,6 +461,57 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()): return all_results +def drop_cmd(args): + """Calculates drop. + Helper for argparse. + """ + conf = config_from_args(args) + return do_find_drops_for_tables(conf) + + +DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions") +DROP_PARSER.set_defaults(func=drop_cmd) + + +def do_find_drops_for_tables(conf): + all_results = dict() + for table in conf.tables: + log = logging.getLogger(f"do_find_drops_for_tables:{table.name}") + + if not table.has_date_query: + log.warning(f"Cannot process {table}: no date query specified") + continue + + if not table.retention_period: + log.warning(f"Cannot process {table}: no retention specified") + continue + + try: + table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table) + if table_problems: + log.debug(f"Cannot process {table}: {table_problems}") + continue + + map_data = pm_tap.get_partition_map(conf.dbcmd, table) + current_position = partitionmanager.database_helpers.get_position_of_table( + conf.dbcmd, table, map_data + ) + + droppable = partitionmanager.dropper.get_droppable_partitions( + conf.dbcmd, + map_data["partitions"], + current_position, + conf.curtime, + table, + ) + + all_results[table.name] = droppable + except Exception as e: + log.warning(f"Error processing table {table.name}") + raise e + return all_results + + def main(): """Start here.""" args = PARSER.parse_args() diff --git a/partitionmanager/cli_test.py b/partitionmanager/cli_test.py index 398ccf2..a926bfd 100644 --- a/partitionmanager/cli_test.py +++ b/partitionmanager/cli_test.py @@ -8,6 +8,7 @@ migrate_cmd, config_from_args, do_partition, + drop_cmd, PARSER, partition_cmd, stats_cmd, @@ -224,11 +225,9 @@ def test_partition_period_seven_days(self): [ "INFO:partition:Evaluating Table partitioned_last_week " "(duration=7 days, 0:00:00)", - "INFO:partition:Table partitioned_last_week (pos={'id': 150})", "DEBUG:partition:Table partitioned_last_week has no pending SQL updates.", "INFO:partition:Evaluating Table partitioned_yesterday " "(duration=7 days, 0:00:00)", - "INFO:partition:Table partitioned_yesterday (pos={'id': 150})", "DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.", ] ), @@ -626,3 +625,60 @@ def test_migrate_cmd_in_out(self): "flip", ] ) + + +class TestDropCmd(unittest.TestCase): + def _run_drop_cmd_yaml(self, yaml): + with tempfile.NamedTemporaryFile() as tmpfile: + insert_into_file(tmpfile, yaml) + args = PARSER.parse_args(["--config", tmpfile.name, "drop"]) + return drop_cmd(args) + + def test_drop_invalid_config(self): + with self.assertLogs( + "do_find_drops_for_tables:unused", level="WARNING" + ) as logctx: + self._run_drop_cmd_yaml( + f""" +partitionmanager: + mariadb: {str(fake_exec)} + tables: + unused: + earliest_utc_timestamp_query: > + SELECT UNIX_TIMESTAMP(`issued`) FROM `unused` + WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1; +""" + ) + self.assertEqual( + set(logctx.output), + set( + [ + "WARNING:do_find_drops_for_tables:unused:" + "Cannot process Table unused: no retention specified" + ] + ), + ) + + def test_drop_no_sql(self): + with self.assertLogs( + "do_find_drops_for_tables:unused", level="WARNING" + ) as logctx: + self._run_drop_cmd_yaml( + f""" +partitionmanager: + mariadb: {str(fake_exec)} + tables: + unused: + retention_period: + days: 180 + """ + ) + self.assertEqual( + set(logctx.output), + set( + [ + "WARNING:do_find_drops_for_tables:unused:" + "Cannot process Table unused: no date query specified" + ] + ), + ) diff --git a/partitionmanager/database_helpers.py b/partitionmanager/database_helpers.py new file mode 100644 index 0000000..98400d1 --- /dev/null +++ b/partitionmanager/database_helpers.py @@ -0,0 +1,72 @@ +""" +Helper functions for database operations +""" + +from datetime import datetime, timezone +import logging + +import partitionmanager.table_append_partition as pm_tap +import partitionmanager.types + + +def get_position_of_table(database, table, map_data): + """Returns a Position of the table at the current moment.""" + + pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"]) + + cur_pos = partitionmanager.types.Position() + cur_pos.set_position([pos_list[col] for col in map_data["range_cols"]]) + + return cur_pos + + +def calculate_exact_timestamp_via_query(database, table, position_partition): + """Calculates the exact timestamp of a PositionPartition. + + raises ValueError if the position is incalculable + """ + + log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}") + + if not table.has_date_query: + raise ValueError("Table has no defined date query") + + if not isinstance(position_partition, partitionmanager.types.PositionPartition): + raise ValueError("Only PositionPartitions are supported") + + if len(position_partition.position) != 1: + raise ValueError( + "This method is only valid for single-column partitions right now" + ) + arg = position_partition.position.as_sql_input()[0] + + sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg) + log.debug( + "Executing %s to derive partition %s at position %s", + sql_select_cmd, + position_partition.name, + position_partition.position, + ) + + start = datetime.now() + exact_time_result = database.run(sql_select_cmd) + end = datetime.now() + + if not len(exact_time_result) == 1: + raise partitionmanager.types.NoExactTimeException("No exact timestamp result") + if not len(exact_time_result[0]) == 1: + raise partitionmanager.types.NoExactTimeException( + "Unexpected column count for the timestamp result" + ) + for key, value in exact_time_result[0].items(): + exact_time = datetime.fromtimestamp(value, tz=timezone.utc) + break + + log.debug( + "Exact time of %s returned for %s at position %s, query took %s", + exact_time, + position_partition.name, + position_partition.position, + (end - start), + ) + return exact_time diff --git a/partitionmanager/database_helpers_test.py b/partitionmanager/database_helpers_test.py new file mode 100644 index 0000000..188752c --- /dev/null +++ b/partitionmanager/database_helpers_test.py @@ -0,0 +1,109 @@ +import unittest + +from .database_helpers import get_position_of_table, calculate_exact_timestamp_via_query + +from .types import ( + DatabaseCommand, + NoExactTimeException, + PositionPartition, + SqlInput, + SqlQuery, + Table, +) + + +class MockDatabase(DatabaseCommand): + def __init__(self): + self._responses = list() + self.num_queries = 0 + + def add_response(self, expected, response): + self._responses.insert(0, {"expected": expected, "response": response}) + + def run(self, cmd): + self.num_queries += 1 + if not self._responses: + raise Exception(f"No mock responses available for cmd [{cmd}]") + + r = self._responses.pop() + if r["expected"] in cmd: + return r["response"] + + raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]") + + def db_name(self): + return SqlInput("the-database") + + +class TestDatabaseHelpers(unittest.TestCase): + def test_position_of_table(self): + db = MockDatabase() + db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 90210}]) + + table = Table("burgers") + data = {"range_cols": ["id"]} + + pos = get_position_of_table(db, table, data) + self.assertEqual(pos.as_list(), [90210]) + + def test_exact_timestamp_no_query(self): + db = MockDatabase() + db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 42}]) + + table = Table("burgers") + self.assertFalse(table.has_date_query) + + pos = PositionPartition("p_start") + pos.set_position([42]) + + with self.assertRaises(ValueError): + calculate_exact_timestamp_via_query(db, table, pos) + + def test_exact_timestamp(self): + db = MockDatabase() + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", [{"UNIX_TIMESTAMP": 17541339060}] + ) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + + pos = PositionPartition("p_start") + pos.set_position([150]) + + ts = calculate_exact_timestamp_via_query(db, table, pos) + assert f"{ts}" == "2525-11-11 18:11:00+00:00" + + def test_no_exact_timestamp(self): + db = MockDatabase() + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", + [{"UNIX_TIMESTAMP": 17541339060}, {"UNIX_TIMESTAMP": 17541339070}], + ) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + + pos = PositionPartition("p_start") + pos.set_position([150]) + + with self.assertRaises(NoExactTimeException): + calculate_exact_timestamp_via_query(db, table, pos) + + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", + [{"UNIX_TIMESTAMP": 17541339060, "column2": True}], + ) + + with self.assertRaises(NoExactTimeException): + calculate_exact_timestamp_via_query(db, table, pos) diff --git a/partitionmanager/dropper.py b/partitionmanager/dropper.py new file mode 100644 index 0000000..26557a4 --- /dev/null +++ b/partitionmanager/dropper.py @@ -0,0 +1,114 @@ +""" +Determine which partitions can be dropped. +""" + +import logging + +import partitionmanager.types +import partitionmanager.tools + + +def _drop_statement(table, partition_list): + """Generate an ALTER TABLE statement to drop these partitions.""" + + log = logging.getLogger("get_droppable_partitions") + + if not partition_list: + raise ValueError("Partition list may not be empty") + + partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list)) + + alter_cmd = f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions};" + + log.debug("Yielding %s", alter_cmd) + + return alter_cmd + + +def get_droppable_partitions( + database, partitions, current_position, current_timestamp, table +): + """Return a dictionary of partitions which can be dropped and why.""" + log = logging.getLogger("get_droppable_partitions") + results = {} + droppable = [] + + if not table.retention_period: + raise ValueError(f"{table.name} does not have a retention period set") + + if not partitions: + return results + + if sorted(partitions) != partitions: + raise ValueError(f"Supplied partitions are not correctly sorted: {partitions}") + + for partition, next_partition in partitionmanager.tools.pairwise(partitions): + if next_partition >= current_position: + log.debug( + "Stopping at %s because current position %s indicates " + "subsequent partition is empty", + partition, + current_position, + ) + break + + if isinstance(next_partition, partitionmanager.types.MaxValuePartition): + log.debug("Stopping at %s because we can't handle MaxValuePartitions.") + break + + assert isinstance(next_partition, partitionmanager.types.PositionPartition) + + approx_size = 0 + for a, b in zip( + next_partition.position.as_list(), partition.position.as_list() + ): + approx_size += a - b + + try: + start_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, partition + ) + ) + end_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, next_partition + ) + ) + + oldest_age = current_timestamp - start_time + youngest_age = current_timestamp - end_time + + if youngest_age > table.retention_period: + results[partition.name] = { + "oldest_time": f"{start_time}", + "youngest_time": f"{end_time}", + "oldest_position": partition.position, + "youngest_position": next_partition.position, + "oldest_age": f"{oldest_age}", + "youngest_age": f"{youngest_age}", + "approx_size": approx_size, + } + droppable.append(partition) + except partitionmanager.types.NoExactTimeException: + log.warning( + "Couldn't determine exact times for %s.%s, it is probably droppable too.", + table, + partition, + ) + + results[partition.name] = { + "oldest_time": "unable to determine", + "youngest_time": "unable to determine", + "oldest_position": partition.position, + "youngest_position": next_partition.position, + "oldest_age": "unable to determine", + "youngest_age": "unable to determine", + "approx_size": approx_size, + } + droppable.append(partition) + + if droppable: + results["drop_query"] = _drop_statement(table, droppable) + + return results diff --git a/partitionmanager/dropper_test.py b/partitionmanager/dropper_test.py new file mode 100644 index 0000000..af3899e --- /dev/null +++ b/partitionmanager/dropper_test.py @@ -0,0 +1,235 @@ +import unittest +from datetime import datetime, timedelta, timezone + +from .dropper import _drop_statement, get_droppable_partitions +from .types import ( + DatabaseCommand, + PositionPartition, + SqlInput, + SqlQuery, + Table, +) +from .types_test import mkPPart, mkTailPart, mkPos + + +def _timestamp_rsp(year, mo, day): + return [ + {"UNIX_TIMESTAMP": datetime(year, mo, day, tzinfo=timezone.utc).timestamp()} + ] + + +class MockDatabase(DatabaseCommand): + def __init__(self): + self._responses = list() + self.num_queries = 0 + + def add_response(self, expected, response): + self._responses.insert(0, {"expected": expected, "response": response}) + + def run(self, cmd): + self.num_queries += 1 + if not self._responses: + raise Exception(f"No mock responses available for cmd [{cmd}]") + + r = self._responses.pop() + if r["expected"] in cmd: + return r["response"] + + raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]") + + def db_name(self): + return SqlInput("the-database") + + +class TestDropper(unittest.TestCase): + def test_drop_statement_empty(self): + table = Table("burgers") + parts = [] + with self.assertRaises(ValueError): + _drop_statement(table, parts) + + def test_drop_statement(self): + table = Table("burgers") + parts = [PositionPartition("p_start")] + self.assertEqual( + _drop_statement(table, parts), + "ALTER TABLE `burgers` DROP PARTITION IF EXISTS `p_start`;", + ) + + def test_get_droppable_partitions_invalid_config(self): + database = MockDatabase() + table = Table("burgers") + partitions = [PositionPartition("p_start")] + current_timestamp = datetime(2021, 1, 1, tzinfo=timezone.utc) + current_position = PositionPartition("p_20210102").set_position([10]) + + with self.assertRaises(ValueError): + get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + + def test_no_droppable_partitions(self): + database = MockDatabase() + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + table.set_retention_period(timedelta(days=2)) + current_timestamp = datetime(2021, 1, 1, tzinfo=timezone.utc) + current_position = PositionPartition("p_20210102").set_position([10]) + assert {} == get_droppable_partitions( + database, [], current_position, current_timestamp, table + ) + + def test_get_droppable_partitions(self): + database = MockDatabase() + database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 20)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 6, 3)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 6, 3)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 6, 10)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 6, 10)) + database.add_response("WHERE `id` > '500'", _timestamp_rsp(2021, 6, 17)) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkPPart("3", 300), + mkPPart("4", 400), + mkPPart("5", 500), + mkPPart("6", 600), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=2)) + results = get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + self.assertEqual( + results["drop_query"], + "ALTER TABLE `burgers` DROP PARTITION IF EXISTS `1`,`2`;", + ) + self.assertEqual(results["1"]["oldest_time"], "2021-05-20 00:00:00+00:00") + self.assertEqual(results["1"]["youngest_time"], "2021-05-27 00:00:00+00:00") + self.assertEqual(results["1"]["oldest_position"].as_list(), [100]) + self.assertEqual(results["1"]["youngest_position"].as_list(), [200]) + self.assertEqual(results["1"]["oldest_age"], "42 days, 0:00:00") + self.assertEqual(results["1"]["youngest_age"], "35 days, 0:00:00") + self.assertEqual(results["1"]["approx_size"], 100) + + self.assertEqual(results["2"]["oldest_time"], "2021-05-27 00:00:00+00:00") + self.assertEqual(results["2"]["youngest_time"], "2021-06-03 00:00:00+00:00") + self.assertEqual(results["2"]["oldest_position"].as_list(), [200]) + self.assertEqual(results["2"]["youngest_position"].as_list(), [300]) + self.assertEqual(results["2"]["oldest_age"], "35 days, 0:00:00") + self.assertEqual(results["2"]["youngest_age"], "28 days, 0:00:00") + self.assertEqual(results["2"]["approx_size"], 100) + + def test_get_droppable_partitions_out_of_order(self): + database = MockDatabase() + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("2", 200), + mkPPart("1", 100), + mkPPart("3", 300), + mkTailPart("z"), + ] + current_position = mkPos(140) + table.set_retention_period(timedelta(days=2)) + + with self.assertRaises(ValueError): + get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + + def test_drop_nothing_to_do(self): + database = MockDatabase() + database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 1)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 8)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 8)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 5, 19)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 5, 19)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 5, 24)) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 6, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkPPart("3", 300), + mkPPart("4", 400), + mkPPart("5", 500), + mkPPart("6", 600), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=30)) + results = get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + self.assertNotIn("drop_query", results) + + +def test_get_droppable_partitions_no_exact_times(caplog): + database = MockDatabase() + resp = _timestamp_rsp(2021, 5, 20) + resp.extend(_timestamp_rsp(2021, 5, 21)) + database.add_response("WHERE `id` > '100'", resp) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=2)) + + get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + assert ( + "Couldn't determine exact times for Table burgers.1: (100), it is probably droppable too." + in caplog.messages + ) diff --git a/partitionmanager/table_append_partition.py b/partitionmanager/table_append_partition.py index 87e7dee..7f8dc32 100644 --- a/partitionmanager/table_append_partition.py +++ b/partitionmanager/table_append_partition.py @@ -2,7 +2,7 @@ Design and perform partition management. """ -from datetime import datetime, timedelta, timezone +from datetime import timedelta import logging import operator import re @@ -425,9 +425,6 @@ def _get_rate_partitions_with_queried_timestamps( The partitions' timestamps are explicitly queried. """ - log = logging.getLogger( - f"_get_rate_partitions_with_queried_timestamps:{table.name}" - ) if not table.has_date_query: raise ValueError("Table has no defined date query") @@ -435,42 +432,10 @@ def _get_rate_partitions_with_queried_timestamps( instant_partitions = list() for partition in partition_list: - if len(partition.position) != 1: - raise ValueError( - "This method is only valid for single-column partitions right now" + exact_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, partition ) - arg = partition.position.as_sql_input()[0] - - sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument( - arg - ) - log.debug( - "Executing %s to derive partition %s at position %s", - sql_select_cmd, - partition.name, - partition.position, - ) - - start = datetime.now() - exact_time_result = database.run(sql_select_cmd) - end = datetime.now() - - if not exact_time_result: - log.debug("No result found for position %s", arg) - continue - - assert len(exact_time_result) == 1 - assert len(exact_time_result[0]) == 1 - for key, value in exact_time_result[0].items(): - exact_time = datetime.fromtimestamp(value, tz=timezone.utc) - break - - log.debug( - "Exact time of %s returned for %s at position %s, query took %s", - exact_time, - partition.name, - partition.position, - (end - start), ) instant_partitions.append( diff --git a/partitionmanager/types.py b/partitionmanager/types.py index 774d3ba..b3c1eee 100644 --- a/partitionmanager/types.py +++ b/partitionmanager/types.py @@ -30,17 +30,17 @@ class Table: def __init__(self, name): self.name = SqlInput(name) - self.retention = None + self.retention_period = None self.partition_period = None self.earliest_utc_timestamp_query = None - def set_retention(self, ret): + def set_retention_period(self, ret): """ Sets the retention period as a timedelta for this table """ if not isinstance(ret, timedelta): raise ValueError("Must be a timedelta") - self.retention = ret + self.retention_period = ret return self def set_partition_period(self, dur): @@ -350,6 +350,9 @@ def __lt__(self, other): return True return False + def __ge__(self, other): + return not self < other + def __eq__(self, other): if isinstance(other, PositionPartition): return self.name == other.name and self._position == other.position @@ -399,6 +402,9 @@ def __lt__(self, other): return False return ValueError() + def __ge__(self, other): + return not self < other + def __eq__(self, other): if isinstance(other, MaxValuePartition): return self.name == other.name and self._count == other.num_columns @@ -609,3 +615,7 @@ class NoEmptyPartitionsAvailableException(Exception): class DatabaseCommandException(Exception): """Raised if the database command failed.""" + + +class NoExactTimeException(Exception): + """Raised if there's no exact time available for this partition.""" diff --git a/partitionmanager/types_test.py b/partitionmanager/types_test.py index 25a9014..ee7cd50 100644 --- a/partitionmanager/types_test.py +++ b/partitionmanager/types_test.py @@ -1,5 +1,6 @@ import argparse import unittest +import pytest from datetime import datetime, timedelta, timezone from .types import ( ChangePlannedPartition, @@ -146,13 +147,18 @@ def test_table(self): self.assertEqual(type(Table("name").name), SqlInput) t = Table("t") - self.assertEqual(None, t.retention) + self.assertEqual(None, t.retention_period) self.assertEqual( Table("a").set_partition_period(timedelta(days=9)).partition_period, timedelta(days=9), ) + self.assertEqual( + Table("a").set_retention_period(timedelta(days=9)).retention_period, + timedelta(days=9), + ) + with self.assertRaises(argparse.ArgumentTypeError): timedelta_from_dict({"something": 1}) @@ -179,6 +185,10 @@ def test_table(self): ) self.assertTrue(t.has_date_query) + def test_invalid_timedelta_string(self): + with pytest.raises(AttributeError): + assert timedelta_from_dict("30s") + def test_changed_partition(self): with self.assertRaises(ValueError): ChangePlannedPartition("bob")