Skip to content

Commit

Permalink
update the way to kill background executors
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Mar 27, 2019
1 parent 933fd04 commit 9325798
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 45 deletions.
47 changes: 4 additions & 43 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,10 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}()
}

exec, err := a.buildExecutor(sctx)
e, err := a.buildExecutor(sctx)
if err != nil {
return nil, errors.Trace(err)
}
e := wrapReentrant(exec)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand All @@ -247,7 +246,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
return a.handleNoDelayExecutor(ctx, sctx, e)
} else if proj, ok := e.Executor.(*ProjectionExec); ok && proj.calculateNoDelay {
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
Expand All @@ -269,15 +268,15 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execReentrantWrapper) (sqlexec.RecordSet, error) {
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.Executor.(type) {
switch e.(type) {
case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec:
snapshotTS := sctx.GetSessionVars().SnapshotTS
if snapshotTS != 0 {
Expand Down Expand Up @@ -485,41 +484,3 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco
return false, nil
}
}

const (
reentrantWrapperNotStart = iota
reentrantWrapperStartFail
reentrantWrapperRunning
reentrantWrapperClosed
)

// execReentrantWrapper used to watch the context specified in Open and
// stop the wrapped executor when this context is cancelled.
type execReentrantWrapper struct {
Executor
// 0, 1, 2, 3 represent not started, failed in start, running, closed
status int32
}

func wrapReentrant(exec Executor) *execReentrantWrapper {
return &execReentrantWrapper{exec, reentrantWrapperNotStart}
}

func (ecw *execReentrantWrapper) Open(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperNotStart, reentrantWrapperRunning) {
return nil
}

if err := ecw.Executor.Open(ctx); err != nil {
atomic.StoreInt32(&ecw.status, reentrantWrapperStartFail)
return err
}
return nil
}

func (ecw *execReentrantWrapper) Close() error {
if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperRunning, reentrantWrapperClosed) {
return nil
}
return ecw.Executor.Close()
}
8 changes: 6 additions & 2 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ import (
const (
connStatusDispatching int32 = iota
connStatusReading
connStatusShutdown // Closed by server.
connStatusWaitShutdown // Notified by server to close.
connStatusShutdown // Closed by server.
connStatusWaitShutdown // Notified by server to close.
)

// newClientConn creates a *clientConn object.
Expand Down Expand Up @@ -112,6 +112,7 @@ type clientConn struct {
mu struct {
sync.RWMutex
cancelFunc context.CancelFunc
resultSets []ResultSet
}
}

Expand Down Expand Up @@ -1047,6 +1048,9 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc()
return errors.Trace(err)
}
cc.mu.Lock()
cc.mu.resultSets = rs
cc.mu.Unlock()
if rs != nil {
if len(rs) == 1 {
err = cc.writeResultset(ctx, rs[0], false, 0, 0)
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ func killConn(conn *clientConn, query bool) {
}

conn.mu.RLock()
for _, resultSet := range conn.mu.resultSets {
// resultSet.Close() is reentrant so it's safe to kill a same connID concurrently or multiple times
if err := resultSet.Close(); err != nil {
logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err))
}
}
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()
if cancelFunc != nil {
Expand Down

0 comments on commit 9325798

Please sign in to comment.