Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename .md5 to .query_id #1289

Merged
merged 6 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Flowmachine's default random sampling method is now `random_ids` rather than the non-reproducible `system_rows`. [#1263](https://github.com/Flowminder/FlowKit/issues/1263)
- `IntereventPeriod` now returns stats over the gap between events in fractional time units, instead of time intervals. [#1265](https://github.com/Flowminder/FlowKit/issues/1265)
- In the FlowETL deployment example, the external ingestion database is now set up separately from the FlowKit components and connected to FlowDB via a docker overlay network. [#1276](https://github.com/Flowminder/FlowKit/issues/1276)
- The `md5` attribute of the `Query` class has been renamed to `query_id` [#1288](https://github.com/Flowminder/FlowKit/issues/1288).


### Fixed
- Quickstart will no longer fail if it has been run previously with a different FlowDB data size and not explicitly shut down. [#900](https://github.com/Flowminder/FlowKit/issues/900)
Expand Down
44 changes: 23 additions & 21 deletions flowmachine/flowmachine/core/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def write_query_to_cache(
This is a _blocking function_, and will not return until the query is no longer in an executing state.

"""
logger.debug(f"Trying to switch '{query.md5}' to executing state.")
q_state_machine = QueryStateMachine(redis, query.md5)
logger.debug(f"Trying to switch '{query.query_id}' to executing state.")
q_state_machine = QueryStateMachine(redis, query.query_id)
current_state, this_thread_is_owner = q_state_machine.execute()
if this_thread_is_owner:
logger.debug(f"In charge of executing '{query.md5}'.")
logger.debug(f"In charge of executing '{query.query_id}'.")
try:
query_ddl_ops = ddl_ops_func(name, schema)
except Exception as exc:
Expand Down Expand Up @@ -128,16 +128,16 @@ def write_query_to_cache(
if q_state_machine.is_completed:
return query
elif q_state_machine.is_cancelled:
logger.error(f"Query '{query.md5}' was cancelled.")
raise QueryCancelledException(query.md5)
logger.error(f"Query '{query.query_id}' was cancelled.")
raise QueryCancelledException(query.query_id)
elif q_state_machine.is_errored:
logger.error(f"Query '{query.md5}' finished with an error.")
raise QueryErroredException(query.md5)
logger.error(f"Query '{query.query_id}' finished with an error.")
raise QueryErroredException(query.query_id)
else:
logger.error(
f"Query '{query.md5}' not stored. State is {q_state_machine.current_query_state}"
f"Query '{query.query_id}' not stored. State is {q_state_machine.current_query_state}"
)
raise StoreFailedException(query.md5)
raise StoreFailedException(query.query_id)


def write_cache_metadata(
Expand Down Expand Up @@ -165,7 +165,9 @@ def write_cache_metadata(

try:
in_cache = bool(
connection.fetch(f"SELECT * FROM cache.cached WHERE query_id='{query.md5}'")
connection.fetch(
f"SELECT * FROM cache.cached WHERE query_id='{query.query_id}'"
)
)
if not in_cache:
try:
Expand All @@ -184,7 +186,7 @@ def write_cache_metadata(
con.execute(
cache_record_insert,
(
query.md5,
query.query_id,
__version__,
query._make_query(),
compute_time,
Expand All @@ -193,13 +195,13 @@ def write_cache_metadata(
psycopg2.Binary(self_storage),
),
)
con.execute("SELECT touch_cache(%s);", query.md5)
con.execute("SELECT touch_cache(%s);", query.query_id)

if not in_cache:
for dep in query._get_stored_dependencies(exclude_self=True):
con.execute(
"INSERT INTO cache.dependencies values (%s, %s) ON CONFLICT DO NOTHING",
(query.md5, dep.md5),
(query.query_id, dep.query_id),
)
logger.debug(f"{query.fully_qualified_table_name} added to cache.")
else:
Expand All @@ -216,7 +218,7 @@ def touch_cache(connection: "Connection", query_id: str) -> float:
----------
connection : Connection
query_id : str
md5 id of the query to touch
Unique id of the query to touch

Returns
-------
Expand Down Expand Up @@ -321,7 +323,7 @@ def invalidate_cache_by_id(
----------
connection : Connection
query_id : str
md5 id of the query
Unique id of the query
cascade : bool, default False
Set to true to remove any queries that depend on the one being removed

Expand All @@ -344,7 +346,7 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
----------
connection : Connection
query_id : str
md5 id of the query
Unique id of the query

Returns
-------
Expand Down Expand Up @@ -404,7 +406,7 @@ def shrink_one(connection: "Connection", dry_run: bool = False) -> "Query":
obj_to_remove, obj_size = get_cached_query_objects_ordered_by_score(connection)[0]

logger.info(
f"{'Would' if dry_run else 'Will'} remove cache record for {obj_to_remove.md5} of type {obj_to_remove.__class__}"
f"{'Would' if dry_run else 'Will'} remove cache record for {obj_to_remove.query_id} of type {obj_to_remove.__class__}"
)
logger.info(
f"Table {obj_to_remove.fully_qualified_table_name} ({obj_size} bytes) {'would' if dry_run else 'will'} be removed."
Expand Down Expand Up @@ -446,7 +448,7 @@ def shrink_below_size(
def dry_run_shrink(connection):
obj, obj_size = cached_queries.__next__()
logger.info(
f"Would remove cache record for {obj.md5} of type {obj.__class__}"
f"Would remove cache record for {obj.query_id} of type {obj.__class__}"
)
logger.info(
f"Table {obj.fully_qualified_table_name} ({obj_size} bytes) would be removed."
Expand Down Expand Up @@ -602,7 +604,7 @@ def get_compute_time(connection: "Connection", query_id: str) -> float:
----------
connection : "Connection"
query_id : str
md5 identifier of the query
Unique id of the query

Returns
-------
Expand All @@ -629,7 +631,7 @@ def get_score(connection: "Connection", query_id: str) -> float:
----------
connection: "Connection"
query_id : str
md5 id of the cached query
Unique id of the cached query

Returns
-------
Expand Down Expand Up @@ -657,7 +659,7 @@ def cache_table_exists(connection: "Connection", query_id: str) -> bool:
----------
connection: "Connection"
query_id : str
md5 id of the cached query
Unique id of the cached query

Returns
-------
Expand Down
8 changes: 4 additions & 4 deletions flowmachine/flowmachine/core/dummy_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ def __init__(self, dummy_param):
self.dummy_param = dummy_param

@property
def md5(self):
# Prefix the usual md5 hash with 'dummy_query' to make it obvious
def query_id(self):
# Prefix the usual query_id hash with 'dummy_query' to make it obvious
# that this is not a regular query.
md5_hash = super().md5
md5_hash = super().query_id
return f"dummy_query_{md5_hash}"

def _make_query(self):
Expand All @@ -32,7 +32,7 @@ def store(self):
logger.debug(
"Storing dummy query by marking the query state as 'finished' (but without actually writing to the database)."
)
q_state_machine = QueryStateMachine(self.redis, self.md5)
q_state_machine = QueryStateMachine(self.redis, self.query_id)
q_state_machine.enqueue()
q_state_machine.execute()
q_state_machine.finish()
6 changes: 3 additions & 3 deletions flowmachine/flowmachine/core/model_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ def to_sql(self, name: str, schema: Union[str, None] = None) -> Future:

def write_model_result(query_ddl_ops: List[str], connection: Engine) -> float:
self._df.to_sql(name, connection, schema=schema, index=False)
QueryStateMachine(self.redis, self.md5).finish()
QueryStateMachine(self.redis, self.query_id).finish()
return self._runtime

current_state, changed_to_queue = QueryStateMachine(
self.redis, self.md5
self.redis, self.query_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.md5}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = self.thread_pool_executor.submit(
Expand Down
36 changes: 17 additions & 19 deletions flowmachine/flowmachine/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ class Query(metaclass=ABCMeta):
_QueryPool = weakref.WeakValueDictionary()

def __init__(self, cache=True):
obj = Query._QueryPool.get(self.md5)
obj = Query._QueryPool.get(self.query_id)
if obj is None:
try:
self.connection
except AttributeError:
raise NotConnectedError()

self._cache = cache
Query._QueryPool[self.md5] = self
Query._QueryPool[self.query_id] = self
else:
self.__dict__ = obj.__dict__

@property
def md5(self):
def query_id(self):
"""
Generate a uniquely identifying hash of this query,
based on the parameters of it and the subqueries it is
Expand All @@ -84,21 +84,21 @@ def md5(self):
Returns
-------
str
md5 hash string
query_id hash string
"""
try:
return self._md5
except:
dependencies = self.dependencies
state = self.__getstate__()
hashes = sorted([x.md5 for x in dependencies])
hashes = sorted([x.query_id for x in dependencies])
for key, item in sorted(state.items()):
if isinstance(item, Query) and item in dependencies:
# this item is already included in `hashes`
continue
elif isinstance(item, list) or isinstance(item, tuple):
item = sorted(
item, key=lambda x: x.md5 if isinstance(x, Query) else x
item, key=lambda x: x.query_id if isinstance(x, Query) else x
)
elif isinstance(item, dict):
item = json.dumps(item, sort_keys=True, default=str)
Expand All @@ -112,8 +112,6 @@ def md5(self):
self._md5 = md5(str(hashes).encode()).hexdigest()
return self._md5

query_id = md5 # alias which is more meaningful to users than 'md5'

@abstractmethod
def _make_query(self):

Expand Down Expand Up @@ -230,7 +228,7 @@ def query_state(self) -> "QueryState":
flowmachine.core.query_state.QueryState
The current query state
"""
state_machine = QueryStateMachine(self.redis, self.md5)
state_machine = QueryStateMachine(self.redis, self.query_id)
return state_machine.current_query_state

@property
Expand Down Expand Up @@ -260,13 +258,13 @@ def get_query(self):
try:
table_name = self.fully_qualified_table_name
schema, name = table_name.split(".")
state_machine = QueryStateMachine(self.redis, self.md5)
state_machine = QueryStateMachine(self.redis, self.query_id)
state_machine.wait_until_complete()
if state_machine.is_completed and self.connection.has_table(
schema=schema, name=name
):
try:
touch_cache(self.connection, self.md5)
touch_cache(self.connection, self.query_id)
except ValueError:
pass # Cache record not written yet, which can happen for Models
# which will call through to this method from their `_make_query` method while writing metadata.
Expand Down Expand Up @@ -589,10 +587,10 @@ def write_query(query_ddl_ops: List[str], connection: Engine) -> float:
return plan_time

current_state, changed_to_queue = QueryStateMachine(
self.redis, self.md5
self.redis, self.query_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.md5}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = self.thread_pool_executor.submit(
Expand Down Expand Up @@ -667,7 +665,7 @@ def table_name(self):
str
String form of the table's fqn
"""
return f"x{self.md5}"
return f"x{self.query_id}"

@property
def is_stored(self):
Expand Down Expand Up @@ -788,7 +786,7 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
drop : bool
Set to false to remove the cache record without dropping the table
"""
q_state_machine = QueryStateMachine(self.redis, self.md5)
q_state_machine = QueryStateMachine(self.redis, self.query_id)
current_state, this_thread_is_owner = q_state_machine.reset()
if this_thread_is_owner:
con = self.connection.engine
Expand All @@ -805,12 +803,12 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
"""SELECT obj FROM cache.cached LEFT JOIN cache.dependencies
ON cache.cached.query_id=cache.dependencies.query_id
WHERE depends_on='{}'""".format(
self.md5
self.query_id
)
)
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s", (self.md5,)
"DELETE FROM cache.cached WHERE query_id=%s", (self.query_id,)
)
logger.debug(
"Deleted cache record for {}.".format(
Expand Down Expand Up @@ -853,12 +851,12 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
q_state_machine.finish_resetting()
elif q_state_machine.is_resetting:
logger.debug(
f"Query '{self.md5}' is being reset from elsewhere, waiting for reset to finish."
f"Query '{self.query_id}' is being reset from elsewhere, waiting for reset to finish."
)
while q_state_machine.is_resetting:
_sleep(1)
if not q_state_machine.is_known:
raise QueryResetFailedException(self.md5)
raise QueryResetFailedException(self.query_id)

@property
def index_cols(self):
Expand Down
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/query_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class QueryStateMachine:
redis_client : StrictRedis
Client for redis
query_id : str
md5 query identifier
Unique query identifier

Notes
-----
Expand Down
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/random.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _sample_params(self) -> Dict[str, Any]:
def table_name(self) -> str:
if self.seed is None:
raise NotImplementedError("Unseeded random samples cannot be stored.")
return f"x{self.md5}"
return f"x{self.query_id}"


class RandomTablesample(SeedableRandom):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def store_async(self):
q = self._flowmachine_query_obj
q.store()

query_id = q.md5
query_id = q.query_id

return query_id

Expand All @@ -61,6 +61,6 @@ def query_id(self):
# return md5(json.dumps(self.query_params, sort_keys=True).encode()).hexdigest()
#
# However, the resulting md5 hash is different from the one produced internally
# by flowmachine.core.Query.md5, and the latter is currently being used by
# by flowmachine.core.Query.query_id, and the latter is currently being used by
# the QueryStateMachine, so we need to use it to check the query state.
return self._flowmachine_query_obj.md5
return self._flowmachine_query_obj.query_id
4 changes: 2 additions & 2 deletions flowmachine/flowmachine/core/spatial_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,13 @@ def __init__(

def __eq__(self, other):
try:
return self.md5 == other.md5
return self.query_id == other.query_id
except AttributeError:
return False

def __hash__(self):
    """Return a hash derived from the unique query id.

    Required because this class defines ``__eq__`` explicitly, which
    would otherwise leave instances unhashable.
    """
    unique_id = self.query_id
    return hash(unique_id)
return hash(self.query_id)

def _get_aliased_geom_table_cols(self, table_alias: str) -> List[str]:
return [f"{table_alias}.{c}" for c in self._geom_table_cols]
Expand Down
Loading