Skip to content

Commit

Permalink
Merge pull request #805 from Flowminder/query-formatting
Browse files Browse the repository at this point in the history
Query formatting and dependency tree/graph visualisation
  • Loading branch information
greenape authored May 20, 2019
2 parents 7b7235b + 4661848 commit c6836f1
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 49 deletions.
3 changes: 3 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ jobs:
at: /home/circleci/
- restore_cache:
key: flowmachine-deps-1-{{ checksum "Pipfile.lock" }}
- run:
name: Install graphviz
command: sudo apt-get install -y xvfb graphviz
- run: *wait_for_flowdb
- run:
name: Run tests
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- `flowmachine.utils.calculate_dependency_graph` now includes the `Query` objects in the `query_object` field of the graph's nodes dictionary [#767](https://github.com/Flowminder/FlowKit/issues/767)
- Architectural Decision Records (ADR) have been added and are included in the auto-generated docs [#780](https://github.com/Flowminder/FlowKit/issues/780)
- Added FlowDB environment variables `SHARED_BUFFERS_SIZE` and `EFFECTIVE_CACHE_SIZE`, to allow manually setting the Postgres configuration parameters `shared_buffers` and `effective_cache_size`.
- The function `print_dependency_tree()` now takes an optional argument `show_stored` to display whether each dependent query has been stored [#804](https://github.com/Flowminder/FlowKit/issues/804)
- A new function `plot_dependency_graph()` has been added, which makes it convenient to plot and visualise a dependency graph in Jupyter notebooks (this requires IPython and pygraphviz to be installed) [#786](https://github.com/Flowminder/FlowKit/issues/786)

### Changed
- Parameter names in `flowmachine.connect()` have been renamed as follows to be consistent with the associated environment variables [#728](https://github.com/Flowminder/FlowKit/issues/728):
Expand Down
1 change: 1 addition & 0 deletions flowmachine/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ cachey = "*"
approvaltests = "*"
watchdog = "*"
ipdb = "*"
pygraphviz = "*"

[requires]
python_version = "3.7"
17 changes: 16 additions & 1 deletion flowmachine/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 38 additions & 3 deletions flowmachine/flowmachine/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,51 @@ def md5(self):
self._md5 = md5(str(hashes).encode()).hexdigest()
return self._md5

query_id = md5 # alias which is more meaningful to users than 'md5'

@abstractmethod
def _make_query(self):
# Abstract hook: it is decorated with @abstractmethod, and the base
# implementation does nothing except raise NotImplementedError.
# NOTE(review): presumably concrete query classes return their SQL from
# here -- confirm against the subclasses, which are not visible in this hunk.

raise NotImplementedError

def __repr__(self):
    # Default representation: delegate to __format__ with the "query_id"
    # spec. Derived classes might want to provide something more specific.
    return f"{self:query_id}"

def __format__(self, fmt=""):
    """
    Return a formatted string representation of this query object.

    Parameters
    ----------
    fmt : str, optional
        This should be the empty string or a comma-separated list of
        query attributes that will be included in the formatted string.
        Whitespace around the individual attribute names is ignored.

    Returns
    -------
    str
        A string of the form ``<Query of type: ClassName, attr: value, ...>``
        where each value is rendered with ``repr()``.

    Raises
    ------
    ValueError
        If `fmt` names an attribute that this query object does not have.

    Examples
    --------
    >>> dl = daily_location(date="2016-01-01", method="last")
    >>> format(dl)
    <Query of type: LastLocation>
    >>> format(dl, "query_id,is_stored")
    <Query of type: LastLocation, query_id: 'd9537c9bc11580f868e3fc372dafdb94', is_stored: True>
    >>> print(f"{dl:is_stored,query_state}")
    <Query of type: LastLocation, is_stored: True, query_state: <QueryState.COMPLETED: 'completed'>>
    """
    descriptions = [f"Query of type: {self.__class__.__name__}"]
    # An empty format spec means "no extra attributes"; note that
    # "".split(",") would yield [""] rather than an empty list.
    attrs_to_include = [name.strip() for name in fmt.split(",")] if fmt else []
    for attr in attrs_to_include:
        # Keep the try body minimal so that only the attribute lookup itself
        # is translated into a ValueError.
        try:
            value = getattr(self, attr)
        except AttributeError as exc:
            raise ValueError(
                f"Format string contains invalid query attribute: '{attr}'"
            ) from exc
        descriptions.append(f"{attr}: {value!r}")

    return f"<{', '.join(descriptions)}>"

def __iter__(self):
con = self.connection.engine
Expand Down
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def __init__(self, name=None, schema=None, columns=None):
self._db_store_cache_metadata(compute_time=0)
q_state_machine.finish()

def __format__(self, fmt=""):
    """
    Return the string representation of this table.

    Parameters
    ----------
    fmt : str, optional
        Accepted so that the method satisfies the format protocol, but
        ignored: the output always has the same shape.

    Returns
    -------
    str
        ``<Table: 'schema.name', query_id: '...'>`` built from this object's
        `schema`, `name` and `md5` attributes.
    """
    # NOTE(review): there is deliberately no separate __repr__ here;
    # presumably repr() reaches this method through an inherited __repr__
    # that calls format() -- confirm against the base class.
    return f"<Table: '{self.schema}.{self.name}', query_id: '{self.md5}'>"

@property
Expand Down
96 changes: 70 additions & 26 deletions flowmachine/flowmachine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import networkx as nx
import structlog
import sys
from io import BytesIO
from pathlib import Path
from pglast import prettify
from psycopg2._psycopg import adapt
Expand Down Expand Up @@ -379,14 +380,16 @@ def sort_recursively(d):
return d


def print_dependency_tree(query_obj, stream=None, indent_level=0):
def print_dependency_tree(query_obj, show_stored=False, stream=None, indent_level=0):
"""
Print the dependencies of a flowmachine query in a tree-like structure.
Parameters
----------
query_obj : Query
An instance of a query object.
show_stored : bool, optional
If True, show for each query whether it is stored or not. Default: False.
stream : io.IOBase, optional
The stream to which the output should be written (default: stdout).
indent_level : int
Expand All @@ -398,10 +401,13 @@ def print_dependency_tree(query_obj, stream=None, indent_level=0):
indent_per_level = 3
indent = " " * (indent_per_level * indent_level - 1)
prefix = "" if indent_level == 0 else "- "
stream.write(f"{indent}{prefix}{query_obj}\n")
fmt = "query_id" if not show_stored else "query_id,is_stored"
stream.write(f"{indent}{prefix}{query_obj:{fmt}}\n")
deps_sorted_by_query_id = sorted(query_obj.dependencies, key=lambda q: q.md5)
for dep in deps_sorted_by_query_id:
print_dependency_tree(dep, indent_level=indent_level + 1, stream=stream)
print_dependency_tree(
dep, indent_level=indent_level + 1, stream=stream, show_stored=show_stored
)


def _get_query_attrs_for_dependency_graph(query_obj, analyse=False):
Expand Down Expand Up @@ -434,11 +440,13 @@ def _get_query_attrs_for_dependency_graph(query_obj, analyse=False):

def calculate_dependency_graph(query_obj, analyse=False):
"""
Produce a graph of all the queries that go into producing this
one, with their estimated run costs, and whether they are stored
as node attributes.
Produce a graph of all the queries that go into producing this one, with their estimated
run costs, and whether they are stored as node attributes.
The resulting networkx object can then be visualised, or analysed.
The resulting networkx object can then be visualised, or analysed. When visualised,
nodes corresponding to stored queries will be rendered green. See the function
`plot_dependency_graph()` for a convenient way of plotting a dependency graph directly
for visualisation in a Jupyter notebook.
The dependency graph includes the estimated cost of the query in the 'cost' attribute,
the query object the node represents in the 'query_object' attribute, and with the analyse
Expand All @@ -458,23 +466,8 @@ def calculate_dependency_graph(query_obj, analyse=False):
Examples
--------
A useful way to visualise the dependency graph in a Jupyter notebook is to
export it to an SVG string and display it directly in the notebook:
>>> import flowmachine
>>> from flowmachine.features import daily_location
>>> from flowmachine.utils import calculate_dependency_graph
>>> from io import BytesIO
>>> flowmachine.connect(flowdb_user="flowdb", flowdb_password="flowflow", redis_password="fm_redis")
>>> dl = daily_location(date="2016-01-01")
>>> G = calculate_dependency_graph(dl, analyse=True)
>>> A = nx.nx_agraph.to_agraph(G)
>>> svg_str = BytesIO()
>>> A.draw(svg_str, format="svg", prog="dot")
>>> svg_str = svg_str.getvalue().decode("utf8")
>>> SVG(svg_str) # within a Jupyter notebook this will be displayed as a graph
Alternatively, you can export the dependency graph to a .dot file as follows:
If you don't want to visualise the dependency graph directly (for example
using `plot_dependency_graph()`), you can export it to a .dot file as follows:
>>> import flowmachine
>>> from flowmachine.features import daily_location
Expand Down Expand Up @@ -513,12 +506,12 @@ def calculate_dependency_graph(query_obj, analyse=False):
for n in set(y):
attrs = _get_query_attrs_for_dependency_graph(n, analyse=analyse)
attrs["shape"] = "rect"
attrs["label"] = "{}. Cost: {}.".format(attrs["name"], attrs["cost"])
attrs["query_object"] = n
attrs["label"] = f"{attrs['name']}. Cost: {attrs['cost']}"
if analyse:
attrs["label"] += " Actual runtime: {}.".format(attrs["runtime"])
if attrs["stored"]:
attrs["fillcolor"] = "green"
attrs["fillcolor"] = "#b3de69" # light green
attrs["style"] = "filled"
g.add_node(f"x{n.md5}", **attrs)

Expand All @@ -527,3 +520,54 @@ def calculate_dependency_graph(query_obj, analyse=False):
g.add_edge(*[f"x{z.md5}" for z in (x, y)])

return g


def plot_dependency_graph(
    query_obj, analyse=False, format="png", width=None, height=None
):
    """
    Plot a graph of all the queries that go into producing this one (see `calculate_dependency_graph`
    for more details). This returns an IPython.display object which can be directly displayed in
    Jupyter notebooks.

    Note that this requires the IPython and pygraphviz packages to be installed.

    Parameters
    ----------
    query_obj : Query
        Query object to plot a dependency graph for.
    analyse : bool
        Set to True to get actual runtimes for queries. Note that this will actually run the query!
    format : {"png", "svg"}
        Output format of the resulting image.
    width : int
        Width in pixels to which to constrain the image. Note this is only supported for format="png".
    height : int
        Height in pixels to which to constrain the image. Note this is only supported for format="png".

    Returns
    -------
    IPython.display.Image or IPython.display.SVG

    Raises
    ------
    ValueError
        If `format` is neither "png" nor "svg".
    ImportError
        If IPython cannot be imported.
    """
    # Reject unsupported formats before doing any work: building the graph
    # with analyse=True runs the query, which is wasted effort if the call
    # is going to fail anyway.
    if format not in ("png", "svg"):
        raise ValueError(f"Unsupported output format: '{format}'")

    try:
        from IPython.display import Image, SVG
    except ImportError as exc:
        raise ImportError("requires IPython (https://ipython.org/)") from exc

    G = calculate_dependency_graph(query_obj, analyse=analyse)
    A = nx.nx_agraph.to_agraph(G)
    s = BytesIO()
    A.draw(s, format=format, prog="dot")

    if format == "png":
        return Image(s.getvalue(), width=width, height=height)

    # format == "svg": the size arguments are not passed on to SVG, so warn
    # rather than silently dropping them.
    if width is not None or height is not None:  # pragma: no cover
        logger.warning(
            "The arguments 'width' and 'height' are not supported with format='svg'."
        )
    return SVG(s.getvalue().decode("utf8"))
18 changes: 18 additions & 0 deletions flowmachine/tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,21 @@ def test_make_sql_no_overwrite():

dl = daily_location("2016-01-01")
assert [] == dl._make_sql("admin3", schema="geography")


def test_query_formatting():
    """
    Check that a query can be rendered with `format()` or an f-string, and
    that the attributes named in the format spec appear in the result.
    """
    query = daily_location("2016-01-01", method="last")

    # With no format spec only the query type is shown.
    plain = format(query)
    assert "<Query of type: LastLocation>" == plain

    # Each requested attribute is appended as "name: repr(value)".
    with_attrs = f"{query:level,column_names}"
    expected_with_attrs = "<Query of type: LastLocation, level: 'admin3', column_names: ['subscriber', 'pcod']>"
    assert expected_with_attrs == with_attrs

    # Unknown attribute names are reported as a ValueError.
    with pytest.raises(
        ValueError, match="Format string contains invalid query attribute: 'foo'"
    ):
        format(query, "query_id,foo")
46 changes: 28 additions & 18 deletions flowmachine/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,19 @@
"""
Tests for flowmachine small helper functions
"""
import datetime
import pytest
import pglast
import re
import textwrap
import unittest.mock
import IPython
from io import StringIO
from pathlib import Path

from flowmachine.core import CustomQuery
from flowmachine.core.errors import BadLevelError
from flowmachine.core.subscriber_subsetter import make_subscriber_subsetter
from flowmachine.features import daily_location, EventTableSubset
from flowmachine.utils import (
parse_datestring,
proj4string,
get_columns_for_level,
getsecret,
pretty_sql,
_makesafe,
print_dependency_tree,
calculate_dependency_graph,
convert_dict_keys_to_strings,
sort_recursively,
time_period_add,
)
from flowmachine.utils import *
from flowmachine.utils import _makesafe


@pytest.mark.parametrize("crs", (None, 4326, "+proj=longlat +datum=WGS84 +no_defs"))
Expand Down Expand Up @@ -190,6 +177,15 @@ def test_convert_dict_keys_to_strings():
assert d_out_expected == d_out


def test_to_nested_list():
    """
    Check that a multi-level dictionary is converted into the matching nested
    list of key-value pairs.
    """
    nested_dict = {"a": {"b": 1, "c": [2, 3, {"e": 4}], "d": [5, 6]}}
    result = to_nested_list(nested_dict)
    assert [("a", [("b", 1), ("c", [2, 3, [("e", 4)]]), ("d", [5, 6])])] == result


def test_sort_recursively():
"""
Test that `sort_recursively` recursively sorts all components of the input dictionary.
Expand Down Expand Up @@ -252,9 +248,9 @@ def test_print_dependency_tree():
assert expected_output == output_with_query_ids_replaced


def test_dependency_graph():
def test_calculate_dependency_graph():
"""
Test that dependency graph util runs and has some correct entries.
Test that calculate_dependency_graph() runs and the returned graph has some correct entries.
"""
query = daily_location("2016-01-01")
G = calculate_dependency_graph(query, analyse=True)
Expand All @@ -265,3 +261,17 @@ def test_dependency_graph():
)
assert f"x{sd.md5}" in G.nodes()
assert G.nodes[f"x{sd.md5}"]["query_object"].md5 == sd.md5


def test_plot_dependency_graph():
    """
    Check that plot_dependency_graph() runs for both output formats and
    returns the corresponding IPython.display objects.
    """
    query = daily_location(date="2016-01-02", level="admin2", method="most-common")

    svg_plot = plot_dependency_graph(query, format="svg")
    png_plot = plot_dependency_graph(query, format="png", width=600, height=200)

    assert isinstance(svg_plot, IPython.display.SVG)
    assert isinstance(png_plot, IPython.display.Image)
    # The requested size must be carried through to the PNG image object.
    assert (png_plot.width, png_plot.height) == (600, 200)

0 comments on commit c6836f1

Please sign in to comment.