Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove database connection during query init #3128

Open
wants to merge 92 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
ba7f5c3
Preflight checks
greenape Sep 28, 2020
7978db5
Fixup
greenape Sep 28, 2020
0076706
Run preflight in get_query
greenape Sep 28, 2020
fd1d05c
Set columns post-init where all are required
greenape Sep 28, 2020
1428e83
Need the class for mro
greenape Sep 28, 2020
370a92c
Fix some typos, and call hooks in a sensible order
greenape Sep 29, 2020
aa09330
Limit the default tables
greenape Sep 29, 2020
4adaeac
Pass column names when creating table from query
greenape Sep 29, 2020
3056640
Ensure preflight is called before getting column names in head
greenape Sep 29, 2020
3446e70
Move geom column check to preflight
greenape Sep 29, 2020
4d557d8
Protect all tables, drop events prefix
greenape Oct 2, 2020
1b4b901
Fix not running preflight before store,
greenape Oct 2, 2020
a9e0757
Syntax, fix tests
greenape Oct 2, 2020
b6c8413
Update test to refelect one less link in dep chain
greenape Oct 2, 2020
f4802a8
Disallow None for start/stop
greenape Oct 2, 2020
5a75692
Small fixups
greenape Oct 2, 2020
9e21ef2
Fix error return on preflight fail
greenape Oct 5, 2020
26c964d
Wrap geom table arg in a table
greenape Oct 5, 2020
6e556ac
Update tests
greenape Oct 5, 2020
93e54e1
Update table names in test
greenape Oct 12, 2020
3a9b2ae
Match original table order
greenape Oct 12, 2020
85be37c
Update passed table in docs
greenape Oct 12, 2020
d33ec65
Update table order in dependency graph
greenape Oct 12, 2020
0045814
Sort provided tables
greenape Oct 12, 2020
b47d344
Update query hashed in approval test
greenape Oct 12, 2020
a86895c
Fix fast forwarding state machine multiple times, add missing classes…
greenape Oct 12, 2020
f809daa
Match cache half life
greenape Oct 12, 2020
43faa71
Preflight subs table
greenape Oct 12, 2020
bc7342e
Fix dependency graph order in test
greenape Oct 12, 2020
8c7bf49
Update fm tests
greenape Oct 13, 2020
d97c76f
Fix not checking table arg type and geom table construction,
greenape Oct 13, 2020
a4e9693
Handle None table case
greenape Oct 13, 2020
7f464ff
Store zero-score cache classes in db
greenape Oct 16, 2020
07dc063
Fix table name typo
greenape Oct 16, 2020
68b2cc9
Syntax for touch_cache
greenape Oct 19, 2020
9c13057
Syntax for sort
greenape Oct 19, 2020
b72b207
Wrap in subquery
greenape Oct 19, 2020
adadf5b
Missing bracket
greenape Oct 20, 2020
aef870e
Add missing cast
greenape Oct 20, 2020
1bce844
Use subquery for get size of cache
greenape Oct 20, 2020
d6d5920
Missed a bracket
greenape Oct 20, 2020
86e6c22
Remove zero cache from internal tables in test helper
greenape Oct 20, 2020
1efdd5e
Centralise list of internal tables in helper
greenape Oct 20, 2020
c20f4a9
Don't call the database from init of RandomSystemRows
greenape Oct 22, 2020
3f8ed90
Correct import
greenape Oct 22, 2020
96ca0eb
Update CHANGELOG.md
greenape Oct 22, 2020
d9d08e7
Move geotable column checks back into init (no db required)
greenape Oct 22, 2020
c3507a4
Update CHANGELOG.md
greenape Oct 22, 2020
4baea78
Handle empty table list
greenape Nov 13, 2020
0c6f125
Add missing arg
greenape Nov 13, 2020
1b78892
Fix columns check
greenape Dec 18, 2020
c048210
Clean up table signature a bit
greenape Dec 18, 2020
8c50b3b
Fixups after rebase
greenape Oct 21, 2021
61ed2fd
Fix another rebase change
greenape Oct 21, 2021
5a6bb11
More rebase issues
greenape Oct 21, 2021
d4a2e85
Fixup geotable, fast forward even if already fast forwarded
greenape Oct 21, 2021
102dbf5
Finish then write meta
greenape Oct 21, 2021
faebea2
Supply columns for Table
greenape Nov 8, 2021
f906ed0
Fix missing date preflight error test
greenape Nov 8, 2021
e2434bc
Update approval test
greenape Nov 8, 2021
23b2728
Call preflight to raise error
greenape Nov 8, 2021
8f2d3c5
Fix some usages of event. and update exceptions, Table usages
greenape Jun 16, 2022
2584008
lint
greenape Jun 27, 2022
6888c06
Update test_query_object_construction.test_construct_query.approved.txt
greenape Jun 27, 2022
6c1cf3b
Merge branch 'master' into asyncio
greenape Oct 23, 2024
0b5cf0c
Make FlowDBTable abstract
greenape Oct 23, 2024
db6c404
Cache protected classes
greenape Oct 23, 2024
95bf78b
Correct doctsring
greenape Oct 23, 2024
8d1eb11
Fix column name typos
greenape Oct 23, 2024
0ce6a55
Remove unused imports in preflight
greenape Oct 23, 2024
dbbca00
Check for empty string as a table name
greenape Oct 23, 2024
a6bfdd8
Dedupe tables
greenape Oct 23, 2024
aece3d9
Simplify hooks iteration
greenape Oct 23, 2024
f317937
Remove a couple of unused imports
greenape Oct 23, 2024
0de3df1
Fix dodgy suggestion
greenape Oct 23, 2024
137d886
Lint
greenape Oct 23, 2024
ddd660d
Call preflight on the query object
greenape Oct 23, 2024
a57e843
Update approval tests
greenape Oct 23, 2024
8a09e6c
Need to actually depend on the nested query to force it to store first
greenape Oct 23, 2024
37de99e
Update table param
greenape Oct 23, 2024
1d77308
Add missing columns arguments
greenape Oct 23, 2024
52c0bdc
Pass up preflight errors, fix error test
greenape Oct 23, 2024
302abc7
Fix typo in example
greenape Oct 23, 2024
e1510e0
Log at error level for exception
greenape Oct 23, 2024
267fac5
Add missing mpl headers
greenape Oct 23, 2024
36b6b3e
Merge branch 'asyncio' of github.com:Flowminder/FlowKit into asyncio
greenape Oct 23, 2024
fea6c74
Delete flowmachine/tests/test_model.py
greenape Oct 23, 2024
489c94e
Correct test name
greenape Oct 23, 2024
22fd099
Add missing columns arg
greenape Oct 23, 2024
9a60732
Update default schema test
greenape Oct 24, 2024
bb04fb8
Types for tables
greenape Oct 24, 2024
127d2e9
Merge branch 'master' into asyncio
greenape Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

