This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

[#105]: fix: streaming responses lock-up under some conditions
rustatian authored Jan 4, 2024
2 parents 5417ebb + d41504f commit e3ef277
Showing 9 changed files with 218 additions and 123 deletions.
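
The diff below revolves around how streamed response chunks travel from the worker goroutine to the caller over a Go channel. As a rough, self-contained sketch of the failure mode named in the title (all names here are hypothetical and nothing is taken from the pool internals), a blocking send into a small channel wedges the producer goroutine for good once the consumer stops reading:

```go
package main

import (
	"fmt"
	"time"
)

// chunk stands in for one streamed response frame; purely illustrative.
type chunk struct{ body string }

// streamOnce mimics a caller that reads only the first frame and then
// abandons the stream, e.g. because the client disconnected.
func streamOnce() {
	resp := make(chan chunk, 1) // tiny buffer, as in the pre-fix code

	go func() {
		for i := 0; ; i++ {
			// With the consumer gone and the buffer full, this send blocks
			// forever; the goroutine (and whatever worker it holds) is
			// never released.
			resp <- chunk{body: fmt.Sprintf("frame %d", i)}
		}
	}()

	fmt.Println("got:", (<-resp).body) // read one frame, then walk away
}

func main() {
	streamOnce()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("caller returned; the producer goroutine is still stuck on its send")
}
```

The hunks in debug.go and workers_pool.go below remove exactly this hazard by making the channel buffer effectively unbounded and by never sending into it blockingly.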
Binary file removed .DS_Store
8 changes: 4 additions & 4 deletions go.mod
@@ -2,19 +2,19 @@ module github.com/roadrunner-server/sdk/v4

go 1.21

toolchain go1.22rc1
toolchain go1.21.5

require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.5.0
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_golang v1.18.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.8.1
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/sync v0.5.0
golang.org/x/sync v0.6.0
)

require (
@@ -31,7 +31,7 @@ require (
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
19 changes: 6 additions & 13 deletions go.sum
@@ -9,8 +9,6 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
@@ -19,13 +17,12 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
@@ -56,16 +53,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3 changes: 3 additions & 0 deletions ipc/pipe/pipe_factory_spawn_test.go
@@ -396,16 +396,19 @@ func Test_NumExecs2(t *testing.T) {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(3), w.State().NumExecs())
w.State().Transition(fsm.StateReady)
}
3 changes: 3 additions & 0 deletions ipc/pipe/pipe_factory_test.go
@@ -421,17 +421,20 @@ func Test_NumExecs(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
w.State().Transition(fsm.StateReady)
assert.Equal(t, uint64(1), w.State().NumExecs())

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(3), w.State().NumExecs())
w.State().Transition(fsm.StateReady)
}
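
Both pipe factory test files gain the same added line between consecutive Exec calls. Read together with the pool changes further down, the worker apparently no longer ends up back in the ready state on its own after an exec, so tests that drive a raw worker have to re-arm it by hand (that reading is an assumption; the worker internals are not part of this diff). The pattern the updated tests follow, written as a fragment that would live inside Test_NumExecs with the imports the file already uses:

```go
for i := 0; i < 3; i++ {
	if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil {
		t.Errorf("fail to execute payload: error %v", err)
	}
	assert.Equal(t, uint64(i+1), w.State().NumExecs())

	// Without this transition the worker is left in its post-exec state and
	// the next Exec would be rejected (assumption based on the test change).
	w.State().Transition(fsm.StateReady)
}
```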
6 changes: 3 additions & 3 deletions pool/static_pool/debug.go
@@ -31,7 +31,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
}

// create a channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)
resp := make(chan *PExec, 1000000)

switch {
case rsp.Flags&frame.STREAM != 0:
@@ -71,9 +71,9 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
pld, next, errI := w.StreamIterWithContext(ctx)
if errI != nil {
resp <- newPExec(nil, errI) // exit from the goroutine
resp <- newPExec(nil, errI)
runtime.Goexit()
}

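
In debug mode the stream channel grows from a single slot to a 1,000,000-element buffer and the iterator call becomes context-aware (StreamIterWithContext instead of StreamIter). The apparent intent is that a send into resp can no longer block the streaming goroutine when the reader stops draining, and that a hung worker can be interrupted through the context. A self-contained sketch of that combination, with hypothetical names standing in for the real worker API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// iterOnce stands in for a single StreamIterWithContext-style call: it either
// produces the next frame or fails once the context is done. Hypothetical.
func iterOnce(ctx context.Context, i int) (string, bool, error) {
	select {
	case <-ctx.Done():
		return "", false, fmt.Errorf("stream iteration canceled: %w", ctx.Err())
	case <-time.After(10 * time.Millisecond): // pretend the worker produced a frame
		return fmt.Sprintf("frame %d", i), i < 4, nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// generous buffer: sends from the streaming side never block, mirroring the
	// change from make(chan *PExec, 1) to make(chan *PExec, 1000000)
	resp := make(chan string, 1_000_000)

	for i := 0; ; i++ {
		frame, next, err := iterOnce(ctx, i)
		if err != nil {
			fmt.Println("stopping:", err)
			break
		}
		resp <- frame // cannot lock up: the buffer is effectively unbounded here
		if !next {
			break
		}
	}
	close(resp)

	for f := range resp {
		fmt.Println("got", f)
	}
}
```

A practically unbounded buffer trades memory for liveness; the workers_pool.go change below pairs the same idea with non-blocking sends so even a full buffer cannot wedge the goroutine.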
2 changes: 2 additions & 0 deletions pool/static_pool/supervisor_test.go
@@ -54,6 +54,7 @@ func Test_SupervisedPool_Exec(t *testing.T) {
require.NoError(t, err)
}

time.Sleep(time.Second)
require.NotEqual(t, pidBefore, p.Workers()[0].Pid())

ctxNew, cancel := context.WithTimeout(ctx, time.Second)
@@ -87,6 +88,7 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) {
require.NoError(t, err)
}

time.Sleep(time.Second)
require.NotEqual(t, pidBefore, p.Workers()[0].Pid())

ctxNew, cancel := context.WithTimeout(ctx, time.Second)
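
The added time.Sleep(time.Second) gives the supervisor a moment to replace the TTL-expired worker before the test compares PIDs. If the fixed sleep ever proves flaky, the same intent can be written as a polling wait; the sketch below uses testify's require.Eventually and assumes p and pidBefore behave exactly as in the surrounding tests:

```go
// Poll until the supervisor has swapped the worker or the deadline passes.
require.Eventually(t, func() bool {
	workers := p.Workers()
	return len(workers) > 0 && workers[0].Pid() != pidBefore
}, 5*time.Second, 100*time.Millisecond, "worker was not restarted by the supervisor")
```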
133 changes: 97 additions & 36 deletions pool/static_pool/workers_pool.go
Expand Up @@ -184,31 +184,31 @@ begin:
rsp, err = w.Exec(ctxT, p)
case false:
// no context here
// potential problem: if the worker is hung, we can't stop it
rsp, err = w.Exec(context.Background(), p)
}

if err != nil {
if errors.Is(errors.Retry, err) {
sp.ww.Release(w)
goto begin
}
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
sp.log.Debug("max requests reached", zap.Int64("pid", w.Pid()))
w.State().Transition(fsm.StateMaxJobsReached)
}

if err != nil {
// just push event if on any stage was timeout error
switch {
// for this case, worker already killed in the ExecTTL function
case errors.Is(errors.ExecTTL, err):
// for this case, worker already killed in the ExecTTL function
sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
w.State().Transition(fsm.StateExecTTLReached)
sp.ww.Release(w)

// worker should already be reallocated
return nil, err
case errors.Is(errors.SoftJob, err):
sp.log.Warn("soft worker error, worker won't be restarted", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
// soft jobs errors are allowed, just put the worker back
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
// mark old as invalid and stop
w.State().Transition(fsm.StateMaxJobsReached)
}
/*
in case of soft job error, we should not kill the worker, this is just an error payload from the worker.
*/
w.State().Transition(fsm.StateReady)
sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
sp.ww.Release(w)

return nil, err
@@ -219,23 +219,27 @@ begin:
// kill the worker instead of sending a net packet to it
_ = w.Kill()

// do not return it, should be reallocated on Kill
return nil, err
case errors.Is(errors.Retry, err):
// put the worker back to the stack and retry the request with the new one
sp.ww.Release(w)
goto begin

default:
w.State().Transition(fsm.StateErrored)
sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))

sp.ww.Release(w)
return nil, err
}
}

if sp.cfg.MaxJobs != 0 {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Transition(fsm.StateMaxJobsReached)
}
}

// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)
// we need to create a buffered channel to prevent blocking
resp := make(chan *PExec, 100000000)
// send the initial frame
resp <- newPExec(rsp, nil)

switch {
case rsp.Flags&frame.STREAM != 0:
@@ -244,38 +248,93 @@
go func() {
// would be called on Goexit
defer func() {
sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))
close(resp)
sp.ww.Release(w)
}()

// send the initial frame
resp <- newPExec(rsp, nil)

// stream iterator
for {
select {
// we received stop signal
case <-stopCh:
sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))
ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)
cancelT()
if err != nil {
w.State().Transition(fsm.StateErrored)
sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)
} else {
// successfully canceled
w.State().Transition(fsm.StateReady)
sp.log.Debug("transition to the ready state", zap.String("from", w.State().String()))
}

cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
if errI != nil {
resp <- newPExec(nil, errI) // exit from the goroutine
runtime.Goexit()
}

resp <- newPExec(pld, nil)
if !next {
// we've got the last frame
runtime.Goexit()
// we have to set a stream timeout on every request
switch sp.supervisedExec {
case true:
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
pld, next, errI := w.StreamIterWithContext(ctxT)
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}

if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
case false:
// non supervised execution, can potentially hang here
pld, next, errI := w.StreamIter()
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}

if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
}
}
}
@@ -284,7 +343,9 @@ begin:
return resp, nil
default:
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
resp <- newPExec(rsp, nil)
if w.State().Compare(fsm.StateWorking) {
w.State().Transition(fsm.StateReady)
}
// return worker back
sp.ww.Release(w)
// close the channel
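
The workers_pool.go hunks carry the core of the fix: the per-request stream channel gets a very large buffer, every send into it goes through a select with a default branch so a full or abandoned channel can never wedge the streaming goroutine, and when supervised execution is enabled each StreamIter call is bounded by the supervisor's ExecTTL. A condensed, self-contained sketch of those patterns working together; the type and function names are placeholders rather than the pool's real API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pexec loosely mirrors the PExec pairs sent over the response channel.
type pexec struct {
	payload string
	err     error
}

// nextFrame imitates a StreamIterWithContext-style call: it returns the next
// frame, whether more frames follow, or an error once ctx expires. Hypothetical.
func nextFrame(ctx context.Context, i int) (string, bool, error) {
	select {
	case <-ctx.Done():
		return "", false, ctx.Err()
	case <-time.After(5 * time.Millisecond):
		return fmt.Sprintf("frame %d", i), i < 9, nil
	}
}

// stream forwards frames to the caller without ever blocking on the channel.
func stream(execTTL time.Duration) <-chan *pexec {
	// large buffer, so the producer side does not block even if the reader stops
	resp := make(chan *pexec, 100_000)

	go func() {
		defer close(resp)
		for i := 0; ; i++ {
			// every chunk gets its own deadline, mirroring the supervised branch
			ctxT, cancelT := context.WithTimeout(context.Background(), execTTL)
			pld, next, err := nextFrame(ctxT, i)
			cancelT()

			if err != nil {
				// non-blocking send: a full channel can never hang this goroutine
				select {
				case resp <- &pexec{err: err}:
				default:
				}
				return
			}

			select {
			case resp <- &pexec{payload: pld}:
			default:
				// the reader fell behind or went away; the real pool marks the
				// worker errored here so it is restarted rather than reused
				return
			}

			if !next {
				return
			}
		}
	}()

	return resp
}

func main() {
	for p := range stream(50 * time.Millisecond) {
		if p.err != nil {
			if errors.Is(p.err, context.DeadlineExceeded) {
				fmt.Println("chunk timed out:", p.err)
				return
			}
			fmt.Println("stream error:", p.err)
			return
		}
		fmt.Println("got", p.payload)
	}
}
```

Dropping a chunk on the default branch is deliberate: losing one frame of an already-corrupted stream is preferable to deadlocking, which is why the pool code above also moves the worker into an errored or invalid state so it gets restarted instead of being reused.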