THIS PROJECT IS DEPRECATED
It is superseded by https://github.com/chadrik/rill
This is a Python implementation of the flow-based programming paradigm, which allows people to create programs as graphs. This means we no longer have to write code, but can visually design our programs in a drag-and-drop style, using pre-defined generalized black-box components.
You can define data flow execution graphs (DAGs) where each process (node) is run as a parallel black box that processes input data and spits out something to send downstream. To design these graphs, you can use some freely available design tools such as DrawFBP, NoFlo UI, or Flowhub.
To understand what this is all about, perhaps it's easier to quote J. Paul Morrison, the inventor of flow-based programming:
In computer programming, Flow-Based Programming (FBP) is a programming paradigm that uses a "data factory" metaphor for designing and building applications. FBP defines applications as networks of "black box" processes, which exchange data across predefined connections by message passing, where the connections are specified externally to the processes. These black box processes can be reconnected endlessly to form different applications without having to be changed internally. FBP is thus naturally component-oriented.
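The "black box" idea in the quote above can be sketched in plain Python without pflow. This is a minimal illustration, not pflow's implementation: the names `upper`, `exclaim`, `run_pipeline`, and `END` are hypothetical, and each process only ever sees an inbox and an outbox, while the wiring is decided outside of it.

```python
import queue
import threading

END = object()  # hypothetical end-of-stream marker used only in this sketch


def upper(inbox, outbox):
    """Black box: upper-cases every packet it receives."""
    for packet in iter(inbox.get, END):
        outbox.put(packet.upper())
    outbox.put(END)  # propagate end-of-stream downstream


def exclaim(inbox, outbox):
    """Black box: appends '!' to every packet it receives."""
    for packet in iter(inbox.get, END):
        outbox.put(packet + "!")
    outbox.put(END)


def run_pipeline(processes, packets):
    """Chain the processes together and return what the last one emits."""
    # The connections are created here, outside the processes themselves.
    queues = [queue.Queue() for _ in range(len(processes) + 1)]
    threads = [
        threading.Thread(target=proc, args=(queues[i], queues[i + 1]))
        for i, proc in enumerate(processes)
    ]
    for thread in threads:
        thread.start()
    for packet in packets:
        queues[0].put(packet)
    queues[0].put(END)
    for thread in threads:
        thread.join()
    return list(iter(queues[-1].get, END))
```

Calling `run_pipeline([upper, exclaim], ["a", "b"])` and `run_pipeline([exclaim], ["a", "b"])` builds two different applications from the same unchanged processes.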
Run `python setup.py develop` to symlink site-packages to this repo, then run the example graphs with `./example.py`.
To define and execute a graph, subclass `pflow.core.Graph`, override `initialize()` to construct the graph, then run it using a `GraphExecutor` implementation:
from pflow.core import Graph
from pflow.executors.single_process import SingleProcessGraphExecutor
from pflow.components import *


class MyGraph(Graph):
    def initialize(self):
        # Tail the system log; the path is delivered as an initial packet.
        tail_1 = FileTailReader('TAIL_1')
        self.set_initial_packet(tail_1.inputs['PATH'], '/var/log/system.log')

        # Keep only the lines that match the regular expression.
        filter_1 = RegexFilter('FILTER_1')
        self.set_initial_packet(filter_1.inputs['REGEX'],
                                r' (USER|DEAD)_PROCESS: ')

        # Wire the components together: tail -> filter -> console.
        self.connect(tail_1.outputs['OUT'],
                     filter_1.inputs['IN'])
        self.connect(filter_1.outputs['OUT'],
                     ConsoleLineWriter('LOG_1').inputs['IN'])


graph = MyGraph('MY_GRAPH_NAME')
executor = SingleProcessGraphExecutor(graph)
executor.execute()
Components are connected by their ports by calling `Graph.connect(source_output_port, target_input_port)`.
Any time `Graph.connect()` is called, the components associated with the ports will automatically get added to the graph. If (in the rare case) you have a graph with a single component, you'll need to register it by calling `Component.add_component()`.
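The registration behavior described above can be pictured with a small toy model. This is only a sketch of the idea, not pflow's code: `ToyPort`, `ToyComponent`, `ToyGraph`, and `register()` are hypothetical names, and the real classes do considerably more.

```python
class ToyPort:
    """A named port that remembers which component owns it."""

    def __init__(self, name, component):
        self.name = name
        self.component = component


class ToyComponent:
    """A component with named input and output ports."""

    def __init__(self, name, inputs=(), outputs=()):
        self.name = name
        self.inputs = {port: ToyPort(port, self) for port in inputs}
        self.outputs = {port: ToyPort(port, self) for port in outputs}


class ToyGraph:
    """Tracks components and the connections between their ports."""

    def __init__(self):
        self.components = set()
        self.connections = []

    def register(self, component):
        # Explicit registration, needed for a component with no connections.
        self.components.add(component)

    def connect(self, source_output_port, target_input_port):
        # Connecting two ports implicitly registers both owning components.
        self.register(source_output_port.component)
        self.register(target_input_port.component)
        self.connections.append((source_output_port, target_input_port))
```

In this model, a component that never appears in a `connect()` call stays unknown to the graph until it is registered explicitly, which is why the single-component case needs the extra step.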
You can find some premade components in the `pflow.components` module. If you can't find what you need there, you can always create a custom component by subclassing `pflow.core.Component`, then overriding the `initialize()` and `run()` methods:
import time

from pflow.core import Component, InputPort, OutputPort, EndOfStream, keepalive


class MySleepComponent(Component):
    '''
    Receives input from IN, sleeps for a predetermined amount of time,
    then forwards it to output OUT.
    '''
    def initialize(self):
        self.inputs.add(InputPort('IN'))
        self.outputs.add(OutputPort('OUT'))

    @keepalive
    def run(self):
        input_packet = self.inputs['IN'].receive_packet()
        if input_packet is EndOfStream:
            # The input port was closed, so shut this component down.
            self.terminate()
        else:
            time.sleep(5)
            self.outputs['OUT'].send_packet(input_packet)
Rules for creating components:
- Your component should generally be small and do one thing well.
- The `Component.initialize()` method is used for setting up ports and any initial state.
- The `Component.run()` method is called by the runtime only once before the component is automatically terminated. If you don't want this behavior, you can either write your code in a `while self.is_alive(): ...` loop or simply decorate the `run()` method with `@keepalive`. If you decide to use the decorator, you must explicitly `terminate()` the component when you are finished.
- Call `Component.suspend()` if you need to be explicit about suspending execution (typically done in loops or when waiting for some asynchronous task to complete).
- Calls to `Port.send*()` or `Port.receive*()` suspend execution while waiting for data to arrive, so that they do not block other processes.
- You should always check that the return value of `Component.receive()` or `Component.receive_packet()` is not the sentinel object `EndOfStream`, denoting that the port was closed.
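The interplay between a keepalive-style decorator, explicit termination, and the end-of-stream check can be modeled in a few lines. This is a toy sketch under stated assumptions, not pflow's decorator: `END_OF_STREAM`, `ToyDoubler`, and this `keepalive` are hypothetical stand-ins, and the toy loops synchronously instead of suspending cooperatively.

```python
import functools

END_OF_STREAM = object()  # hypothetical stand-in for a closed-port sentinel


def keepalive(run):
    """Toy decorator: keep calling run() until the component terminates."""
    @functools.wraps(run)
    def wrapper(self):
        while self.alive:
            run(self)
    return wrapper


class ToyDoubler:
    """Doubles each incoming packet until its input is exhausted."""

    def __init__(self, packets):
        self.inbox = iter(packets)
        self.sent = []
        self.alive = True

    def receive_packet(self):
        # Return the sentinel once there is nothing left to read.
        return next(self.inbox, END_OF_STREAM)

    def terminate(self):
        self.alive = False

    @keepalive
    def run(self):
        packet = self.receive_packet()
        if packet is END_OF_STREAM:
            # Without this explicit call the loop would never end.
            self.terminate()
        else:
            self.sent.append(packet * 2)
```

The sentinel is compared with `is` rather than `==`, so a legitimate packet can never be mistaken for the end of the stream.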
State | Description |
---|---|
NOT_INITIALIZED | Component hasn't been initialized yet (initial state). |
INITIALIZED | Component is initialized, but hasn't been run yet. |
ACTIVE | Component has received data and is actively running. |
SUSP_SEND | Component is waiting for data to send on its output port. |
SUSP_RECV | Component is waiting to receive data on its input port. |
TERMINATED | Component has successfully terminated execution (final state). |
ERROR | Component has terminated execution because of an error (final state). |
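For reference, the states in the table can be written down as a Python enumeration. This is an illustrative sketch only: the state names and the initial/final annotations come from the table, while `ComponentState`, `is_final()`, and `is_suspended()` are hypothetical helpers that may not match how pflow represents states internally.

```python
from enum import Enum


class ComponentState(Enum):
    """The seven component states listed in the table above."""
    NOT_INITIALIZED = "NOT_INITIALIZED"
    INITIALIZED = "INITIALIZED"
    ACTIVE = "ACTIVE"
    SUSP_SEND = "SUSP_SEND"
    SUSP_RECV = "SUSP_RECV"
    TERMINATED = "TERMINATED"
    ERROR = "ERROR"


# Marked "(initial state)" and "(final state)" in the table.
INITIAL_STATE = ComponentState.NOT_INITIALIZED
FINAL_STATES = frozenset({ComponentState.TERMINATED, ComponentState.ERROR})
SUSPENDED_STATES = frozenset({ComponentState.SUSP_SEND, ComponentState.SUSP_RECV})


def is_final(state):
    """Return True if no further execution can follow this state."""
    return state in FINAL_STATES


def is_suspended(state):
    """Return True if the component is waiting on a send or a receive."""
    return state in SUSPENDED_STATES
```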
npm install -g n
npm install -g bower
npm install -g grunt-cli
sudo n 0.10
mkdir noflo
cd noflo
git clone https://github.com/noflo/noflo-ui
cd noflo-ui
git checkout 0.10.0
npm install
# running bower before grunt prompts to resolve a dependency conflict which otherwise causes grunt to fail
bower install
grunt build
python -m SimpleHTTPServer 8000
- Open your browser to http://localhost:8000/
- Log in using your GitHub account. Go to settings and copy your "User Identifier".
- In a fresh shell, `cd` into the root of the pflow repo.
- Start the pflow runtime: `python -m pflow.runtime --user-id <USER_ID>`
- Back in the browser, create a new project in NoFlo, selecting the pflow runtime.
- Green arrows should appear in the top-right menu, right before `ws://localhost:3569`
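If the green arrows do not appear, one quick thing to rule out is that nothing is listening on the runtime's port. The helper below is a hypothetical troubleshooting sketch using only the standard library; it checks that a TCP connection can be opened and says nothing about whether the WebSocket handshake or the FBP protocol itself works.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

With the runtime started as described above, `is_listening("localhost", 3569)` would be expected to return `True`, assuming the runtime uses the port shown in the NoFlo menu.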
First install the test suite:
npm install -g fbp-protocol
Then, from the repo directory, run the tests:
fbp-test