Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

feature[RR v2024]: max_jobs jitter to prevent thundering herd problem #113

Merged
merged 37 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c62e288
Implementation of dispersion when restarting workers
Kaspiman Feb 7, 2024
28267d7
Jitter for maxJobs option
Kaspiman Feb 8, 2024
94ae631
Jitter for maxJobs option
Kaspiman Feb 12, 2024
e841f82
Merge branch 'master' into max_jobs_dispersion
Kaspiman Feb 12, 2024
c151663
Jitter for maxJobs option
Kaspiman Feb 12, 2024
772f43c
Merge remote-tracking branch 'origin/max_jobs_dispersion' into max_jo…
Kaspiman Feb 12, 2024
72beffa
Jitter for maxJobs option
Kaspiman Feb 12, 2024
eb5156d
Jitter for maxJobs option
Kaspiman Feb 12, 2024
f63bce9
Merge branch 'master' into max_jobs_dispersion
Kaspiman Feb 12, 2024
4faae05
Jitter for maxJobs option
Kaspiman Feb 12, 2024
606e909
Merge remote-tracking branch 'origin/max_jobs_dispersion' into max_jo…
Kaspiman Feb 12, 2024
2031011
Jitter for maxJobs option
Kaspiman Feb 12, 2024
2323c95
Jitter for maxJobs option
Kaspiman Feb 12, 2024
6a1e065
Jitter for maxJobs option
Kaspiman Feb 12, 2024
0d0a0e4
Jitter for maxJobs option
Kaspiman Feb 12, 2024
11a53a4
Jitter for maxJobs option
Kaspiman Feb 13, 2024
5ddc995
Jitter for maxJobs option
Kaspiman Feb 13, 2024
95475b3
Jitter for maxJobs option
Kaspiman Feb 13, 2024
dfa5ee6
Jitter for maxJobs option
Kaspiman Feb 14, 2024
0b04aa6
Jitter for maxJobs option
Kaspiman Feb 14, 2024
df55bfb
Jitter for maxJobs option
Kaspiman Feb 15, 2024
52a6030
Jitter for maxJobs option
Kaspiman Feb 15, 2024
880e4cc
Jitter for maxJobs option
Kaspiman Feb 15, 2024
f92b452
Merge branch 'master' into max_jobs_dispersion
rustatian Feb 15, 2024
8a1432d
Merge branch 'master' into max_jobs_dispersion
rustatian Feb 15, 2024
b3dbe40
Merge branch 'master' into max_jobs_dispersion
rustatian Feb 23, 2024
0dd06d8
Jitter for maxJobs option
Kaspiman Mar 8, 2024
5e4157e
Merge remote-tracking branch 'origin/max_jobs_dispersion' into max_jo…
Kaspiman Mar 8, 2024
0899b3f
Merge branch 'roadrunner-server:master' into max_jobs_dispersion
Kaspiman Mar 8, 2024
048d81b
Jitter for maxJobs option
Kaspiman Mar 8, 2024
b08a097
Jitter for maxJobs option
Kaspiman Mar 8, 2024
d96623e
Jitter for maxJobs option
Kaspiman Mar 11, 2024
0dd4af8
Jitter for maxJobs option
Kaspiman Mar 11, 2024
eac7d93
Jitter for maxJobs option
Kaspiman Mar 11, 2024
0b7300a
Jitter for maxJobs option
Kaspiman Mar 11, 2024
8f4fd4e
Jitter for maxJobs option
Kaspiman Mar 11, 2024
ce532e5
Merge branch 'master' into max_jobs_dispersion
rustatian Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 3 additions & 41 deletions ipc/pipe/pipe_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ type sr struct {
err error
}

// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// SpawnWorkerWithContext Creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd, options ...worker.Options) (*worker.Process, error) {
spCh := make(chan sr)
go func() {
w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
w, err := worker.InitBaseWorker(cmd, options...)
if err != nil {
select {
case spCh <- sr{
Expand Down Expand Up @@ -140,44 +140,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w
}
}

func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
if err != nil {
return nil, err
}

in, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}

out, err := cmd.StdinPipe()
if err != nil {
return nil, err
}

// Init new PIPE relay
relay := pipe.NewPipeRelay(in, out)
w.AttachRelay(relay)

// Start the worker
err = w.Start()
if err != nil {
return nil, err
}

// errors bundle
_, err = internal.Pid(relay)
if err != nil {
_ = w.Kill()
return nil, err
}

// everything ok, set ready state
w.State().Transition(fsm.StateReady)
return w, nil
}

