
Commit

Merge pull request #1289 from Flowminder/md5-to-query_id
Rename .md5 to .query_id
mergify[bot] authored Sep 19, 2019
2 parents bdb7689 + 14b9161 commit b573fd0
Showing 27 changed files with 183 additions and 179 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -20,6 +20,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Flowmachine's default random sampling method is now `random_ids` rather than the non-reproducible `system_rows`. [#1263](https://github.com/Flowminder/FlowKit/issues/1263)
- `IntereventPeriod` now returns stats over the gap between events in fractional time units, instead of time intervals. [#1265](https://github.com/Flowminder/FlowKit/issues/1265)
- In the FlowETL deployment example, the external ingestion database is now set up separately from the FlowKit components and connected to FlowDB via a docker overlay network. [#1276](https://github.com/Flowminder/FlowKit/issues/1276)
- The `md5` attribute of the `Query` class has been renamed to `query_id` [#1288](https://github.com/Flowminder/FlowKit/issues/1288).


### Fixed
- Quickstart will no longer fail if it has been run previously with a different FlowDB data size and not explicitly shut down. [#900](https://github.com/Flowminder/FlowKit/issues/900)
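For flowmachine users, the practical effect of the CHANGELOG entry above is simply that the hash previously read from a query's `md5` attribute is now read from `query_id`. A minimal sketch, assuming a reachable FlowDB that `flowmachine.connect()` can be pointed at (connection settings taken from the environment) and using `daily_location` purely as an illustrative query; any `Query` subclass exposes the same attribute:

```python
# Sketch only: requires flowmachine installed and a reachable FlowDB instance.
import flowmachine
from flowmachine.features import daily_location

flowmachine.connect()  # assumes connection details are set in the environment

dl = daily_location("2016-01-01")
print(dl.query_id)  # previously dl.md5 -- a hex digest uniquely identifying this query
```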
44 changes: 23 additions & 21 deletions flowmachine/flowmachine/core/cache.py
@@ -94,11 +94,11 @@ def write_query_to_cache(
This is a _blocking function_, and will not return until the query is no longer in an executing state.
"""
logger.debug(f"Trying to switch '{query.md5}' to executing state.")
q_state_machine = QueryStateMachine(redis, query.md5)
logger.debug(f"Trying to switch '{query.query_id}' to executing state.")
q_state_machine = QueryStateMachine(redis, query.query_id)
current_state, this_thread_is_owner = q_state_machine.execute()
if this_thread_is_owner:
logger.debug(f"In charge of executing '{query.md5}'.")
logger.debug(f"In charge of executing '{query.query_id}'.")
try:
query_ddl_ops = ddl_ops_func(name, schema)
except Exception as exc:
@@ -128,16 +128,16 @@ def write_query_to_cache(
if q_state_machine.is_completed:
return query
elif q_state_machine.is_cancelled:
logger.error(f"Query '{query.md5}' was cancelled.")
raise QueryCancelledException(query.md5)
logger.error(f"Query '{query.query_id}' was cancelled.")
raise QueryCancelledException(query.query_id)
elif q_state_machine.is_errored:
logger.error(f"Query '{query.md5}' finished with an error.")
raise QueryErroredException(query.md5)
logger.error(f"Query '{query.query_id}' finished with an error.")
raise QueryErroredException(query.query_id)
else:
logger.error(
f"Query '{query.md5}' not stored. State is {q_state_machine.current_query_state}"
f"Query '{query.query_id}' not stored. State is {q_state_machine.current_query_state}"
)
raise StoreFailedException(query.md5)
raise StoreFailedException(query.query_id)


def write_cache_metadata(
@@ -165,7 +165,9 @@ def write_cache_metadata(

try:
in_cache = bool(
connection.fetch(f"SELECT * FROM cache.cached WHERE query_id='{query.md5}'")
connection.fetch(
f"SELECT * FROM cache.cached WHERE query_id='{query.query_id}'"
)
)
if not in_cache:
try:
@@ -184,7 +186,7 @@
con.execute(
cache_record_insert,
(
query.md5,
query.query_id,
__version__,
query._make_query(),
compute_time,
@@ -193,13 +195,13 @@
psycopg2.Binary(self_storage),
),
)
con.execute("SELECT touch_cache(%s);", query.md5)
con.execute("SELECT touch_cache(%s);", query.query_id)

if not in_cache:
for dep in query._get_stored_dependencies(exclude_self=True):
con.execute(
"INSERT INTO cache.dependencies values (%s, %s) ON CONFLICT DO NOTHING",
(query.md5, dep.md5),
(query.query_id, dep.query_id),
)
logger.debug(f"{query.fully_qualified_table_name} added to cache.")
else:
@@ -216,7 +218,7 @@ def touch_cache(connection: "Connection", query_id: str) -> float:
----------
connection : Connection
query_id : str
md5 id of the query to touch
Unique id of the query to touch
Returns
-------
@@ -321,7 +323,7 @@ def invalidate_cache_by_id(
----------
connection : Connection
query_id : str
md5 id of the query
Unique id of the query
cascade : bool, default False
Set to true to remove any queries that depend on the one being removed
@@ -344,7 +346,7 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
----------
connection : Connection
query_id : str
md5 id of the query
Unique id of the query
Returns
-------
@@ -404,7 +406,7 @@ def shrink_one(connection: "Connection", dry_run: bool = False) -> "Query":
obj_to_remove, obj_size = get_cached_query_objects_ordered_by_score(connection)[0]

logger.info(
f"{'Would' if dry_run else 'Will'} remove cache record for {obj_to_remove.md5} of type {obj_to_remove.__class__}"
f"{'Would' if dry_run else 'Will'} remove cache record for {obj_to_remove.query_id} of type {obj_to_remove.__class__}"
)
logger.info(
f"Table {obj_to_remove.fully_qualified_table_name} ({obj_size} bytes) {'would' if dry_run else 'will'} be removed."
@@ -446,7 +448,7 @@ def shrink_below_size(
def dry_run_shrink(connection):
obj, obj_size = cached_queries.__next__()
logger.info(
f"Would remove cache record for {obj.md5} of type {obj.__class__}"
f"Would remove cache record for {obj.query_id} of type {obj.__class__}"
)
logger.info(
f"Table {obj.fully_qualified_table_name} ({obj_size} bytes) would be removed."
@@ -602,7 +604,7 @@ def get_compute_time(connection: "Connection", query_id: str) -> float:
----------
connection : "Connection"
query_id : str
md5 identifier of the query
Unique id of the query
Returns
-------
@@ -629,7 +631,7 @@ def get_score(connection: "Connection", query_id: str) -> float:
----------
connection: "Connection"
query_id : str
md5 id of the cached query
Unique id of the cached query
Returns
-------
@@ -657,7 +659,7 @@ def cache_table_exists(connection: "Connection", query_id: str) -> bool:
----------
connection: "Connection"
query_id : str
md5 id of the cached query
Unique id of the cached query
Returns
-------
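All of the `write_query_to_cache` changes above touch the same pattern: a `QueryStateMachine` keyed on `query.query_id` in redis decides which caller actually executes and stores the query, while every other caller blocks until the winner finishes, errors, or is cancelled. The toy below mimics just the "owner executes, everyone else waits" control flow with plain threading; it is not flowmachine code and deliberately omits the error, cancel, and reset states:

```python
# Toy illustration of the single-executor pattern (not the flowmachine implementation).
import threading
import time

_states = {}                       # query_id -> "executing" | "completed"
_states_lock = threading.Lock()
_done_events = {}                  # query_id -> Event set once the work is finished

def run_once(query_id, work):
    """Execute `work` at most once per query_id; other callers wait for the result."""
    with _states_lock:
        event = _done_events.setdefault(query_id, threading.Event())
        this_thread_is_owner = query_id not in _states
        if this_thread_is_owner:
            _states[query_id] = "executing"
    if this_thread_is_owner:
        work()                     # only the winning thread does the actual work
        with _states_lock:
            _states[query_id] = "completed"
        event.set()
    else:
        event.wait()               # everyone else blocks until the owner is done
    return _states[query_id]

if __name__ == "__main__":
    # Three threads race on the same id; "computed once" is printed exactly once.
    work = lambda: (time.sleep(0.1), print("computed once"))
    threads = [threading.Thread(target=run_once, args=("abc123", work)) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```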
8 changes: 4 additions & 4 deletions flowmachine/flowmachine/core/dummy_query.py
@@ -15,10 +15,10 @@ def __init__(self, dummy_param):
self.dummy_param = dummy_param

@property
def md5(self):
# Prefix the usual md5 hash with 'dummy_query' to make it obvious
def query_id(self):
# Prefix the usual query_id hash with 'dummy_query' to make it obvious
# that this is not a regular query.
md5_hash = super().md5
md5_hash = super().query_id
return f"dummy_query_{md5_hash}"

def _make_query(self):
@@ -32,7 +32,7 @@ def store(self):
logger.debug(
"Storing dummy query by marking the query state as 'finished' (but without actually writing to the database)."
)
q_state_machine = QueryStateMachine(self.redis, self.md5)
q_state_machine = QueryStateMachine(self.redis, self.query_id)
q_state_machine.enqueue()
q_state_machine.execute()
q_state_machine.finish()
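`DummyQuery` keeps the hash computed by the parent `query_id` property but namespaces it with a `dummy_query_` prefix, so dummy ids can never collide with those of real queries. The same overriding pattern, shown with hypothetical classes rather than the real flowmachine hierarchy:

```python
# Hypothetical classes illustrating the override; not flowmachine's Query/DummyQuery.
from hashlib import md5

class BaseThing:
    @property
    def query_id(self):
        # Hash the instance's attributes to get a deterministic identifier.
        return md5(repr(sorted(self.__dict__.items())).encode()).hexdigest()

class DummyThing(BaseThing):
    def __init__(self, dummy_param):
        self.dummy_param = dummy_param

    @property
    def query_id(self):
        # Reuse the inherited hash, but make it obvious this is not a real query.
        return f"dummy_query_{super().query_id}"

print(DummyThing(7).query_id)  # dummy_query_<32-character hex digest>
```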
6 changes: 3 additions & 3 deletions flowmachine/flowmachine/core/model_result.py
@@ -156,14 +156,14 @@ def to_sql(self, name: str, schema: Union[str, None] = None) -> Future:

def write_model_result(query_ddl_ops: List[str], connection: Engine) -> float:
self._df.to_sql(name, connection, schema=schema, index=False)
QueryStateMachine(self.redis, self.md5).finish()
QueryStateMachine(self.redis, self.query_id).finish()
return self._runtime

current_state, changed_to_queue = QueryStateMachine(
self.redis, self.md5
self.redis, self.query_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.md5}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = self.thread_pool_executor.submit(
36 changes: 17 additions & 19 deletions flowmachine/flowmachine/core/query.py
@@ -62,20 +62,20 @@ class Query(metaclass=ABCMeta):
_QueryPool = weakref.WeakValueDictionary()

def __init__(self, cache=True):
obj = Query._QueryPool.get(self.md5)
obj = Query._QueryPool.get(self.query_id)
if obj is None:
try:
self.connection
except AttributeError:
raise NotConnectedError()

self._cache = cache
Query._QueryPool[self.md5] = self
Query._QueryPool[self.query_id] = self
else:
self.__dict__ = obj.__dict__

@property
def md5(self):
def query_id(self):
"""
Generate a uniquely identifying hash of this query,
based on the parameters of it and the subqueries it is
@@ -84,21 +84,21 @@ def md5(self):
Returns
-------
str
md5 hash string
query_id hash string
"""
try:
return self._md5
except:
dependencies = self.dependencies
state = self.__getstate__()
hashes = sorted([x.md5 for x in dependencies])
hashes = sorted([x.query_id for x in dependencies])
for key, item in sorted(state.items()):
if isinstance(item, Query) and item in dependencies:
# this item is already included in `hashes`
continue
elif isinstance(item, list) or isinstance(item, tuple):
item = sorted(
item, key=lambda x: x.md5 if isinstance(x, Query) else x
item, key=lambda x: x.query_id if isinstance(x, Query) else x
)
elif isinstance(item, dict):
item = json.dumps(item, sort_keys=True, default=str)
@@ -112,8 +112,6 @@ def md5(self):
self._md5 = md5(str(hashes).encode()).hexdigest()
return self._md5

query_id = md5 # alias which is more meaningful to users than 'md5'

@abstractmethod
def _make_query(self):

@@ -230,7 +228,7 @@ def query_state(self) -> "QueryState":
flowmachine.core.query_state.QueryState
The current query state
"""
state_machine = QueryStateMachine(self.redis, self.md5)
state_machine = QueryStateMachine(self.redis, self.query_id)
return state_machine.current_query_state

@property
Expand Down Expand Up @@ -260,13 +258,13 @@ def get_query(self):
try:
table_name = self.fully_qualified_table_name
schema, name = table_name.split(".")
state_machine = QueryStateMachine(self.redis, self.md5)
state_machine = QueryStateMachine(self.redis, self.query_id)
state_machine.wait_until_complete()
if state_machine.is_completed and self.connection.has_table(
schema=schema, name=name
):
try:
touch_cache(self.connection, self.md5)
touch_cache(self.connection, self.query_id)
except ValueError:
pass # Cache record not written yet, which can happen for Models
# which will call through to this method from their `_make_query` method while writing metadata.
@@ -589,10 +587,10 @@ def write_query(query_ddl_ops: List[str], connection: Engine) -> float:
return plan_time

current_state, changed_to_queue = QueryStateMachine(
self.redis, self.md5
self.redis, self.query_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.md5}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = self.thread_pool_executor.submit(
@@ -667,7 +665,7 @@ def table_name(self):
str
String form of the table's fqn
"""
return f"x{self.md5}"
return f"x{self.query_id}"

@property
def is_stored(self):
@@ -788,7 +786,7 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
drop : bool
Set to false to remove the cache record without dropping the table
"""
q_state_machine = QueryStateMachine(self.redis, self.md5)
q_state_machine = QueryStateMachine(self.redis, self.query_id)
current_state, this_thread_is_owner = q_state_machine.reset()
if this_thread_is_owner:
con = self.connection.engine
@@ -805,12 +803,12 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
"""SELECT obj FROM cache.cached LEFT JOIN cache.dependencies
ON cache.cached.query_id=cache.dependencies.query_id
WHERE depends_on='{}'""".format(
self.md5
self.query_id
)
)
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s", (self.md5,)
"DELETE FROM cache.cached WHERE query_id=%s", (self.query_id,)
)
logger.debug(
"Deleted cache record for {}.".format(
Expand Down Expand Up @@ -853,12 +851,12 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
q_state_machine.finish_resetting()
elif q_state_machine.is_resetting:
logger.debug(
f"Query '{self.md5}' is being reset from elsewhere, waiting for reset to finish."
f"Query '{self.query_id}' is being reset from elsewhere, waiting for reset to finish."
)
while q_state_machine.is_resetting:
_sleep(1)
if not q_state_machine.is_known:
raise QueryResetFailedException(self.md5)
raise QueryResetFailedException(self.query_id)

@property
def index_cols(self):
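The renamed `query_id` property is what ties the rest of this diff together: it is a deterministic hash of a query's parameters together with the ids of its dependencies, and it also names the cache table (`x<query_id>`) and the redis key used by `QueryStateMachine`. A standalone sketch of the idea — not the flowmachine implementation, which additionally handles nested `Query` objects and the class name:

```python
# Standalone sketch: deterministic id from sorted parameters plus dependency ids.
import json
from hashlib import md5

def toy_query_id(params: dict, dependency_ids=()) -> str:
    hashes = sorted(dependency_ids)
    for key, value in sorted(params.items()):
        hashes.append(json.dumps(value, sort_keys=True, default=str))
        hashes.append(key)
    return md5(str(sorted(hashes)).encode()).hexdigest()

a = toy_query_id({"start": "2016-01-01", "stop": "2016-01-02"})
b = toy_query_id({"stop": "2016-01-02", "start": "2016-01-01"})
assert a == b            # same parameters in any order -> same id
print(f"x{a}")           # shape of the cache table name used by flowmachine
```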
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/query_state.py
@@ -90,7 +90,7 @@ class QueryStateMachine:
redis_client : StrictRedis
Client for redis
query_id : str
md5 query identifier
Unique query identifier
Notes
-----
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/random.py
@@ -218,7 +218,7 @@ def _sample_params(self) -> Dict[str, Any]:
def table_name(self) -> str:
if self.seed is None:
raise NotImplementedError("Unseeded random samples cannot be stored.")
return f"x{self.md5}"
return f"x{self.query_id}"


class RandomTablesample(SeedableRandom):
@@ -50,7 +50,7 @@ def store_async(self):
q = self._flowmachine_query_obj
q.store()

query_id = q.md5
query_id = q.query_id

return query_id

@@ -61,6 +61,6 @@ def query_id(self):
# return md5(json.dumps(self.query_params, sort_keys=True).encode()).hexdigest()
#
# However, the resulting md5 hash is different from the one produced internally
# by flowmachine.core.Query.md5, and the latter is currently being used by
# by flowmachine.core.Query.query_id, and the latter is currently being used by
# the QueryStateMachine, so we need to use it to check the query state.
return self._flowmachine_query_obj.md5
return self._flowmachine_query_obj.query_id
4 changes: 2 additions & 2 deletions flowmachine/flowmachine/core/spatial_unit.py
@@ -294,13 +294,13 @@ def __init__(

def __eq__(self, other):
try:
return self.md5 == other.md5
return self.query_id == other.query_id
except AttributeError:
return False

def __hash__(self):
# Must define this because we explicitly define self.__eq__
return hash(self.md5)
return hash(self.query_id)

def _get_aliased_geom_table_cols(self, table_alias: str) -> List[str]:
return [f"{table_alias}.{c}" for c in self._geom_table_cols]
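The `spatial_unit.py` hunk also illustrates a general Python point: once `__eq__` is defined on a class, `__hash__` is set to `None` unless it is redefined, so basing both on `query_id` keeps spatial units usable as set members and dict keys. A small toy illustration (unrelated classes, not the flowmachine ones):

```python
# Toy classes showing why __hash__ must be redefined alongside __eq__.
class WithEqOnly:
    def __init__(self, ident):
        self.ident = ident

    def __eq__(self, other):
        return isinstance(other, WithEqOnly) and self.ident == other.ident

class WithEqAndHash(WithEqOnly):
    def __hash__(self):
        # Hash on the same value used for equality, as spatial_unit.py does with query_id.
        return hash(self.ident)

try:
    {WithEqOnly("abc")}
except TypeError as exc:
    print(exc)  # unhashable type: 'WithEqOnly'

print(len({WithEqAndHash("abc"), WithEqAndHash("abc")}))  # 1: equal objects deduplicate
```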