Remove database connection during query init #3128

Open
wants to merge 92 commits into base: master
Changes from 72 of 92 commits
ba7f5c3
Preflight checks
greenape Sep 28, 2020
7978db5
Fixup
greenape Sep 28, 2020
0076706
Run preflight in get_query
greenape Sep 28, 2020
fd1d05c
Set columns post-init where all are required
greenape Sep 28, 2020
1428e83
Need the class for mro
greenape Sep 28, 2020
370a92c
Fix some typos, and call hooks in a sensible order
greenape Sep 29, 2020
aa09330
Limit the default tables
greenape Sep 29, 2020
4adaeac
Pass column names when creating table from query
greenape Sep 29, 2020
3056640
Ensure preflight is called before getting column names in head
greenape Sep 29, 2020
3446e70
Move geom column check to preflight
greenape Sep 29, 2020
4d557d8
Protect all tables, drop events prefix
greenape Oct 2, 2020
1b4b901
Fix not running preflight before store,
greenape Oct 2, 2020
a9e0757
Syntax, fix tests
greenape Oct 2, 2020
b6c8413
Update test to reflect one less link in dep chain
greenape Oct 2, 2020
f4802a8
Disallow None for start/stop
greenape Oct 2, 2020
5a75692
Small fixups
greenape Oct 2, 2020
9e21ef2
Fix error return on preflight fail
greenape Oct 5, 2020
26c964d
Wrap geom table arg in a table
greenape Oct 5, 2020
6e556ac
Update tests
greenape Oct 5, 2020
93e54e1
Update table names in test
greenape Oct 12, 2020
3a9b2ae
Match original table order
greenape Oct 12, 2020
85be37c
Update passed table in docs
greenape Oct 12, 2020
d33ec65
Update table order in dependency graph
greenape Oct 12, 2020
0045814
Sort provided tables
greenape Oct 12, 2020
b47d344
Update query hashed in approval test
greenape Oct 12, 2020
a86895c
Fix fast forwarding state machine multiple times, add missing classes…
greenape Oct 12, 2020
f809daa
Match cache half life
greenape Oct 12, 2020
43faa71
Preflight subs table
greenape Oct 12, 2020
bc7342e
Fix dependency graph order in test
greenape Oct 12, 2020
8c7bf49
Update fm tests
greenape Oct 13, 2020
d97c76f
Fix not checking table arg type and geom table construction,
greenape Oct 13, 2020
a4e9693
Handle None table case
greenape Oct 13, 2020
7f464ff
Store zero-score cache classes in db
greenape Oct 16, 2020
07dc063
Fix table name typo
greenape Oct 16, 2020
68b2cc9
Syntax for touch_cache
greenape Oct 19, 2020
9c13057
Syntax for sort
greenape Oct 19, 2020
b72b207
Wrap in subquery
greenape Oct 19, 2020
adadf5b
Missing bracket
greenape Oct 20, 2020
aef870e
Add missing cast
greenape Oct 20, 2020
1bce844
Use subquery for get size of cache
greenape Oct 20, 2020
d6d5920
Missed a bracket
greenape Oct 20, 2020
86e6c22
Remove zero cache from internal tables in test helper
greenape Oct 20, 2020
1efdd5e
Centralise list of internal tables in helper
greenape Oct 20, 2020
c20f4a9
Don't call the database from init of RandomSystemRows
greenape Oct 22, 2020
3f8ed90
Correct import
greenape Oct 22, 2020
96ca0eb
Update CHANGELOG.md
greenape Oct 22, 2020
d9d08e7
Move geotable column checks back into init (no db required)
greenape Oct 22, 2020
c3507a4
Update CHANGELOG.md
greenape Oct 22, 2020
4baea78
Handle empty table list
greenape Nov 13, 2020
0c6f125
Add missing arg
greenape Nov 13, 2020
1b78892
Fix columns check
greenape Dec 18, 2020
c048210
Clean up table signature a bit
greenape Dec 18, 2020
8c50b3b
Fixups after rebase
greenape Oct 21, 2021
61ed2fd
Fix another rebase change
greenape Oct 21, 2021
5a6bb11
More rebase issues
greenape Oct 21, 2021
d4a2e85
Fixup geotable, fast forward even if already fast forwarded
greenape Oct 21, 2021
102dbf5
Finish then write meta
greenape Oct 21, 2021
faebea2
Supply columns for Table
greenape Nov 8, 2021
f906ed0
Fix missing date preflight error test
greenape Nov 8, 2021
e2434bc
Update approval test
greenape Nov 8, 2021
23b2728
Call preflight to raise error
greenape Nov 8, 2021
8f2d3c5
Fix some usages of event. and update exceptions, Table usages
greenape Jun 16, 2022
2584008
lint
greenape Jun 27, 2022
6888c06
Update test_query_object_construction.test_construct_query.approved.txt
greenape Jun 27, 2022
6c1cf3b
Merge branch 'master' into asyncio
greenape Oct 23, 2024
0b5cf0c
Make FlowDBTable abstract
greenape Oct 23, 2024
db6c404
Cache protected classes
greenape Oct 23, 2024
95bf78b
Correct docstring
greenape Oct 23, 2024
8d1eb11
Fix column name typos
greenape Oct 23, 2024
0ce6a55
Remove unused imports in preflight
greenape Oct 23, 2024
dbbca00
Check for empty string as a table name
greenape Oct 23, 2024
a6bfdd8
Dedupe tables
greenape Oct 23, 2024
aece3d9
Simplify hooks iteration
greenape Oct 23, 2024
f317937
Remove a couple of unused imports
greenape Oct 23, 2024
0de3df1
Fix dodgy suggestion
greenape Oct 23, 2024
137d886
Lint
greenape Oct 23, 2024
ddd660d
Call preflight on the query object
greenape Oct 23, 2024
a57e843
Update approval tests
greenape Oct 23, 2024
8a09e6c
Need to actually depend on the nested query to force it to store first
greenape Oct 23, 2024
37de99e
Update table param
greenape Oct 23, 2024
1d77308
Add missing columns arguments
greenape Oct 23, 2024
52c0bdc
Pass up preflight errors, fix error test
greenape Oct 23, 2024
302abc7
Fix typo in example
greenape Oct 23, 2024
e1510e0
Log at error level for exception
greenape Oct 23, 2024
267fac5
Add missing mpl headers
greenape Oct 23, 2024
36b6b3e
Merge branch 'asyncio' of github.com:Flowminder/FlowKit into asyncio
greenape Oct 23, 2024
fea6c74
Delete flowmachine/tests/test_model.py
greenape Oct 23, 2024
489c94e
Correct test name
greenape Oct 23, 2024
22fd099
Add missing columns arg
greenape Oct 23, 2024
9a60732
Update default schema test
greenape Oct 24, 2024
bb04fb8
Types for tables
greenape Oct 24, 2024
127d2e9
Merge branch 'master' into asyncio
greenape Nov 5, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -245,6 +245,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

#### Added
- `inflows` and `outflows` exposed via API endpoint + added to flowclient [#2029](https://github.com/Flowminder/FlowKit/issues/2029), [#4866](https://github.com/Flowminder/FlowKit/issues/4866)
- Added a new `@pre_flight` decorator which `Query` subclasses may use to indicate that a method should be run to confirm the query is runnable (see the sketch after this changelog diff)
- Added a new `preflight()` method to `Query`, which calls all applicable pre-flight check methods for the query

### Changed
- __Action Needed__ Airflow updated to version 2.3.3; **backup flowetl_db before applying update** [#4940](https://github.com/Flowminder/FlowKit/pull/4940)
@@ -255,6 +257,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- FlowDB now always creates a role named `flowmachine`.
- Flowmachine will set the state of a query being stored to cancelled if interrupted while the store is running.
- Flowmachine now supports sqlalchemy >=1.4 [#5140](https://github.com/Flowminder/FlowKit/issues/5140)
- `get_cached_query_objects_ordered_by_score` is now a generator. [#3116](https://github.com/Flowminder/FlowKit/issues/3116)
- Queries should no longer require communication with the database during `__init__`; any checks that require database access must now be implemented as a method of the class and use the `@pre_flight` decorator
- When specifying tables in Flowmachine, the `events.` prefix is no longer required.

### Fixed
- Flowmachine now makes the built-in `flowmachine` role owner of cache tables as a post-action when a query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714)
@@ -265,6 +270,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Removed
- `use_file_flux_sensor` removed entirely. [#2812](https://github.com/Flowminder/FlowKit/issues/2812)
- `Model`, `ModelResult` and `Louvain` have been removed. [#5168](https://github.com/Flowminder/FlowKit/issues/5168)
- `Table` no longer automatically infers columns from the database; they must be specified.

## [1.16.0]

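The changelog entries above describe the new pre-flight mechanism only in prose. Below is a minimal sketch of how a Query subclass might use it; the import path for pre_flight and the check body are assumptions — only the decorator name and the preflight() entry point are taken from this diff.

# Minimal sketch, not the PR's API verbatim: the import path for `pre_flight`
# is assumed, and the check body is illustrative.
from flowmachine.core.query import Query
from flowmachine.core.preflight import pre_flight  # assumed module path


class ExampleQuery(Query):
    def __init__(self, start, stop):
        # No database access here any more; __init__ only records state.
        self.start = start
        self.stop = stop
        super().__init__()

    @pre_flight
    def check_dates(self):
        # Runs when preflight() is called (e.g. just before storing), so
        # failures surface before any SQL is generated or executed.
        if self.start >= self.stop:
            raise ValueError("start must be strictly before stop")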
@@ -92,7 +92,7 @@
"data_events_query = flowmachine.features.TotalLocationEvents(\n",
" start=\"2016-01-01\",\n",
" stop=\"2016-01-08\",\n",
" table=\"events.mds\",\n",
" table=\"mds\",\n",
" spatial_unit=make_spatial_unit(\"versioned-cell\"),\n",
" interval=\"hour\",\n",
")"
5 changes: 4 additions & 1 deletion flowdb/bin/build/0020_schema_cache.sql
@@ -48,4 +48,7 @@ CREATE TABLE IF NOT EXISTS cache.dependencies
CREATE TABLE cache.cache_config (key text, value text);
INSERT INTO cache.cache_config (key, value) VALUES ('half_life', NULL);
INSERT INTO cache.cache_config (key, value) VALUES ('cache_size', NULL);
INSERT INTO cache.cache_config (key, value) VALUES ('cache_protected_period', NULL);

CREATE TABLE cache.zero_cache (object_class text);
INSERT INTO cache.zero_cache (object_class) VALUES ('Table'), ('GeoTable'), ('CallsTable'), ('SmsTable'), ('MdsTable'), ('TopupsTable'), ('ForwardsTable'), ('TacsTable'), ('CellsTable'), ('SitesTable');
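
Because the zero-score list now lives in a table rather than being hard-coded to 'Table', a deployment could in principle extend it. A hedged SQL sketch (the class name here is hypothetical):

-- Hypothetical example: give an additional class a permanent zero cache score.
-- Classes in cache.zero_cache are skipped by touch_cache's score multiplier
-- update (see the change to 0030_utilities.sql below).
INSERT INTO cache.zero_cache (object_class) VALUES ('MyCustomTable');

-- Inspect the currently zero-scored classes:
SELECT object_class FROM cache.zero_cache ORDER BY object_class;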
3 changes: 2 additions & 1 deletion flowdb/bin/build/0030_utilities.sql
@@ -243,9 +243,10 @@
DECLARE score float;
BEGIN
UPDATE cache.cached SET last_accessed = NOW(), access_count = access_count + 1,
cache_score_multiplier = CASE WHEN class='Table' THEN 0 ELSE
cache_score_multiplier = CASE WHEN class=ANY(no_score.classes) THEN 0 ELSE
cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
END
FROM (SELECT array_agg(object_class) as classes FROM cache.zero_cache) AS no_score
Comment on lines +246 to +249

🛠️ Refactor suggestion

Consider optimizing the cache score calculation performance.

The subquery (SELECT array_agg(object_class) as classes FROM cache.zero_cache) will be executed on every cache touch operation. For better performance, consider:

  1. Using a materialized view that's refreshed when zero_cache changes
  2. Or replacing the correlated subquery with a JOIN

Example optimization using an EXISTS subquery:

-        cache_score_multiplier = CASE WHEN class=ANY(no_score.classes) THEN 0 ELSE
-          cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
-        END
-        FROM (SELECT array_agg(object_class) as classes FROM cache.zero_cache) AS no_score
+        cache_score_multiplier = CASE 
+          WHEN EXISTS (SELECT 1 FROM cache.zero_cache WHERE object_class = class) THEN 0 
+          ELSE cache_score_multiplier+POWER(1 + ln(2) / cache_half_life(), nextval('cache.cache_touches') - 2)
+        END

WHERE query_id=cached_query_id
RETURNING cache_score(cache_score_multiplier, compute_time, greatest(table_size(tablename, schema), 0.00001)) INTO score;
IF NOT FOUND THEN RAISE EXCEPTION 'Cache record % not found', cached_query_id;
30 changes: 25 additions & 5 deletions flowmachine/flowmachine/core/cache.py
@@ -12,7 +12,7 @@
import sqlalchemy.engine
from contextvars import copy_context
from concurrent.futures import Executor, TimeoutError
from functools import partial
from functools import partial, lru_cache
from sqlalchemy.exc import ResourceClosedError

from typing import TYPE_CHECKING, Tuple, List, Callable, Optional
@@ -29,6 +29,7 @@
from flowmachine.core.query_state import QueryStateMachine, QueryEvent
from flowmachine import __version__


if TYPE_CHECKING:
from .query import Query
from .connection import Connection
@@ -191,8 +192,10 @@ def write_query_to_cache(
if this_thread_is_owner:
logger.debug(f"In charge of executing '{query.query_id}'.")
try:
query.preflight()
query_ddl_ops = ddl_ops_func(name, schema)
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error generating SQL. Error was {exc}")
raise exc
logger.debug("Made SQL.")
@@ -204,6 +207,7 @@
)
logger.debug("Executed queries.")
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error executing SQL. Error was {exc}")
raise exc
if analyze:
@@ -219,6 +223,7 @@
executed_sql=";\n".join(query_ddl_ops),
)
except Exception as exc:
q_state_machine.raise_error()
logger.error(f"Error writing cache metadata. Error was {exc}")
raise exc
q_state_machine.finish()
@@ -229,7 +234,6 @@
finally:
if this_thread_is_owner and not q_state_machine.is_finished_executing:
q_state_machine.cancel()

q_state_machine.wait_until_complete(sleep_duration=sleep_duration)
if q_state_machine.is_completed:
return query
@@ -301,6 +305,7 @@ def write_cache_metadata(
psycopg2.Binary(self_storage),
),
)
logger.debug("Touching cache.", query_id=query.query_id, query=str(query))
connection.exec_driver_sql(
"SELECT touch_cache(%(ident)s);", dict(ident=query.query_id)
)
@@ -334,6 +339,7 @@ def touch_cache(connection: "Connection", query_id: str) -> float:
The new cache score
"""
try:
logger.debug("Touching cache.", query_id=query_id)
with connection.engine.begin() as trans:
return float(
trans.exec_driver_sql(f"SELECT touch_cache('{query_id}')").fetchall()[
@@ -481,6 +487,19 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")


@lru_cache(maxsize=1)
def _get_protected_classes():
from flowmachine.core.events_table import events_table_map
from flowmachine.core.infrastructure_table import infrastructure_table_map

return [
"Table",
"GeoTable",
*[cls.__name__ for cls in events_table_map.values()],
*[cls.__name__ for cls in infrastructure_table_map.values()],
]


def get_cached_query_objects_ordered_by_score(
connection: "Connection",
protected_period: Optional[int] = None,
@@ -502,14 +521,15 @@
Returns a list of cached Query objects with their on disk sizes

"""

protected_period_clause = (
(f" AND NOW()-created > INTERVAL '{protected_period} seconds'")
if protected_period is not None
else " AND NOW()-created > (cache_protected_period()*INTERVAL '1 seconds')"
)
qry = f"""SELECT query_id, table_size(tablename, schema) as table_size
FROM cache.cached
WHERE cached.class!='Table' AND cached.class!='GeoTable'
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))
{protected_period_clause}
ORDER BY cache_score(cache_score_multiplier, compute_time, table_size(tablename, schema)) ASC
"""
@@ -689,9 +709,9 @@ def get_size_of_cache(connection: "Connection") -> int:
Number of bytes in total used by cache tables

"""
sql = """SELECT sum(table_size(tablename, schema)) as total_bytes
sql = f"""SELECT sum(table_size(tablename, schema)) as total_bytes
FROM cache.cached
WHERE cached.class!='Table' AND cached.class!='GeoTable'"""
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))"""
cache_bytes = connection.fetch(sql)[0][0]
return 0 if cache_bytes is None else int(cache_bytes)

21 changes: 21 additions & 0 deletions flowmachine/flowmachine/core/errors/flowmachine_errors.py
@@ -6,6 +6,27 @@
"""
Custom errors raised by flowmachine.
"""
from typing import List, Dict


class PreFlightFailedException(Exception):
"""
Exception indicating that preflight checks for a query failed.

Parameters
----------
query_id : str
Identifier of the query
errors : dict
Mapping from query representations to lists of exceptions raised in preflight
"""

def __init__(self, query_id: str, errors: Dict[str, List[Exception]]):
self.errors = errors
self.query_id = query_id
Exception.__init__(
self, f"Pre-flight failed for '{self.query_id}'. Errors: {errors}"
)


class StoreFailedException(Exception):
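
A short sketch of consuming the new exception; the import path matches the file added above, and ExampleQuery is the hypothetical subclass from the earlier sketch.

from flowmachine.core.errors.flowmachine_errors import PreFlightFailedException

query = ExampleQuery(start="2016-01-08", stop="2016-01-01")  # bad range

try:
    query.preflight()
except PreFlightFailedException as exc:
    # exc.errors maps query representations to the list of exceptions
    # raised by their pre-flight checks.
    for rep, errors in exc.errors.items():
        for error in errors:
            print(f"{rep}: {error}")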
120 changes: 120 additions & 0 deletions flowmachine/flowmachine/core/events_table.py
@@ -0,0 +1,120 @@
from flowmachine.core.flowdb_table import FlowDBTable


class EventsTable(FlowDBTable):
def __init__(self, *, name, columns):
super().__init__(schema="events", name=name, columns=columns)


class CallsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"duration",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]
Comment on lines +15 to +29

🛠️ Refactor suggestion

Refactor duplicate column definitions.

The all_columns lists in CallsTable, ForwardsTable, and SmsTable are identical. Consider extracting these to a common base class or shared constant to follow the DRY principle.

# Create a base class for common columns
class CommonEventsTable(EventsTable):
    all_columns = [
        "id",
        "outgoing",
        "datetime",
        "network",
        "msisdn",
        "msisdn_counterpart",
        "location_id",
        "imsi",
        "imei",
        "tac",
        "operator_code",
        "country_code",
    ]

class CallsTable(CommonEventsTable):
    def __init__(self, *, columns=None):
        super().__init__(name="calls", columns=columns)

# Similar changes for ForwardsTable and SmsTable

Also applies to: 31-44, 51-64


def __init__(self, *, columns=None):
super().__init__(name="calls", columns=columns)

⚠️ Potential issue

Standardise the constructor parameter requirements.

The columns parameter is inconsistently handled across table classes:

  • Optional in CallsTable and ForwardsTable
  • Required in SmsTable, MdsTable, and TopupsTable

This inconsistency could lead to confusion and maintenance issues.

Consider standardising the constructor signatures. If columns should be optional, apply this pattern:

-    def __init__(self, *, columns):
+    def __init__(self, *, columns=None):

Also applies to: 50-51, 70-71, 91-92, 114-115



class ForwardsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns=None):
super().__init__(name="forwards", columns=columns)


class SmsTable(EventsTable):
all_columns = [
"id",
"outgoing",
"datetime",
"network",
"msisdn",
"msisdn_counterpart",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns):
super().__init__(name="sms", columns=columns)


class MdsTable(EventsTable):
all_columns = [
"id",
"datetime",
"duration",
"volume_total",
"volume_upload",
"volume_download",
"msisdn",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns):
super().__init__(name="mds", columns=columns)


class TopupsTable(EventsTable):
all_columns = [
"id",
"datetime",
"type",
"recharge_amount",
"airtime_fee",
"tax_and_fee",
"pre_event_balance",
"post_event_balance",
"msisdn",
"location_id",
"imsi",
"imei",
"tac",
"operator_code",
"country_code",
]

def __init__(self, *, columns):
super().__init__(name="topups", columns=columns)

🛠️ Refactor suggestion

Consider using composition to manage column definitions.

The current implementation has significant duplication in column definitions. Consider restructuring using composition to improve maintainability:

class ColumnSets:
    COMMON = [
        "id",
        "datetime",
        "location_id",
        "msisdn",
    ]
    
    DEVICE = [
        "imsi",
        "imei",
        "tac",
    ]
    
    NETWORK = [
        "operator_code",
        "country_code",
    ]
    
    INTERACTION = [
        "msisdn_counterpart",
        "network",
        "outgoing",
    ]

class CallsTable(EventsTable):
    all_columns = [
        *ColumnSets.COMMON,
        *ColumnSets.DEVICE,
        *ColumnSets.NETWORK,
        *ColumnSets.INTERACTION,
        "duration",
    ]

This approach would:

  • Make column groups more maintainable
  • Reduce duplication
  • Make it easier to ensure consistency across tables
  • Facilitate adding new tables with similar column patterns



events_table_map = dict(
calls=CallsTable,
sms=SmsTable,
mds=MdsTable,
topups=TopupsTable,
forwards=ForwardsTable,
)
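
The map above gives one plausible way to resolve bare table names now that the events. prefix is dropped. A sketch — only events_table_map comes from this file; the resolver helper is hypothetical:

from flowmachine.core.events_table import events_table_map


def resolve_events_table(name, columns=None):
    """Hypothetical helper: map a bare table name like 'calls' to its class."""
    try:
        table_cls = events_table_map[name]
    except KeyError:
        raise ValueError(
            f"'{name}' is not an events table; expected one of {sorted(events_table_map)}"
        )
    # Passing columns=None selects all_columns via FlowDBTable.__init__.
    return table_cls(columns=columns)


calls = resolve_events_table("calls")                  # all columns
mds = resolve_events_table("mds", columns=["msisdn"])  # subset of columns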
20 changes: 20 additions & 0 deletions flowmachine/flowmachine/core/flowdb_table.py
@@ -0,0 +1,20 @@
from abc import ABCMeta

from flowmachine.core.table import Table



class FlowDBTable(Table, metaclass=ABCMeta):
def __init__(self, *, name, schema, columns):
if columns is None:
columns = self.all_columns
if set(columns).issubset(self.all_columns):
super().__init__(schema=schema, name=name, columns=columns)
else:
raise ValueError(
f"Columns {columns} must be a subset of {self.all_columns}"
)

⚠️ Potential issue

Several improvements needed in constructor implementation.

The constructor has the following issues:

  1. self.all_columns is accessed before it's defined
  2. Missing type hints and parameter documentation
  3. Missing validation for name and schema parameters
  4. Potential AttributeError if all_columns is not defined in a subclass

Consider applying these improvements:

-    def __init__(self, *, name, schema, columns):
+    def __init__(self, *, name: str, schema: str, columns: list[str] | None = None) -> None:
+        """
+        Initialize a FlowDB table with specified name, schema, and columns.
+
+        Args:
+            name: The name of the table
+            schema: The database schema containing the table
+            columns: List of column names. If None, uses all available columns
+
+        Raises:
+            ValueError: If provided columns are not valid or if name/schema are empty
+            AttributeError: If all_columns is not defined in the subclass
+        """
+        if not name or not schema:
+            raise ValueError("Both name and schema must be non-empty strings")
+
+        if not hasattr(self, 'all_columns'):
+            raise AttributeError("Subclass must define all_columns")
+
         if columns is None:
             columns = self.all_columns
         if set(columns).issubset(self.all_columns):
             super().__init__(schema=schema, name=name, columns=columns)
         else:
             raise ValueError(
-                f"Columns {columns} must be a subset of {self.all_columns}"
+                f"Invalid columns: {set(columns) - set(self.all_columns)}. "
+                f"Must be a subset of: {self.all_columns}"
             )


@property
def all_columns(self):
raise NotImplementedError
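
A short sketch of the validation behaviour this base class gives every concrete table, using CallsTable from events_table.py above:

from flowmachine.core.events_table import CallsTable

# Subset of CallsTable.all_columns: accepted.
subset = CallsTable(columns=["msisdn", "datetime", "location_id"])

# Not a subset: FlowDBTable.__init__ raises.
try:
    CallsTable(columns=["msisdn", "no_such_column"])
except ValueError as exc:
    print(exc)  # Columns [...] must be a subset of [...]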
19 changes: 13 additions & 6 deletions flowmachine/flowmachine/core/geotable.py
@@ -6,6 +6,7 @@
"""
Simple utility class that represents tables with geometry.
"""
from typing import Optional, List

from . import Table
from .mixins import GeoDataMixin
@@ -47,19 +48,25 @@ class GeoTable(GeoDataMixin, Table):
"""

def __init__(
self, name=None, schema=None, columns=None, geom_column="geom", gid_column=None
self,
name: str,
*,
schema: Optional[str] = None,
columns: List[str],
geom_column: str = "geom",
gid_column: Optional[str] = None,
):
self.geom_column = geom_column
self.gid_column = gid_column
super().__init__(name=name, schema=schema, columns=columns)
if geom_column not in self.column_names:
if self.geom_column not in columns:
raise ValueError(
"geom_column: {} is not a column in this table.".format(geom_column)
f"geom_column: {self.geom_column} is not a column in this table."
)
if gid_column is not None and gid_column not in self.column_names:
if self.gid_column is not None and self.gid_column not in columns:
raise ValueError(
"gid_column: {} is not a column in this table.".format(gid_column)
f"gid_column: {self.gid_column} is not a column in this table."
)
super().__init__(name=name, schema=schema, columns=columns)
Comment on lines +51 to +69

💡 Codebase verification

Based on the comprehensive results from our verification scripts, I can now provide a final response:

Required columns parameter change is safe to proceed

The codebase analysis shows that:

  • All existing GeoTable instantiations in tests have already been updated to include the required columns parameter
  • GeoTable is only imported in the test file and core init file
  • No classes inherit from GeoTable
  • The example in the docstring needs to be updated to include the required columns parameter

The change to make columns a required parameter is safe, but the docstring example needs to be updated to match the new signature:

  • flowmachine/flowmachine/core/geotable.py: Update the example from >>> t = GeoTable(name="admin3", schema="geography") to include the required columns parameter
🔗 Analysis chain

Verify impact of required columns parameter

The change to make columns required aligns with removing database inference, but we should verify all existing GeoTable instantiations have been updated.


Let me generate additional scripts to get a more comprehensive view of the codebase.

The previous results showed test cases that have been updated with the required columns parameter, but we need to verify all actual usage in the codebase. These new scripts will help us get a complete picture of GeoTable usage and inheritance patterns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for GeoTable instantiations that might need updating
rg "GeoTable\(" --type py -B 2 -A 2

# Look for any existing tests to ensure they've been updated
rg "test.*GeoTable" --type py

Length of output: 2844


Script:

#!/bin/bash
# Search for GeoTable instantiations in the entire codebase
ast-grep --pattern 'GeoTable($$$)'

# Search for any imports of GeoTable to find all files that might use it
rg "from.*import.*GeoTable" --type py

# Look for any class that might inherit from GeoTable
ast-grep --pattern 'class $_ ($$$GeoTable$$$)'

Length of output: 7209


def _geo_augmented_query(self):
if self.gid_column is None:
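
Per the verification above, the GeoTable docstring example needs updating to pass the now-required columns argument. A hedged sketch of what the updated example might look like — the column list is illustrative, not taken from the FlowDB geography schema:

from flowmachine.core import GeoTable

t = GeoTable(
    name="admin3",
    schema="geography",
    # Illustrative column names; geom_column defaults to "geom" and must
    # appear in the supplied columns, per the validation in __init__ above.
    columns=["admin3name", "geom"],
)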