// Close the factory.
func (f *Factory) Close() error {
return nil
Expand Down
27 changes: 1 addition & 26 deletions ipc/pipe/pipe_factory_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,12 @@ import (
"go.uber.org/zap"
)

func Benchmark_WorkerPipeNoTTL(b *testing.B) {
rustatian marked this conversation as resolved.
Show resolved Hide resolved
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

log, _ = zap.NewDevelopment()
w, err := NewPipeFactory(log).SpawnWorker(cmd)
require.NoError(b, err)

go func() {
_ = w.Wait()
}()

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
assert.NoError(b, err)
assert.NotNil(b, res)
}

b.Cleanup(func() {
assert.NoError(b, w.Stop())
})
}

func Benchmark_WorkerPipeTTL(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()

log, _ = zap.NewDevelopment()
w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd)
require.NoError(b, err)

go func() {
Expand Down
42 changes: 21 additions & 21 deletions ipc/pipe/pipe_factory_spawn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var log = zap.NewNop()
func Test_GetState2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, fsm.StateStopped, w.State().CurrentState())
Expand All @@ -36,7 +36,7 @@ func Test_GetState2(t *testing.T) {
func Test_Kill2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -59,7 +59,7 @@ func Test_Kill2(t *testing.T) {
func Test_Pipe_Start2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.NoError(t, err)
assert.NotNil(t, w)

Expand All @@ -77,7 +77,7 @@ func Test_Pipe_StartError2(t *testing.T) {
t.Errorf("error running the command: error %v", err)
}

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
Expand All @@ -89,7 +89,7 @@ func Test_Pipe_PipeError3(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
Expand All @@ -101,28 +101,28 @@ func Test_Pipe_PipeError4(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.Error(t, err)
assert.Nil(t, w)
}

func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.Nil(t, w)
assert.Error(t, err)
}

func Test_Pipe_Invalid2(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.Error(t, err)
assert.Nil(t, w)
}

func Test_Pipe_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.NoError(t, err)

res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
Expand All @@ -145,7 +145,7 @@ func Test_Pipe_Echo2(t *testing.T) {

func Test_Pipe_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
assert.NoError(t, err)
require.NotNil(t, w)

Expand All @@ -162,7 +162,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
f := NewPipeFactory(log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorker(cmd)
w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
go func() {
if w.Wait() != nil {
b.Fail()
Expand All @@ -179,7 +179,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)

b.ReportAllocs()
b.ResetTimer()
Expand All @@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {

func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
if err != nil {
b.Fatal(err)
}
Expand All @@ -226,7 +226,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {

func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
if err != nil {
b.Fatal(err)
}
Expand All @@ -248,7 +248,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
func Test_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func Test_Echo2(t *testing.T) {
func Test_BadPayload2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)

go func() {
assert.NoError(t, w.Wait())
Expand All @@ -296,7 +296,7 @@ func Test_BadPayload2(t *testing.T) {
func Test_String2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
go func() {
assert.NoError(t, w.Wait())
}()
Expand All @@ -315,7 +315,7 @@ func Test_String2(t *testing.T) {
func Test_Echo_Slow2(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
go func() {
assert.NoError(t, w.Wait())
}()
Expand All @@ -339,7 +339,7 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")

w, err := NewPipeFactory(log).SpawnWorker(cmd)
w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
if err != nil {
t.Fatal(err)
}
Expand All @@ -355,7 +355,7 @@ func Test_Broken2(t *testing.T) {
func Test_Error2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
go func() {
assert.NoError(t, w.Wait())
}()
Expand All @@ -380,7 +380,7 @@ func Test_Error2(t *testing.T) {
func Test_NumExecs2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")

w, _ := NewPipeFactory(log).SpawnWorker(cmd)
w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd)
go func() {
assert.NoError(t, w.Wait())
}()
Expand Down
Loading
Loading