Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

fix: memory allocations in the debug mode #110

Merged
merged 3 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down Expand Up @@ -43,6 +45,8 @@ github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
75 changes: 57 additions & 18 deletions pool/static_pool/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
}

go func() {
// read the exit status to prevent process to be a zombie
// read the exit status to prevent process to become a zombie
_ = w.Wait()
}()

Expand All @@ -30,15 +30,18 @@
return nil, err
}

// create a channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1000000)

switch {
case rsp.Flags&frame.STREAM != 0:
// create a channel for the stream (only if there are no errors)
resp := make(chan *PExec, 5)
// send the initial frame
resp <- newPExec(rsp, nil)

Check warning on line 39 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L35-L39

Added lines #L35 - L39 were not covered by tests
// in case of stream, we should not return worker immediately
go func() {
// would be called on Goexit
defer func() {
sp.log.Debug("stopping [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))

Check warning on line 44 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L44

Added line #L44 was not covered by tests
close(resp)
// destroy the worker
errD := w.Stop()
Expand All @@ -53,41 +56,77 @@
}
}()

// send the initial frame
resp <- newPExec(rsp, nil)

// stream iterator
for {
select {
// we received stop signal
case <-stopCh:
sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))

Check warning on line 64 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L64

Added line #L64 was not covered by tests
ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)
cancelT()

Check warning on line 67 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L67

Added line #L67 was not covered by tests
if err != nil {
w.State().Transition(fsm.StateErrored)

Check warning on line 69 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L69

Added line #L69 was not covered by tests
sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)
} else {
// successfully canceled
w.State().Transition(fsm.StateReady)
sp.log.Debug("transition to the ready state", zap.String("from", w.State().String()))

Check warning on line 74 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L71-L74

Added lines #L71 - L74 were not covered by tests
}

cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIterWithContext(ctx)
if errI != nil {
resp <- newPExec(nil, errI)
runtime.Goexit()
}
// we have to set a stream timeout on every request
switch sp.supervisedExec {
case true:
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
pld, next, errI := w.StreamIterWithContext(ctxT)
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))

resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

Check warning on line 93 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L79-L93

Added lines #L79 - L93 were not covered by tests

resp <- newPExec(pld, nil)

if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
case false:
// non supervised execution, can potentially hang here
pld, next, errI := w.StreamIter()
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

Check warning on line 113 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L95-L113

Added lines #L95 - L113 were not covered by tests

resp <- newPExec(pld, nil)

Check warning on line 115 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L115

Added line #L115 was not covered by tests

resp <- newPExec(pld, nil)
if !next {
// we've got the last frame
runtime.Goexit()
if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}

Check warning on line 121 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L117-L121

Added lines #L117 - L121 were not covered by tests
Comment on lines +79 to +121
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream-iteration logic on lines 79-121 branches on the supervisedExec flag; this adds complexity, but it is necessary to support both supervised and unsupervised execution modes. However, both branches repeat the same code for handling errors and transitioning worker states. Consider extracting that shared logic into a separate function to adhere to the DRY (Don't Repeat Yourself) principle and simplify the code.

+ func handleStreamError(w *Worker, resp chan *PExec, err error) {
+     sp.log.Warn("stream error", zap.Error(err))
+     resp <- newPExec(nil, err)
+     w.State().Transition(fsm.StateInvalid)
+     runtime.Goexit()
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// we have to set a stream timeout on every request
switch sp.supervisedExec {
case true:
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
pld, next, errI := w.StreamIterWithContext(ctxT)
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
resp <- newPExec(nil, errI)
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}
resp <- newPExec(pld, nil)
if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
case false:
// non supervised execution, can potentially hang here
pld, next, errI := w.StreamIter()
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
resp <- newPExec(nil, errI)
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}
resp <- newPExec(pld, nil)
resp <- newPExec(pld, nil)
if !next {
// we've got the last frame
runtime.Goexit()
if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
// we have to set a stream timeout on every request
switch sp.supervisedExec {
case true:
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
pld, next, errI := w.StreamIterWithContext(ctxT)
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
resp <- newPExec(nil, errI)
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}
resp <- newPExec(pld, nil)
if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
case false:
// non supervised execution, can potentially hang here
pld, next, errI := w.StreamIter()
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
resp <- newPExec(nil, errI)
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}
resp <- newPExec(pld, nil)
if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
// handleStreamError is the helper proposed in the review comment for the
// error path of a stream iteration. It logs the error as a warning, sends it
// to the consumer on resp, moves the worker to fsm.StateInvalid, and then ends
// the calling goroutine with runtime.Goexit.
//
// NOTE(review): sp is neither a parameter nor declared in this snippet;
// presumably the helper is meant to be a method on the pool type (or to take
// the logger as an argument) — confirm before applying.
func handleStreamError(w *Worker, resp chan *PExec, err error) {
sp.log.Warn("stream error", zap.Error(err))
// deliver the error to the consumer before the worker state changes
resp <- newPExec(nil, err)
w.State().Transition(fsm.StateInvalid)
// does not return to the caller: Goexit ends the current goroutine
runtime.Goexit()
}

}
}
}
}()

return resp, nil
default:
resp := make(chan *PExec, 1)
resp <- newPExec(rsp, nil)
// close the channel
close(resp)
Expand Down
Loading