Releases: PaulSchweizer/flowpipe
v1.0.1
Version 1.0: Dropping Py2 support and Cleanup
- Dropped Py2 support
- Cleanup of Code
- Fix for Pickling Error #168
- FunctionNodes can now be used directly without going through the Node decorator
v0.10.1
Add previously deleted Plug._sub_plugs method back in for backwards compatibility.
Full Changelog: v0.10.0...v0.10.1
Poetry and code quality
- Adopting poetry
- Code quality
Graph input groups
Convenient feature: Make input plugs of nodes available directly on the graph for easy access.
Fixed a bug in event handling
This minor release brings two changes:
- A bug was fixed in event handling that led to callbacks being triggered multiple times.
- Better debug logging for the threaded evaluation of the graph was added.
Fix for a bug in threading Graph evaluation.
A major bug in the threading Graph evaluation was fixed. It could cause nodes not to be submitted for evaluation reliably.
Revamped the threading evaluation of Graphs
When evaluating a Graph in "threading" mode, the higher-level API from concurrent.futures is now used. This provides an easy option to control the number of threads being spawned and gives more useful information in case a Node causes an Exception during evaluation.
Note that the API for the "threading" mode changed and instead of a "submission_delay", the "max_workers" keyword controls the behavior of threading evaluations.
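The effect of a "max_workers" limit can be illustrated without flowpipe itself. The following is a minimal stand-alone sketch, not flowpipe's implementation: the function `evaluate_threaded` and its `nodes` and `upstream` arguments are hypothetical names chosen for this example. It submits every node whose upstream nodes have finished to a `concurrent.futures.ThreadPoolExecutor`, and `future.result()` re-raises any exception a node caused.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def evaluate_threaded(nodes, upstream, max_workers=None):
    """Evaluate a dependency graph with a bounded thread pool.

    nodes:    dict mapping a node name to a zero-argument callable (hypothetical).
    upstream: dict mapping a node name to the names it depends on (hypothetical).
    """
    results = {}
    remaining = dict(nodes)
    running = {}  # maps each pending future to its node name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining or running:
            # Submit every node whose upstream nodes have all finished.
            ready = [
                name for name in remaining
                if all(dep in results for dep in upstream.get(name, ()))
            ]
            for name in ready:
                running[pool.submit(remaining.pop(name))] = name
            if not running:
                raise ValueError(
                    "Cycle or unknown dependency: %s" % sorted(remaining))
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                # result() re-raises an exception raised inside the node.
                results[name] = future.result()
    return results


# Usage: "b" waits for "a", and "c" waits for both.
example_nodes = {"a": lambda: 1, "b": lambda: 2, "c": lambda: 3}
example_upstream = {"b": {"a"}, "c": {"a", "b"}}
example_results = evaluate_threaded(
    example_nodes, example_upstream, max_workers=2)
print(sorted(example_results.items()))
```

Bounding the pool size replaces the need to throttle submissions with a delay: at most `max_workers` callables run at the same time, and the rest wait in the executor's queue.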
Nested Subgraphs
Nested graphs are supported in flowpipe.
from flowpipe import Graph, Node
@Node(outputs=['file'])
def MyNode(file):
    # Something is done in here ...
    return {'file': file}
# A graph that fixes an incoming file, cleaning up messy names etc.
#
# +-----------------------+ +-------------------------+
# | Cleanup Filename | | Change Lineendings |
# |-----------------------| |-------------------------|
# o file<> | +--->o file<> |
# | file o-----+ | file o
# +-----------------------+ +-------------------------+
fix_file = Graph(name="fix_file")
cleanup_filename = MyNode(name="Cleanup Filename", graph=fix_file)
change_lineendings = MyNode(name="Change Lineendings", graph=fix_file)
cleanup_filename.outputs["file"].connect(change_lineendings.inputs["file"])
# A second graph finds files and extracts their contents into a database
# +----------------+ +----------------------------+ +----------------+
# | Find File | | Read Values from File | | Update DB |
# |----------------| |----------------------------| |----------------|
# o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o
# +----------------+ +----------------------------+ +----------------+
udpate_db_from_file = Graph(name="udpate_db_from_file")
find_file = MyNode(name="Find File", graph=udpate_db_from_file)
values_from_file = MyNode(name="Read Values from File", graph=udpate_db_from_file)
update_db = MyNode(name="Update DB", graph=udpate_db_from_file)
find_file.outputs["file"].connect(values_from_file.inputs["file"])
values_from_file.outputs["file"].connect(update_db.inputs["file"])
# The second graph however relies on clean input files so the first graph can
# be used within the second "udpate db" graph.
# For this purpose, graphs can promote input and output plugs from their nodes
# to the graph level, making other graphs aware of them:
fix_file["Cleanup Filename"].inputs["file"].promote_to_graph(name="file_to_clean")
fix_file["Change Lineendings"].outputs["file"].promote_to_graph(name="clean_file")
# Now the update_db graph can connect nodes to the fix_file graph
find_file.outputs["file"].connect(fix_file.inputs["file_to_clean"])
fix_file.outputs["clean_file"].connect(udpate_db_from_file["Read Values from File"].inputs["file"])
# The result now looks like this:
#
# +---udpate_db_from_file----+ +-------fix_file--------+ +--------fix_file---------+ +----udpate_db_from_file-----+ +---udpate_db_from_file----+
# | Find File | | Cleanup Filename | | Change Lineendings | | Read Values from File | | Update DB |
# |--------------------------| |-----------------------| |-------------------------| |----------------------------| |--------------------------|
# o file<> | +--->o file<> | +--->o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o-----+ | file o-----+ | file o
# +--------------------------+ +-----------------------+ +-------------------------+ +----------------------------+ +--------------------------+
print(fix_file)
# Subgraphs can be accessed by their name from any participating graph
assert udpate_db_from_file.subgraphs["fix_file"] is fix_file
assert fix_file.subgraphs["udpate_db_from_file"] is udpate_db_from_file
Event System
StatsReporter and LogObserver are replaced by a unified and simplified Event system.
Original Issue: #97
How to use:
from flowpipe.node import INode

# Simple functions serve as listeners
def omitted_listener(node):
    print("Omitted:", node.name)
def started_listener(node):
    print("Started:", node.name)
def finished_listener(node):
    print("Finished:", node.name)
# Connect the listeners
INode.EVENTS['evaluation-omitted'].register(omitted_listener)
INode.EVENTS['evaluation-started'].register(started_listener)
INode.EVENTS['evaluation-finished'].register(finished_listener)
my_node = MyNode()
my_node.evaluate()
# Inspect the node's stats after evaluation
print(my_node.stats)
# >> {"eval_time": 0.123, "start_time": 1573121193}
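To show the register/emit pattern that such listeners rely on, here is a minimal stand-alone sketch. It is not flowpipe's implementation: the `Event` class below and its `register`, `deregister` and `emit` methods are written only for this illustration. It also guards against registering the same listener twice, which is one way to avoid a callback firing more than once per event.

```python
class Event:
    """A named event that calls each registered listener when emitted."""

    def __init__(self, name):
        self.name = name
        self._listeners = []

    def register(self, listener):
        # Registering the same callable again is a no-op,
        # so it is called only once per emit.
        if listener not in self._listeners:
            self._listeners.append(listener)

    def deregister(self, listener):
        if listener in self._listeners:
            self._listeners.remove(listener)

    def emit(self, *args, **kwargs):
        # Iterate over a copy so that a listener may deregister itself.
        for listener in list(self._listeners):
            listener(*args, **kwargs)


seen = []


def started_listener(node_name):
    seen.append(node_name)


started = Event("evaluation-started")
started.register(started_listener)
started.register(started_listener)  # ignored, already registered
started.emit("my_node")
print(seen)
```

Plain functions are enough as listeners in this design, so there is no observer base class to subclass.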