Retention-based Partition Dropping (#44)
* Add command: drop, to calculate partition drops based on retention periods

* Deduplicate methods that moved into database_helpers

* Add database helper tests

* Add dropper tests

* More test cleanups

* Update to PyLint 2.17.7 to fix Python 3.11 support

* More tests

* pylint needs pytest

* Add an assertion for correct ordering of partitions
jcjones authored Feb 28, 2024
1 parent fd793fa commit b271785
Showing 11 changed files with 677 additions and 58 deletions.
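
Before the file-by-file diff, here is a minimal sketch of exercising the new drop subcommand the same way the TestDropCmd tests added in cli_test.py below do. It is illustrative only: the table name orders, the issued/id columns, and the mariadb client path are made up, while the configuration keys (mariadb, tables, retention_period, earliest_utc_timestamp_query) are the ones this commit's code and tests actually use; running it for real also needs a reachable MariaDB server.

# Sketch only: drives the new "drop" command through the CLI parser, mirroring
# cli_test.py's TestDropCmd. Table, column names, and client path are hypothetical.
import tempfile

from partitionmanager.cli import PARSER, drop_cmd

CONFIG_YAML = b"""
partitionmanager:
    mariadb: /usr/bin/mariadb
    tables:
        orders:
            retention_period:
                days: 180
            earliest_utc_timestamp_query: >
                SELECT UNIX_TIMESTAMP(`issued`) FROM `orders`
                WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;
"""

with tempfile.NamedTemporaryFile() as tmpfile:
    tmpfile.write(CONFIG_YAML)
    tmpfile.flush()
    args = PARSER.parse_args(["--config", tmpfile.name, "drop"])
    # drop_cmd returns the dict built by do_find_drops_for_tables:
    # table name -> droppable partitions.
    droppable_by_table = drop_cmd(args)
    print(droppable_by_table)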
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -17,9 +17,10 @@ jobs:
- name: Install Linting Tools
run: |
python -m pip install --upgrade pip
- pip install --user pylint==2.6.0
+ pip install --user pylint==2.17.7
pip install --user black~=22.3
pip install --user flake8~=4.0
+ pip install --user pytest
- name: Install Partition Manager
run: |
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -17,7 +17,7 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/PyCQA/pylint
- rev: pylint-2.6.0
+ rev: v2.17.7
hooks:
- id: pylint
args:
69 changes: 58 additions & 11 deletions partitionmanager/cli.py
@@ -10,6 +10,8 @@
import traceback
import yaml

+ import partitionmanager.database_helpers
+ import partitionmanager.dropper
import partitionmanager.migrate
import partitionmanager.sql
import partitionmanager.stats
@@ -121,10 +123,10 @@ def from_yaml_file(self, file):
for key in data["tables"]:
tab = partitionmanager.types.Table(key)
tabledata = data["tables"][key]
if isinstance(tabledata, dict) and "retention" in tabledata:
tab.set_retention(
if isinstance(tabledata, dict) and "retention_period" in tabledata:
tab.set_retention_period(
partitionmanager.types.timedelta_from_dict(
tabledata["retention"]
tabledata["retention_period"]
)
)
if isinstance(tabledata, dict) and "partition_period" in tabledata:
@@ -318,16 +320,10 @@ def do_partition(conf):
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")

- positions = pm_tap.get_current_positions(
- conf.dbcmd, table, map_data["range_cols"]
+ cur_pos = partitionmanager.database_helpers.get_position_of_table(
+ conf.dbcmd, table, map_data
)

- log.info(f"{table} (pos={positions})")

- cur_pos = partitionmanager.types.Position()
- cur_pos.set_position([positions[col] for col in map_data["range_cols"]])

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
@@ -465,6 +461,57 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()):
return all_results


def drop_cmd(args):
"""Calculates drop.
Helper for argparse.
"""
conf = config_from_args(args)
return do_find_drops_for_tables(conf)


DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions")
DROP_PARSER.set_defaults(func=drop_cmd)


def do_find_drops_for_tables(conf):
all_results = dict()
for table in conf.tables:
log = logging.getLogger(f"do_find_drops_for_tables:{table.name}")

if not table.has_date_query:
log.warning(f"Cannot process {table}: no date query specified")
continue

if not table.retention_period:
log.warning(f"Cannot process {table}: no retention specified")
continue

try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.debug(f"Cannot process {table}: {table_problems}")
continue

map_data = pm_tap.get_partition_map(conf.dbcmd, table)
current_position = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

droppable = partitionmanager.dropper.get_droppable_partitions(
conf.dbcmd,
map_data["partitions"],
current_position,
conf.curtime,
table,
)

all_results[table.name] = droppable
except Exception as e:
log.warning(f"Error processing table {table.name}")
raise e
return all_results


def main():
"""Start here."""
args = PARSER.parse_args()
60 changes: 58 additions & 2 deletions partitionmanager/cli_test.py
@@ -8,6 +8,7 @@
migrate_cmd,
config_from_args,
do_partition,
+ drop_cmd,
PARSER,
partition_cmd,
stats_cmd,
@@ -224,11 +225,9 @@ def test_partition_period_seven_days(self):
[
"INFO:partition:Evaluating Table partitioned_last_week "
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_last_week (pos={'id': 150})",
"DEBUG:partition:Table partitioned_last_week has no pending SQL updates.",
"INFO:partition:Evaluating Table partitioned_yesterday "
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_yesterday (pos={'id': 150})",
"DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.",
]
),
@@ -626,3 +625,60 @@ def test_migrate_cmd_in_out(self):
"flip",
]
)


class TestDropCmd(unittest.TestCase):
def _run_drop_cmd_yaml(self, yaml):
with tempfile.NamedTemporaryFile() as tmpfile:
insert_into_file(tmpfile, yaml)
args = PARSER.parse_args(["--config", tmpfile.name, "drop"])
return drop_cmd(args)

def test_drop_invalid_config(self):
with self.assertLogs(
"do_find_drops_for_tables:unused", level="WARNING"
) as logctx:
self._run_drop_cmd_yaml(
f"""
partitionmanager:
mariadb: {str(fake_exec)}
tables:
unused:
earliest_utc_timestamp_query: >
SELECT UNIX_TIMESTAMP(`issued`) FROM `unused`
WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;
"""
)
self.assertEqual(
set(logctx.output),
set(
[
"WARNING:do_find_drops_for_tables:unused:"
"Cannot process Table unused: no retention specified"
]
),
)

def test_drop_no_sql(self):
with self.assertLogs(
"do_find_drops_for_tables:unused", level="WARNING"
) as logctx:
self._run_drop_cmd_yaml(
f"""
partitionmanager:
mariadb: {str(fake_exec)}
tables:
unused:
retention_period:
days: 180
"""
)
self.assertEqual(
set(logctx.output),
set(
[
"WARNING:do_find_drops_for_tables:unused:"
"Cannot process Table unused: no date query specified"
]
),
)
72 changes: 72 additions & 0 deletions partitionmanager/database_helpers.py
@@ -0,0 +1,72 @@
"""
Helper functions for database operations
"""

from datetime import datetime, timezone
import logging

import partitionmanager.table_append_partition as pm_tap
import partitionmanager.types


def get_position_of_table(database, table, map_data):
"""Returns a Position of the table at the current moment."""

pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"])

cur_pos = partitionmanager.types.Position()
cur_pos.set_position([pos_list[col] for col in map_data["range_cols"]])

return cur_pos


def calculate_exact_timestamp_via_query(database, table, position_partition):
"""Calculates the exact timestamp of a PositionPartition.
raises ValueError if the position is incalculable
"""

log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}")

if not table.has_date_query:
raise ValueError("Table has no defined date query")

if not isinstance(position_partition, partitionmanager.types.PositionPartition):
raise ValueError("Only PositionPartitions are supported")

if len(position_partition.position) != 1:
raise ValueError(
"This method is only valid for single-column partitions right now"
)
arg = position_partition.position.as_sql_input()[0]

sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg)
log.debug(
"Executing %s to derive partition %s at position %s",
sql_select_cmd,
position_partition.name,
position_partition.position,
)

start = datetime.now()
exact_time_result = database.run(sql_select_cmd)
end = datetime.now()

if not len(exact_time_result) == 1:
raise partitionmanager.types.NoExactTimeException("No exact timestamp result")
if not len(exact_time_result[0]) == 1:
raise partitionmanager.types.NoExactTimeException(
"Unexpected column count for the timestamp result"
)
for key, value in exact_time_result[0].items():
exact_time = datetime.fromtimestamp(value, tz=timezone.utc)
break

log.debug(
"Exact time of %s returned for %s at position %s, query took %s",
exact_time,
position_partition.name,
position_partition.position,
(end - start),
)
return exact_time
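
The conversion step above is plain datetime.fromtimestamp with an explicit UTC timezone; as a quick sanity check on the constant used by the tests in the next file:

# Sketch: reproduces the conversion calculate_exact_timestamp_via_query applies
# to the UNIX_TIMESTAMP value returned by its query.
from datetime import datetime, timezone

print(datetime.fromtimestamp(17541339060, tz=timezone.utc))
# -> 2525-11-11 18:11:00+00:00, the value test_exact_timestamp expects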
109 changes: 109 additions & 0 deletions partitionmanager/database_helpers_test.py
@@ -0,0 +1,109 @@
import unittest

from .database_helpers import get_position_of_table, calculate_exact_timestamp_via_query

from .types import (
DatabaseCommand,
NoExactTimeException,
PositionPartition,
SqlInput,
SqlQuery,
Table,
)


class MockDatabase(DatabaseCommand):
def __init__(self):
self._responses = list()
self.num_queries = 0

def add_response(self, expected, response):
self._responses.insert(0, {"expected": expected, "response": response})

def run(self, cmd):
self.num_queries += 1
if not self._responses:
raise Exception(f"No mock responses available for cmd [{cmd}]")

r = self._responses.pop()
if r["expected"] in cmd:
return r["response"]

raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]")

def db_name(self):
return SqlInput("the-database")


class TestDatabaseHelpers(unittest.TestCase):
def test_position_of_table(self):
db = MockDatabase()
db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 90210}])

table = Table("burgers")
data = {"range_cols": ["id"]}

pos = get_position_of_table(db, table, data)
self.assertEqual(pos.as_list(), [90210])

def test_exact_timestamp_no_query(self):
db = MockDatabase()
db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 42}])

table = Table("burgers")
self.assertFalse(table.has_date_query)

pos = PositionPartition("p_start")
pos.set_position([42])

with self.assertRaises(ValueError):
calculate_exact_timestamp_via_query(db, table, pos)

def test_exact_timestamp(self):
db = MockDatabase()
db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)", [{"UNIX_TIMESTAMP": 17541339060}]
)

table = Table("burgers")
table.set_earliest_utc_timestamp_query(
SqlQuery(
"SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` "
"WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;"
)
)

pos = PositionPartition("p_start")
pos.set_position([150])

ts = calculate_exact_timestamp_via_query(db, table, pos)
assert f"{ts}" == "2525-11-11 18:11:00+00:00"

def test_no_exact_timestamp(self):
db = MockDatabase()
db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)",
[{"UNIX_TIMESTAMP": 17541339060}, {"UNIX_TIMESTAMP": 17541339070}],
)

table = Table("burgers")
table.set_earliest_utc_timestamp_query(
SqlQuery(
"SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` "
"WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;"
)
)

pos = PositionPartition("p_start")
pos.set_position([150])

with self.assertRaises(NoExactTimeException):
calculate_exact_timestamp_via_query(db, table, pos)

db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)",
[{"UNIX_TIMESTAMP": 17541339060, "column2": True}],
)

with self.assertRaises(NoExactTimeException):
calculate_exact_timestamp_via_query(db, table, pos)
(Diffs for the remaining five changed files are not shown here.)