-
Notifications
You must be signed in to change notification settings - Fork 0
/
subtask.go
129 lines (97 loc) · 2.39 KB
/
subtask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package plumber
import (
"fmt"
)
// SubtaskExtendFn is a callback that is handed a parent task (pt) and a
// subtask (st). The attach helpers on Task invoke it so the caller can decide
// how st gets wired into pt.
type SubtaskExtendFn[Pipe TaskListData] func(pt, st *Task[Pipe])
// CreateSubtask builds a new task on the same task list as t and attaches it
// to t as a child.
//
// The new task is named with t.Name followed by the given name segments, its
// parent is set to t, and it shares t's Lock.
func (t *Task[Pipe]) CreateSubtask(name ...string) *Task[Pipe] {
	// Parent name first, then the caller-supplied segments.
	segments := make([]string, 0, len(name)+1)
	segments = append(segments, t.Name)
	segments = append(segments, name...)

	child := NewTask(t.TL, segments...)
	child.parent, child.Lock = t, t.Lock

	return child
}
// HasParent reports whether a parent task has been set on this task.
func (t *Task[Pipe]) HasParent() bool {
	orphan := t.parent == nil

	return !orphan
}
// ExtendSubtask replaces the subtask job of this task with the result of
// calling fn on the current subtask job, and returns t for chaining.
//
// fn runs while t.taskLock is held. The lock is released through defer so
// that a panic inside fn cannot leave it locked.
//
// NOTE(review): if taskLock is not reentrant, fn must not call methods that
// acquire it again on the same task (e.g. SetSubtask) — confirm.
func (t *Task[Pipe]) ExtendSubtask(fn JobFn) *Task[Pipe] {
	t.taskLock.Lock()
	defer t.taskLock.Unlock()

	t.subtask = fn(t.subtask)

	return t
}
// ToParent attaches this task to an arbitrary given parent task by calling
// fn(parent, t), and returns t for chaining.
//
// The lock taken is the taskLock of the parent that was passed in, not of
// t.parent: t may have no parent at all, or a different one, and fn operates
// on the given parent. The lock is released through defer so that a panic
// inside fn cannot leave it locked.
//
// If parent is nil the task reports a fatal error through SendFatal and fn is
// not called, mirroring the guard used by AddSelfToTheParent.
//
// NOTE(review): if taskLock is not reentrant, fn must not call methods that
// acquire parent's taskLock again (e.g. ExtendSubtask) — confirm.
func (t *Task[Pipe]) ToParent(
	parent *Task[Pipe],
	fn SubtaskExtendFn[Pipe],
) *Task[Pipe] {
	if parent == nil {
		t.SendFatal(fmt.Errorf("Task has no parent value set."))

		return t
	}

	parent.taskLock.Lock()
	defer parent.taskLock.Unlock()

	fn(parent, t)

	return t
}
// AddSelfToTheParent attaches this task to its own parent by calling
// fn(parent, t) while the parent's Lock is held, and returns t for chaining.
//
// If the task has no parent it reports a fatal error through SendFatal and
// fn is not called.
//
// The parent is read once so that the Lock that gets locked is the same one
// that gets unlocked, and the unlock is deferred so that a panic inside fn
// cannot leave the Lock held.
func (t *Task[Pipe]) AddSelfToTheParent(
	fn SubtaskExtendFn[Pipe],
) *Task[Pipe] {
	parent := t.parent
	if parent == nil {
		t.SendFatal(fmt.Errorf("Task has no parent value set."))

		return t
	}

	parent.Lock.Lock()
	defer parent.Lock.Unlock()

	fn(parent, t)

	return t
}
// AddSelfToTheParentAsSequence attaches this task to its parent so that the
// parent's subtask job becomes TL.JobSequence(existing subtask job, t.Job()).
//
// The missing-parent guard and the locking of the parent's Lock are provided
// by AddSelfToTheParent; the parent's subtask is rewritten through
// ExtendSubtask. Returns t for chaining.
func (t *Task[Pipe]) AddSelfToTheParentAsSequence() *Task[Pipe] {
	return t.AddSelfToTheParent(func(pt *Task[Pipe], st *Task[Pipe]) {
		pt.ExtendSubtask(func(existing Job) Job {
			return st.TL.JobSequence(existing, st.Job())
		})
	})
}
// AddSelfToTheParentAsParallel attaches this task to its parent so that the
// parent's subtask job becomes TL.JobParallel(existing subtask job, t.Job()).
//
// The missing-parent guard and the locking of the parent's Lock are provided
// by AddSelfToTheParent; the parent's subtask is rewritten through
// ExtendSubtask. Returns t for chaining.
func (t *Task[Pipe]) AddSelfToTheParentAsParallel() *Task[Pipe] {
	return t.AddSelfToTheParent(func(pt *Task[Pipe], st *Task[Pipe]) {
		pt.ExtendSubtask(func(existing Job) Job {
			return st.TL.JobParallel(existing, st.Job())
		})
	})
}
// GetSubtasks returns the job currently stored as this task's subtask.
//
// The field is read without taking taskLock.
func (t *Task[Pipe]) GetSubtasks() Job {
	current := t.subtask

	return current
}
// SetSubtask stores job as this task's subtask while holding taskLock and
// returns t for chaining.
//
// A nil job is replaced with t.emptyJob before it is stored.
func (t *Task[Pipe]) SetSubtask(job Job) *Task[Pipe] {
	next := job
	if next == nil {
		next = t.emptyJob
	}

	t.taskLock.Lock()
	t.subtask = next
	t.taskLock.Unlock()

	return t
}
// RunSubtasks runs the current subtask job through TL.RunJobs.
//
// When the run returns an error it is handed back unchanged and the subtask
// is left in place. When the run succeeds the subtask is reset with
// SetSubtask(nil) and nil is returned.
func (t *Task[Pipe]) RunSubtasks() error {
	if err := t.TL.RunJobs(t.subtask); err != nil {
		return err
	}

	t.SetSubtask(nil)

	return nil
}