-
Notifications
You must be signed in to change notification settings - Fork 3
/
pipeline.hpp
126 lines (100 loc) · 3.5 KB
/
pipeline.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <thread>
namespace Pipeline
{
// One stage of a threaded processing pipeline.
//
// A stage reads from `process_in` (which points at the upstream stage's
// `process_out`) and publishes into its own `process_out`. Derived classes
// override processOnce() to do the actual work. A stage constructed without
// a parent is a source: it has no input and runs processOnce() back to back.
//
// Lifetime note for derived classes: the base destructor stops and joins the
// worker, but it runs only AFTER the derived part of the object has been
// destroyed. A derived class whose processOnce() touches its own members
// should call stopProcessingContinually() from its own destructor.
//
// NOTE(review): nothing in this class locks `_my_mutex` while processOnce()
// writes `process_out`; only downstream stages lock it. Whether that is
// sufficient protection for `process_out` depends on the concrete stages.
template<typename InType, typename OutType>
class Process{
public:
    // Upstream stage's output; nullptr when this stage has no parent.
    InType *process_in = nullptr;
    // Output slot that downstream stages read through their `process_in`.
    OutType process_out{};

    // Creates a stage with no parent (a pipeline source).
    Process(){}

    // Connect to the pipeline: borrow the parent's output slot, mutex,
    // condition variable and run flag. The parent must outlive this stage,
    // because only raw pointers to its members are stored.
    template<typename super_inType>
    Process(Process<super_inType, InType> &input)
    {
        _cv_in = &(input._cv_out);
        process_in = &(input.process_out);
        _input_mutex = &(input._my_mutex);
        _parent_keep_going = &(input._keep_going);
    }

    // The stage owns a thread and synchronisation primitives and hands out
    // pointers to its own members, so it can be neither copied nor assigned.
    Process(const Process&) = delete;
    Process& operator=(const Process&) = delete;

    // Virtual because the class is designed to be derived from.
    // Stops the loop, then joins and frees the worker thread if there is one.
    virtual ~Process(){
        stopProcessingContinually();
        releaseThread();
    }

    // This method is run once every time new input is available (or
    // repeatedly, for a source stage). The default does nothing.
    virtual void processOnce(){}

    // Keeps processing the data when it's available. Blocks the calling
    // thread until the stage is stopped or its parent stops.
    void processContinually(){
        _keep_going = true;
        processLoop();
    }

    // Runs the processing loop on a freshly created thread. Does nothing if
    // the stage is already running.
    void startProcessingContinuallyInNewThread(){
        // Check the run flag BEFORE touching the old thread object, so a
        // live worker is never destroyed underneath itself.
        if(_keep_going.load()) return;
        // A previous worker may have finished without being joined yet.
        releaseThread();
        // Raise the flag here rather than inside the worker: a stop request
        // issued right after this call must not be overwritten by the
        // worker starting up late.
        _keep_going = true;
        try{
            _thread = new std::thread(&Process::processLoop, this);
        }
        catch(...){
            // Thread creation failed: do not leave the stage marked as running.
            _keep_going = false;
            throw;
        }
    }

    // Asks the loop to stop and waits for the worker thread to finish.
    // Safe to call repeatedly and from within the worker thread itself
    // (in which case it only requests the stop and does not join).
    void stopProcessingContinually(){
        _keep_going = false;
        // Wake downstream stages so they can notice that this stage stopped.
        _cv_out.notify_all();
        if(_thread != nullptr
           && _thread->joinable()
           && std::this_thread::get_id() != _thread->get_id()){
            _thread->join();
        }
    }

    // True while the processing loop is (supposed to be) running.
    bool keepGoing(){return _keep_going.load();}

public:
    // Mutex that downstream stages lock (through their `_input_mutex`)
    // while they wait on `_cv_out` and while they run processOnce().
    std::mutex _my_mutex;
    // The parent's `_my_mutex`; nullptr for a stage with no parent.
    // Locked in the processing loop around the wait and processOnce().
    std::mutex *_input_mutex = nullptr;
    // Set to false to stop the pipeline safely. Atomic because it is written
    // by the controlling thread and read by the worker and by child stages.
    std::atomic<bool> _keep_going{false};
    // A pointer to the parent's "_keep_going" if it exists
    std::atomic<bool> *_parent_keep_going = nullptr;
    // The parent's `_cv_out`; nullptr for a stage with no parent.
    std::condition_variable *_cv_in = nullptr;
    // Notified after every processOnce() and when this stage is stopped.
    std::condition_variable _cv_out;
    // The thread where the processing is done (owned; nullptr if none).
    std::thread *_thread = nullptr;
    // Exception that aborted the stage, if any. Written by whichever thread
    // ran processOnce_s(); read it only after the stage has been stopped.
    std::exception_ptr thread_exception = nullptr;

    // Returns true if the Process has been aborted by an exception.
    bool process_aborted(){return thread_exception!=nullptr;}

    // ProcessOnce(), only safe: any exception thrown by processOnce() is
    // logged, stored in `thread_exception`, and stops the stage instead of
    // propagating out of the worker thread.
    void processOnce_s(){
        try{
            processOnce();
        }
        catch(const std::exception &e){ // Fail gracefully
            // Caught by reference so that derived what() messages survive.
            std::cerr << "Pipeline process failed with exception: " << e.what() << std::endl;
            thread_exception = std::current_exception();
            stopProcessingContinually();
        }
        catch(...){
            // Anything else escaping a thread entry point would terminate
            // the whole program, so treat it the same way.
            std::cerr << "Pipeline process failed with a non-standard exception" << std::endl;
            thread_exception = std::current_exception();
            stopProcessingContinually();
        }
    }

private:
    // Body of the processing loop; expects `_keep_going` to be set already.
    void processLoop(){
        // Upper bound on how long a waiting stage can ignore a stop request.
        const std::chrono::milliseconds stop_poll_interval(100);
        while(_keep_going.load()){
            // Stop if there's no more data to process
            if(_parent_keep_going && !_parent_keep_going->load()) {
                std::cout << "Parent stopped going" << std::endl;
                _keep_going = false;
                return;
            }
            if(_input_mutex) {
                // Wait for new data to come in. The wait is timed so that a
                // missed notification cannot block the stage forever: on a
                // timeout the stop flags are simply re-checked.
                std::unique_lock<std::mutex> lock_in(*_input_mutex);
                if(_cv_in->wait_for(lock_in, stop_poll_interval) == std::cv_status::timeout){
                    continue;
                }
                // Process it (still holding the parent's mutex)
                processOnce_s();
            }
            else{
                processOnce_s();
            } // End if
            // Let the next process in the pipeline use it
            _cv_out.notify_all();
        } // End while
        std::cout << "I stopped going" << std::endl;
    }

    // Joins (or, when called on the worker itself, detaches) the worker
    // thread and frees it. Destroying a joinable std::thread would call
    // std::terminate, so it must never be deleted while still joinable.
    void releaseThread(){
        if(_thread == nullptr) return;
        if(_thread->joinable()){
            if(std::this_thread::get_id() != _thread->get_id()) _thread->join();
            else _thread->detach();
        }
        delete _thread;
        _thread = nullptr;
    }
}; // end class Process
} // end namespace Pipeline