#### Added
- `inflows` and `outflows` exposed via API endpoint + added to flowclient [#2029](https://github.com/Flowminder/FlowKit/issues/2029), [#4866](https://github.com/Flowminder/FlowKit/issues/4866)
- Added a new `@pre_flight` decorator which `Query` subclasses may use to indicate a method should be run to confirm the query is runnable
- Added a new `preflight()` method to query, which calls all applicable pre-flight check methods for the query

### Changed
- __Action Needed__ Airflow updated to version 2.3.3; **backup flowetl_db before applying update** [#4940](https://github.com/Flowminder/FlowKit/pull/4940)
Expand All @@ -255,6 +257,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- FlowDB now always creates a role named `flowmachine.`
- Flowmachine will set the state of a query being stored to cancelled if interrupted while the store is running.
- Flowmachine now supports sqlalchemy >=1.4 [#5140](https://github.com/Flowminder/FlowKit/issues/5140)
- `get_cached_query_objects_ordered_by_score` is now a generator. [#3116](https://github.com/Flowminder/FlowKit/issues/3116)
- Queries should no longer require communication with the database during `__init__`, any checks that require database access must now be implemented as a method of the class and use the `@pre_flight` decorator
- When specifying tables in Flowmachine, the `events.` prefix is no longer required.

### Fixed
- Flowmachine now makes the built in `flowmachine` role owner of cache tables as a post-action when a query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714)
Expand All @@ -265,6 +270,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Removed
- `use_file_flux_sensor` removed entirely. [#2812](https://github.com/Flowminder/FlowKit/issues/2812)
- `Model`, `ModelResult` and `Louvain` have been removed. [#5168](https://github.com/Flowminder/FlowKit/issues/5168)
- `Table` no longer automatically infers columns from the database, they must be specified.

## [1.16.0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"data_events_query = flowmachine.features.TotalLocationEvents(\n",
" start=\"2016-01-01\",\n",
" stop=\"2016-01-08\",\n",
" table=\"events.mds\",\n",
" table=\"mds\",\n",
" spatial_unit=make_spatial_unit(\"versioned-cell\"),\n",
" interval=\"hour\",\n",
")"
Expand Down
5 changes: 4 additions & 1 deletion flowdb/bin/build/0020_schema_cache.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ CREATE TABLE IF NOT EXISTS cache.dependencies
CREATE TABLE cache.cache_config (key text, value text);
INSERT INTO cache.cache_config (key, value) VALUES ('half_life', NULL);
INSERT INTO cache.cache_config (key, value) VALUES ('cache_size', NULL);
INSERT INTO cache.cache_config (key, value) VALUES ('cache_protected_period', NULL);
INSERT INTO cache.cache_config (key, value) VALUES ('cache_protected_period', NULL);

CREATE TABLE cache.zero_cache (object_class text);
INSERT INTO cache.zero_cache (object_class) VALUES ('Table'), ('GeoTable'), ('CallsTable'), ('SmsTable'), ('MdsTable'), ('TopupsTable'), ('ForwardsTable'), ('TacsTable'), ('CellsTable'), ('SitesTable');
3 changes: 2 additions & 1 deletion flowdb/bin/build/0030_utilities.sql
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,10 @@ $$
DECLARE score float;
BEGIN
UPDATE cache.cached SET last_accessed = NOW(), access_count = access_count + 1,
cache_score_multiplier = CASE WHEN class='Table' THEN 0 ELSE
cache_score_multiplier = CASE WHEN class=ANY(no_score.classes) THEN 0 ELSE
cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
END
FROM (SELECT array_agg(object_class) as classes FROM cache.zero_cache) AS no_score
Comment on lines +246 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider optimizing the cache score calculation performance.

The subquery (SELECT array_agg(object_class) as classes FROM cache.zero_cache) will be executed on every cache touch operation. For better performance, consider:

  1. Using a materialized view that's refreshed when zero_cache changes
  2. Or replacing the correlated subquery with a JOIN

Example optimization using a JOIN:

-        cache_score_multiplier = CASE WHEN class=ANY(no_score.classes) THEN 0 ELSE
-          cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
-        END
-        FROM (SELECT array_agg(object_class) as classes FROM cache.zero_cache) AS no_score
+        cache_score_multiplier = CASE 
+          WHEN EXISTS (SELECT 1 FROM cache.zero_cache WHERE object_class = class) THEN 0 
+          ELSE cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
+        END
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cache_score_multiplier = CASE WHEN class=ANY(no_score.classes) THEN 0 ELSE
cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
END
FROM (SELECT array_agg(object_class) as classes FROM cache.zero_cache) AS no_score
cache_score_multiplier = CASE
WHEN EXISTS (SELECT 1 FROM cache.zero_cache WHERE object_class = class) THEN 0
ELSE cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
END

WHERE query_id=cached_query_id
RETURNING cache_score(cache_score_multiplier, compute_time, greatest(table_size(tablename, schema), 0.00001)) INTO score;
IF NOT FOUND THEN RAISE EXCEPTION 'Cache record % not found', cached_query_id;
Expand Down
30 changes: 25 additions & 5 deletions flowmachine/flowmachine/core/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import sqlalchemy.engine
from contextvars import copy_context
from concurrent.futures import Executor, TimeoutError
from functools import partial
from functools import partial, lru_cache
from sqlalchemy.exc import ResourceClosedError

from typing import TYPE_CHECKING, Tuple, List, Callable, Optional
Expand All @@ -29,6 +29,7 @@
from flowmachine.core.query_state import QueryStateMachine, QueryEvent
from flowmachine import __version__


if TYPE_CHECKING:
from .query import Query
from .connection import Connection
Expand Down Expand Up @@ -191,8 +192,10 @@ def write_query_to_cache(
if this_thread_is_owner:
logger.debug(f"In charge of executing '{query.query_id}'.")
try:
query.preflight()
query_ddl_ops = ddl_ops_func(name, schema)
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error generating SQL. Error was {exc}")
raise exc
logger.debug("Made SQL.")
Expand All @@ -204,6 +207,7 @@ def write_query_to_cache(
)
logger.debug("Executed queries.")
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error executing SQL. Error was {exc}")
raise exc
if analyze:
Expand All @@ -219,6 +223,7 @@ def write_query_to_cache(
executed_sql=";\n".join(query_ddl_ops),
)
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error writing cache metadata. Error was {exc}")
raise exc
q_state_machine.finish()
Expand All @@ -229,7 +234,6 @@ def write_query_to_cache(
finally:
if this_thread_is_owner and not q_state_machine.is_finished_executing:
q_state_machine.cancel()

q_state_machine.wait_until_complete(sleep_duration=sleep_duration)
if q_state_machine.is_completed:
return query
Expand Down Expand Up @@ -301,6 +305,7 @@ def write_cache_metadata(
psycopg2.Binary(self_storage),
),
)
logger.debug("Touching cache.", query_id=query.query_id, query=str(query))
connection.exec_driver_sql(
"SELECT touch_cache(%(ident)s);", dict(ident=query.query_id)
)
Expand Down Expand Up @@ -334,6 +339,7 @@ def touch_cache(connection: "Connection", query_id: str) -> float:
The new cache score
"""
try:
logger.debug("Touching cache.", query_id=query_id)
with connection.engine.begin() as trans:
return float(
trans.exec_driver_sql(f"SELECT touch_cache('{query_id}')").fetchall()[
Expand Down Expand Up @@ -481,6 +487,19 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")


@lru_cache(maxsize=1)
def _get_protected_classes():
from flowmachine.core.events_table import events_table_map
from flowmachine.core.infrastructure_table import infrastructure_table_map

return [
"Table",
"GeoTable",
*[cls.__name__ for cls in events_table_map.values()],
*[cls.__name__ for cls in infrastructure_table_map.values()],
]

greenape marked this conversation as resolved.
Show resolved Hide resolved

def get_cached_query_objects_ordered_by_score(
connection: "Connection",
protected_period: Optional[int] = None,
Expand All @@ -502,14 +521,15 @@ def get_cached_query_objects_ordered_by_score(
Returns a list of cached Query objects with their on disk sizes

"""

protected_period_clause = (
(f" AND NOW()-created > INTERVAL '{protected_period} seconds'")
if protected_period is not None
else " AND NOW()-created > (cache_protected_period()*INTERVAL '1 seconds')"
)
qry = f"""SELECT query_id, table_size(tablename, schema) as table_size
FROM cache.cached
WHERE cached.class!='Table' AND cached.class!='GeoTable'
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))
{protected_period_clause}
ORDER BY cache_score(cache_score_multiplier, compute_time, table_size(tablename, schema)) ASC
"""
Expand Down Expand Up @@ -689,9 +709,9 @@ def get_size_of_cache(connection: "Connection") -> int:
Number of bytes in total used by cache tables

"""
sql = """SELECT sum(table_size(tablename, schema)) as total_bytes
sql = f"""SELECT sum(table_size(tablename, schema)) as total_bytes
FROM cache.cached
WHERE cached.class!='Table' AND cached.class!='GeoTable'"""
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))"""
cache_bytes = connection.fetch(sql)[0][0]
return 0 if cache_bytes is None else int(cache_bytes)

Expand Down
21 changes: 21 additions & 0 deletions flowmachine/flowmachine/core/errors/flowmachine_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,27 @@
"""
Custom errors raised by flowmachine.
"""
from typing import List, Dict


class PreFlightFailedException(Exception):
"""
Exception indicating that preflight checks for a query failed.

Parameters
----------
query_id : str
Identifier of the query
errors : dict
Mapping from query reps to lists of exceptions raised in preflight
"""

def __init__(self, query_id: str, errors: Dict[str, List[Exception]]):
self.errors = errors
self.query_id = query_id
Exception.__init__(
self, f"Pre-flight failed for '{self.query_id}'. Errors: {errors}"
)


class StoreFailedException(Exception):
Expand Down
125 changes: 125 additions & 0 deletions flowmachine/flowmachine/core/events_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Optional

from flowmachine.core.flowdb_table import FlowDBTable


class EventsTable(FlowDBTable):
def __init__(self, *, name, columns: Optional[list[str]] = None) -> None:
super().__init__(schema="events", name=name, columns=columns)


class CallsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"duration",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]
Comment on lines +15 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor duplicate column definitions.

The all_columns lists in CallsTable, ForwardsTable, and SmsTable are identical. Consider extracting these to a common base class or shared constant to follow the DRY principle.

# Create a base class for common columns
class CommonEventsTable(EventsTable):
    all_columns = [
        "id",
        "outgoing",
        "datetime",
        "network",
        "msisdn",
        "msisdn_counterpart",
        "location_id",
        "imsi",
        "imei",
        "tac",
        "operator_code",
        "country_code",
    ]

class CallsTable(CommonEventsTable):
    def __init__(self, *, columns=None):
        super().__init__(name="calls", columns=columns)

# Similar changes for ForwardsTable and SmsTable

Also applies to: 31-44, 51-64


def __init__(self, *, columns: Optional[list[str]] = None) -> None:
super().__init__(name="calls", columns=columns)


class ForwardsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns: Optional[list[str]] = None) -> None:
super().__init__(name="forwards", columns=columns)


class SmsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns: Optional[list[str]] = None) -> None:
super().__init__(name="sms", columns=columns)


class MdsTable(EventsTable):
all_columns = [
"id",
"datetime",
"duration",
"volume_total",
"volume_upload",
"volume_download",
"msisdn",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns: Optional[list[str]] = None) -> None:
super().__init__(name="mds", columns=columns)


class TopupsTable(EventsTable):
all_columns = [
"id",
"datetime",
"type",
"recharge_amount",
"airtime_fee",
"tax_and_fee",
"pre_event_balance",
"post_event_balance",
"msisdn",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns: Optional[list[str]] = None) -> None:
super().__init__(name="topups", columns=columns)


events_table_map = dict(
calls=CallsTable,
sms=SmsTable,
mds=MdsTable,
topups=TopupsTable,
forwards=ForwardsTable,
)
34 changes: 34 additions & 0 deletions flowmachine/flowmachine/core/flowdb_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from abc import ABCMeta
from typing import Optional

from flowmachine.core.table import Table


class FlowDBTable(Table, metaclass=ABCMeta):
"""
Abstract base class for fixed tables that exist in FlowDB.

Parameters
----------
name : str
schema : str
columns : list of str
"""

def __init__(self, *, name: str, schema: str, columns: Optional[list[str]]) -> None:
if columns is None:
columns = self.all_columns
if set(columns).issubset(self.all_columns):
super().__init__(schema=schema, name=name, columns=columns)
else:
raise ValueError(
f"Columns {columns} must be a subset of {self.all_columns}"
)

@property
def all_columns(self):
raise NotImplementedError

Check warning on line 34 in flowmachine/flowmachine/core/flowdb_table.py

View check run for this annotation

Codecov / codecov/patch

flowmachine/flowmachine/core/flowdb_table.py#L34

Added line #L34 was not covered by tests
Loading