[dbnode] Use go context to cancel index query workers after timeout #3194
Conversation
Force-pushed from ea7c531 to 9e45698
Force-pushed from 12b02b7 to bc86a69
This prevents a race in the previous code, which only checked the timeout when scheduling a new worker. Once a worker was dispatched it could in theory run forever, since it would never recheck the timeout. Now a background goroutine cancels the workers after the timeout. Each worker checks for cancellation after each batch, so at worst a worker processes one additional batch after the timeout.
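A minimal sketch of that pattern using the standard library context (the batch and worker names here are illustrative, not the PR's actual code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// processBatches is a hypothetical worker loop: it checks for cancellation
// after each batch, so at worst it runs one extra batch past the timeout.
func processBatches(ctx context.Context, batches [][]int) error {
	for _, batch := range batches {
		_ = batch                    // stand-in for the real per-batch work
		time.Sleep(time.Millisecond) // simulate that work taking time
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
	}
	return nil
}

func main() {
	// The timeout is enforced by cancelling the context in the background,
	// not just by checking a deadline when a worker is scheduled.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fmt.Println(processBatches(ctx, make([][]int, 1000)))
}
```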
Force-pushed from bc86a69 to 8d51d21
Codecov Report
@@ Coverage Diff @@
## master #3194 +/- ##
=========================================
- Coverage 72.3% 72.2% -0.1%
=========================================
Files 1087 1087
Lines 100864 100667 -197
=========================================
- Hits 72934 72773 -161
+ Misses 22869 22839 -30
+ Partials 5061 5055 -6
Flags with carried forward coverage won't be shown. Continue to review the full report at Codecov.
@@ -254,7 +253,14 @@ func testNamespaceIndexHighConcurrentQueries(
    r index.QueryResults,
    logFields []opentracinglog.Field,
) (bool, error) {
    timeoutWg.Wait()
    // block until the query will be canceled due to timeout.
@robskillington this test broke because it was relying on the original flow control, which has now changed. There is no longer a goroutine waiting for the timeout and returning; instead there is a goroutine watching for the timeout and interrupting the workers (which ultimately results in a return). However, that doesn't work with the previous flow, since block.Query never runs and so never gets interrupted.
This patch makes the test go green, but it's unclear whether it has the same coverage as before. I'm not sure what the previous test was actually trying to accomplish. It appears it was trying to stress-test racing to close the ctx. However, I'm not even sure that's possible anymore, since we now properly interrupt the workers before returning from the RPC call.
src/dbnode/storage/index.go
Outdated
// launch a goroutine to interrupt the query workers after the timeout.
fnDoneCh := make(chan struct{}, 1) // don't block the defer in the case of a timeout.
defer func() {
    fnDoneCh <- struct{}{}
this is why we really need to start embracing the std go ctx instead. Ideally this would be:
ctx, cancel := stdctx.WithCancel(ctx)
defer cancel()
select {
case <-ctx.Done():
}
However, the M3 Context makes that difficult to achieve, since it doesn't have the same immutability pattern. If you do Context.SetGoContext(ctx) you'll impact the ctx of the caller with the new cancel, which is not good.
Yeah maybe we should honestly just remove the whole concept of "not having a timeout" here and rely on the context having an existing one?
As @nbroyles discovered, it's basically guaranteed you'll already have a server-side Go context with a timeout specified when using TChannel Thrift.
@@ -1594,89 +1625,62 @@ func (i *nsIndex) queryWithSpan(
        break
    }

    if applyTimeout := timeout > 0; !applyTimeout {
        wg.Add(1)
this could use some close eyes. It should behave exactly as before: instead of launching a goroutine to watch the timeout and return, we are using a goroutine to watch the timeout and interrupt. When the workers are interrupted, the call ultimately returns, preserving the existing behavior and properly cleaning up.
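For readers skimming the diff, a hedged sketch of that watch-and-interrupt shape (fnDoneCh mirrors the snippet above; everything else here is illustrative, not the exact PR code):

```go
package main

import (
	"context"
	"time"
)

// watchTimeout interrupts the workers by cancelling their context once the
// timeout fires, or exits quietly if the query finishes first.
func watchTimeout(cancel context.CancelFunc, timeout time.Duration, fnDoneCh <-chan struct{}) {
	select {
	case <-time.After(timeout):
		cancel() // workers observe the cancellation after their current batch
	case <-fnDoneCh:
	}
}

// runQuery mirrors the flow described above: the watcher interrupts the
// workers, the workers return, and cleanup happens on the normal return path.
func runQuery(parent context.Context, timeout time.Duration, query func(context.Context)) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	// Buffered so the deferred send never blocks if the watcher already
	// returned because the timeout fired first.
	fnDoneCh := make(chan struct{}, 1)
	defer func() { fnDoneCh <- struct{}{} }()

	go watchTimeout(cancel, timeout, fnDoneCh)
	query(ctx)
}

func main() {
	runQuery(context.Background(), 10*time.Millisecond, func(ctx context.Context) {
		<-ctx.Done() // stand-in for workers that stop once interrupted
	})
}
```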
Yeah I think this looks good.
LGTM
src/dbnode/storage/index_test.go
Outdated
if !c.TryCheckout() {
    return false, index.ErrCancelledQuery
}
time.Sleep(time.Millisecond)
why is this needed?
figured it was better than tight looping in a test.
this runs in a separate goroutine (from the worker pool), so probably best to not crush the cpu.
src/dbnode/storage/index.go
Outdated
stdCtx, ctxFound := ctx.GoContext()
if !ctxFound {
    stdCtx = stdctx.Background()
}
Should we add a convenience method to the M3 context such as:
goCtx := ctx.GoContextOrBackground()
I went with ctx.MustGoContext() stdctx.Context, since it is guaranteed to be set for a thrift RPC handler. Also added new helper constructors for tests: NewBackground() and NewWithGoContext().
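Roughly the shape those helpers take on an M3-style context wrapper; this is a sketch under assumptions, not the actual src/x/context implementation:

```go
package xcontext

import stdctx "context"

// Context is a simplified stand-in for the M3 context discussed in this thread.
type Context struct {
	goCtx stdctx.Context
}

// NewBackground returns a context backed by stdctx.Background(), which is
// convenient for tests that don't care about cancellation.
func NewBackground() *Context {
	return &Context{goCtx: stdctx.Background()}
}

// NewWithGoContext wraps an existing standard-library context.
func NewWithGoContext(ctx stdctx.Context) *Context {
	return &Context{goCtx: ctx}
}

// MustGoContext returns the wrapped standard context, panicking if none was
// set; per the discussion above, thrift RPC handlers always set one.
func (c *Context) MustGoContext() stdctx.Context {
	if c.goCtx == nil {
		panic("no go context set")
	}
	return c.goCtx
}
```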
src/dbnode/storage/index.go
Outdated
}
select {
case <-stdCtx.Done():
    return false, fmt.Errorf("caller canceled the query before it started")
nit: Use a var errQueryCancelledBeforeStarted = errors.New("....") at the top of this file if this is a constant?
just used the standard index.ErrQueryCanceled
src/dbnode/storage/index.go
Outdated
}

func (i *nsIndex) execBlockQueryFn(
    ctx context.Context,
    cancellable *xresource.CancellableLifetime,
    _ *xresource.CancellableLifetime,
Should we just remove these from the exec block query fn's since they are unused now? Or is this just here until aggregate does not use cancellable anymore?
removed since I updated the aggregate path as well
src/dbnode/storage/index.go
Outdated
@@ -1740,7 +1680,7 @@ func (i *nsIndex) execBlockQueryFn(

func (i *nsIndex) execBlockWideQueryFn(
    ctx context.Context,
    cancellable *xresource.CancellableLifetime,
    _ *xresource.CancellableLifetime,
Should we just remove these from the exec block query fn's since they are unused now? Or is this just here until aggregate does not use cancellable anymore?
src/dbnode/storage/index/block.go
Outdated
    sp opentracing.Span,
    logFields []opentracinglog.Field,
) (bool, error) {
    results DocumentResults) (bool, error) {
nit: For consistency, the majority of our function definitions indent each argument on its own line with a trailing comma, as this one did before,
e.g.
func (b *block) queryWithSpan(
ctx context.Context,
...
results DocumentResults,
) (bool, error) {
// ...
}
src/dbnode/storage/index/block.go
Outdated
select {
case <-ctx.MustGoContext().Done():
    // indexNs will log something useful.
    return false, ErrCancelledQuery
default:
}
If we already check for cancellation at the query worker pool level before this query starts executing, do we need another check before the batches as well, do you think?
Probably fine to have it here, but it would probably also be fine to omit, since we check it before we get the token, and so we would have checked it only moments before this point (we haven't started searches yet, I believe?).
yea, I added this to get a concurrency test to actually return a timeout, since the test had fewer blocks than the batch :/ not a good reason. I'll remove it and get the test working another way.
it might not be a bad idea to check inside the iterator when len(batch) == 0, since that would mean the search has run
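A rough illustration of checking cancellation at a batch boundary inside an iterator, per the comment above (all names here are hypothetical, not the block.Query code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errCancelledQuery stands in for index.ErrCancelledQuery in this sketch.
var errCancelledQuery = errors.New("query cancelled")

// batchIterator is a hypothetical iterator over batches of document IDs.
type batchIterator struct {
	ctx     context.Context
	batches [][]string
	idx     int
	err     error
}

// Next checks for cancellation at each batch boundary, i.e. once the search
// that produced the previous batch has already run.
func (it *batchIterator) Next() ([]string, bool) {
	select {
	case <-it.ctx.Done():
		it.err = errCancelledQuery
		return nil, false
	default:
	}
	if it.idx >= len(it.batches) {
		return nil, false
	}
	batch := it.batches[it.idx]
	it.idx++
	return batch, true
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the query timing out before the next batch

	it := &batchIterator{ctx: ctx, batches: [][]string{{"doc-1"}, {"doc-2"}}}
	for {
		batch, ok := it.Next()
		if !ok {
			break
		}
		fmt.Println(batch)
	}
	fmt.Println(it.err)
}
```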
src/x/sync/types.go
Outdated
@@ -88,6 +89,9 @@ type WorkerPool interface {

    // GoWithTimeoutInstrument instruments GoWithTimeout with timing information.
    GoWithTimeoutInstrument(work Work, timeout time.Duration) ScheduleResult

    // GoWithCtx waits until a worker is available or the provided ctx is canceled.
    GoWithCtx(ctx context.Context, work Work) ScheduleResult
nit: Other naming usually only uses abbreviations in local vars or struct fields, and the type itself is fully spelled out. Guess you could call it Java-esque naming... but anyhow, it would be slightly more consistent to call this GoWithContext(...).
// Don't give out a token if the ctx has already been canceled.
select {
case <-ctx.Done():
    return ScheduleResult{Available: false, WaitTime: 0}
default:
}
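For context, a GoWithContext-style method might look roughly like this; the token-channel internals are an assumption, not the actual src/x/sync implementation:

```go
package xsync

import (
	"context"
	"time"
)

// Work is a unit of work to run on the pool.
type Work func()

// ScheduleResult reports whether a worker was obtained and how long we waited.
type ScheduleResult struct {
	Available bool
	WaitTime  time.Duration
}

// workerPool is a sketch: a buffered channel of tokens bounds concurrency.
type workerPool struct {
	tokens chan struct{}
}

// GoWithContext waits until a worker is available or ctx is cancelled.
func (p *workerPool) GoWithContext(ctx context.Context, work Work) ScheduleResult {
	// Don't hand out a token if the context is already cancelled.
	select {
	case <-ctx.Done():
		return ScheduleResult{Available: false, WaitTime: 0}
	default:
	}

	start := time.Now()
	select {
	case <-ctx.Done():
		return ScheduleResult{Available: false, WaitTime: time.Since(start)}
	case token := <-p.tokens:
		go func() {
			work()
			p.tokens <- token // return the token once the work finishes
		}()
		return ScheduleResult{Available: true, WaitTime: time.Since(start)}
	}
}
```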
👍
Looks good to me as is
@@ -592,7 +592,7 @@ func (td testDatapoints) toRPCSegments(th testFetchTaggedHelper, start time.Time
    for _, dp := range td {
        require.NoError(th.t, enc.Encode(dp, testFetchTaggedTimeUnit, nil), fmt.Sprintf("%+v", dp))
    }
    reader, ok := enc.Stream(context.NewContext())
    reader, ok := enc.Stream(context.NewBackground())
This is a mechanical refactoring that I scoped to test files only:
git grep --name-only "\.NewContext()" | grep "_test.go" | xargs sed -i '' 's/\.NewContext()/\.NewBackground()/'
It was getting painful to update individual failing tests that were missing a go context when querying.
* master: (30 commits)
  [dbnode] Use go context to cancel index query workers after timeout (#3194)
  [aggregator] Fix change ActivePlacement semantics on close (#3201)
  [aggregator] Simplify (Active)StagedPlacement API (#3199)
  [aggregator] Checking if metadata is set to default should not cause copying (#3198)
  [dbnode] Remove readers and writer from aggregator API (#3122)
  [aggregator] Avoid large copies in entry rollup comparisons by making them more inline-friendly (#3195)
  [dbnode] Re-add aggregator doc limit update (#3137)
  [m3db] Do not close reader in filterFieldsIterator.Close() (#3196)
  Revert "Remove disk series read limit (#3174)" (#3193)
  [instrument] Improve sampled timer and stopwatch performance (#3191)
  Omit unset fields in metadata json (#3189)
  [dbnode] Remove left-over code in storage/bootstrap/bootstrapper (#3190)
  [dbnode][coordinator] Support match[] in label endpoints (#3180)
  Instrument the worker pool with the wait time (#3188)
  Instrument query path (#3182)
  [aggregator] Remove indirection, large copy from unaggregated protobuf decoder (#3186)
  [aggregator] Sample timers completely (#3184)
  [aggregator] Reduce error handling overhead in rawtcp server (#3183)
  [aggregator] Move shardID calculation out of critical section (#3179)
  Move instrumentation cleanup to FetchTaggedResultIterator Close() (#3173)
  ...
What this PR does / why we need it:
This prevents a race in the previous code that only checked the timeout when scheduling a new worker. Once a worker was dispatched it could in theory run forever since it would not recheck the timeout.
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: