Skip to content

Commit

Permalink
log,server: add contextual log util & test it in conn.go (#9548) (#9980)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and zz-jason committed Apr 2, 2019
1 parent 8d24954 commit 43cf3c9
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190328044348-9945885931bb
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7l
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pingcap/check v0.0.0-20171206051426-1c287c953996 h1:ZBdiJCMan6GSo/aPAM7gywcUKa0z58gczVrnG6TQnAQ=
github.com/pingcap/check v0.0.0-20171206051426-1c287c953996/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.1 h1:BXFZ6MdDd2U1uJUa2sRAWTmm+nieEzuyYM0R4aUTcC8=
github.com/pingcap/errors v0.11.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo=
Expand All @@ -98,8 +98,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030 h1:XJLuW0lsP7vAt
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11 h1:iGNfAHgK0VHJobW4bPTlFmdnt3YWsEHdSTIcjut6ffk=
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190328044348-9945885931bb h1:JCc0MMW4fqfhbaI8LS1dXRmql1PbpoKLxoYq6GUlSaQ=
github.com/pingcap/parser v0.0.0-20190328044348-9945885931bb/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
Expand Down
69 changes: 37 additions & 32 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ import (
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -120,14 +121,14 @@ func (cc *clientConn) String() string {
// handshake works like TCP handshake, but in a higher level, it first writes initial packet to client,
// during handshake, client and server negotiate compatible features and do authentication.
// After handshake, client can send sql query to server.
func (cc *clientConn) handshake() error {
func (cc *clientConn) handshake(ctx context.Context) error {
if err := cc.writeInitialHandshake(); err != nil {
return errors.Trace(err)
}
if err := cc.readOptionalSSLRequestAndHandshakeResponse(); err != nil {
if err := cc.readOptionalSSLRequestAndHandshakeResponse(ctx); err != nil {
err1 := cc.writeError(err)
if err1 != nil {
log.Debug(err1)
logutil.Logger(ctx).Debug("writeError failed", zap.Error(err1))
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -234,11 +235,11 @@ type handshakeResponse41 struct {
}

// parseHandshakeResponseHeader parses the common header of SSLRequest and HandshakeResponse41.
func parseHandshakeResponseHeader(packet *handshakeResponse41, data []byte) (parsedBytes int, err error) {
func parseHandshakeResponseHeader(ctx context.Context, packet *handshakeResponse41, data []byte) (parsedBytes int, err error) {
// Ensure there are enough data to read:
// http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::SSLRequest
if len(data) < 4+4+1+23 {
log.Errorf("Got malformed handshake response, packet data: %v", data)
logutil.Logger(ctx).Error("got malformed handshake response", zap.ByteString("packetData", data))
return 0, mysql.ErrMalformPacket
}

Expand All @@ -259,11 +260,11 @@ func parseHandshakeResponseHeader(packet *handshakeResponse41, data []byte) (par
}

// parseHandshakeResponseBody parse the HandshakeResponse (except the common header part).
func parseHandshakeResponseBody(packet *handshakeResponse41, data []byte, offset int) (err error) {
func parseHandshakeResponseBody(ctx context.Context, packet *handshakeResponse41, data []byte, offset int) (err error) {
defer func() {
// Check malformat packet cause out of range is disgusting, but don't panic!
if r := recover(); r != nil {
log.Errorf("handshake panic, packet data: %v", data)
logutil.Logger(ctx).Error("handshake panic", zap.ByteString("packetData", data))
err = mysql.ErrMalformPacket
}
}()
Expand Down Expand Up @@ -316,7 +317,7 @@ func parseHandshakeResponseBody(packet *handshakeResponse41, data []byte, offset
row := data[offset : offset+int(num)]
attrs, err := parseAttrs(row)
if err != nil {
log.Warn("parse attrs error:", errors.ErrorStack(err))
logutil.Logger(ctx).Warn("parse attrs failed", zap.Error(err))
return nil
}
packet.Attrs = attrs
Expand Down Expand Up @@ -346,7 +347,7 @@ func parseAttrs(data []byte) (map[string]string, error) {
return attrs, nil
}

func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse() error {
func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse(ctx context.Context) error {
// Read a packet. It may be a SSLRequest or HandshakeResponse.
data, err := cc.readPacket()
if err != nil {
Expand All @@ -355,7 +356,7 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse() error {

var resp handshakeResponse41

pos, err := parseHandshakeResponseHeader(&resp, data)
pos, err := parseHandshakeResponseHeader(ctx, &resp, data)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -370,14 +371,14 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse() error {
if err != nil {
return errors.Trace(err)
}
pos, err = parseHandshakeResponseHeader(&resp, data)
pos, err = parseHandshakeResponseHeader(ctx, &resp, data)
if err != nil {
return errors.Trace(err)
}
}

// Read the remaining part of the packet.
if err = parseHandshakeResponseBody(&resp, data, pos); err != nil {
if err = parseHandshakeResponseBody(ctx, &resp, data, pos); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -426,7 +427,7 @@ func (cc *clientConn) openSessionAndDoAuth(authData []byte) error {
// Run reads client query and writes query result to client in for loop, if there is a panic during query handling,
// it will be recovered and log the panic error.
// This function returns and the connection is closed if there is an IO error or there is a panic.
func (cc *clientConn) Run() {
func (cc *clientConn) Run(ctx context.Context) {
const size = 4096
closedOutside := false
defer func() {
Expand All @@ -435,7 +436,11 @@ func (cc *clientConn) Run() {
buf := make([]byte, size)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("lastCmd %s, %v, %s", cc.lastCmd, r, buf)
logutil.Logger(ctx).Error("connection running loop panic",
zap.String("lastCmd", cc.lastCmd),
zap.Reflect("err", r),
zap.String("stack", string(buf)),
)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if !closedOutside {
Expand Down Expand Up @@ -463,8 +468,7 @@ func (cc *clientConn) Run() {
if terror.ErrorNotEqual(err, io.EOF) {
errStack := errors.ErrorStack(err)
if !strings.Contains(errStack, "use of closed network connection") {
log.Errorf("con:%d read packet error, close this connection %s",
cc.connectionID, errStack)
logutil.Logger(ctx).Error("read packet failed, close this connection", zap.Error(err))
}
}
return
Expand All @@ -478,26 +482,27 @@ func (cc *clientConn) Run() {
}

startTime := time.Now()
if err = cc.dispatch(data); err != nil {
if err = cc.dispatch(ctx, data); err != nil {
if terror.ErrorEqual(err, io.EOF) {
cc.addMetrics(data[0], startTime, nil)
return
} else if terror.ErrResultUndetermined.Equal(err) {
log.Errorf("con:%d result undetermined error, close this connection %s",
cc.connectionID, errors.ErrorStack(err))
logutil.Logger(ctx).Error("result undetermined, close this connection", zap.Error(err))
return
} else if terror.ErrCritical.Equal(err) {
log.Errorf("con:%d critical error, stop the server listener %s",
cc.connectionID, errors.ErrorStack(err))
logutil.Logger(ctx).Error("critical error, stop the server listener", zap.Error(err))
metrics.CriticalErrorCounter.Add(1)
select {
case cc.server.stopListenerCh <- struct{}{}:
default:
}
return
}
log.Warnf("con:%d dispatch error:\n%s\n%q\n%s",
cc.connectionID, cc, queryStrForLog(string(data[1:])), errStrForLog(err))
logutil.Logger(ctx).Warn("dispatch error",
zap.String("connInfo", cc.String()),
zap.String("sql", queryStrForLog(string(data[1:]))),
zap.String("err", errStrForLog(err)),
)
err1 := cc.writeError(err)
terror.Log(errors.Trace(err1))
}
Expand Down Expand Up @@ -591,9 +596,9 @@ func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) {
// dispatch handles client request based on command which is the first byte of the data.
// It also gets a token from server which is used to limit the concurrently handling clients.
// The most frequently used command is ComQuery.
func (cc *clientConn) dispatch(data []byte) error {
func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
span := opentracing.StartSpan("server.dispatch")
ctx := opentracing.ContextWithSpan(context.Background(), span)
ctx = opentracing.ContextWithSpan(ctx, span)

ctx1, cancelFunc := context.WithCancel(ctx)
cc.mu.Lock()
Expand Down Expand Up @@ -650,7 +655,7 @@ func (cc *clientConn) dispatch(data []byte) error {
case mysql.ComSetOption:
return cc.handleSetOption(data)
case mysql.ComChangeUser:
return cc.handleChangeUser(data)
return cc.handleChangeUser(ctx1, data)
default:
return mysql.NewErrf(mysql.ErrUnknown, "command %d not supported now", cmd)
}
Expand Down Expand Up @@ -806,7 +811,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
curData, err = cc.readPacket()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
log.Error(errors.ErrorStack(err))
logutil.Logger(ctx).Error("read packet failed", zap.Error(err))
break
}
}
Expand All @@ -832,7 +837,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
if err != nil {
if txn != nil && txn.Valid() {
if err1 := txn.Rollback(); err1 != nil {
log.Errorf("load data rollback failed: %v", err1)
logutil.Logger(ctx).Error("load data rollback failed", zap.Error(err1))
}
}
return errors.Trace(err)
Expand Down Expand Up @@ -962,7 +967,7 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("query: %s:\n%s", cc.lastCmd, buf)
logutil.Logger(ctx).Error("write query result panic", zap.String("lastCmd", cc.lastCmd), zap.String("stack", string(buf)))
}()
var err error
if mysql.HasCursorExistsFlag(serverStatus) {
Expand Down Expand Up @@ -1130,7 +1135,7 @@ func (cc *clientConn) upgradeToTLS(tlsConfig *tls.Config) error {
return nil
}

func (cc *clientConn) handleChangeUser(data []byte) error {
func (cc *clientConn) handleChangeUser(ctx context.Context, data []byte) error {
user, data := parseNullTermString(data)
cc.user = hack.String(user)
if len(data) < 1 {
Expand All @@ -1147,7 +1152,7 @@ func (cc *clientConn) handleChangeUser(data []byte) error {
cc.dbname = hack.String(dbName)
err := cc.ctx.Close()
if err != nil {
log.Debug(err)
logutil.Logger(ctx).Debug("close old context error", zap.Error(err))
}
err = cc.openSessionAndDoAuth(pass)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"bufio"
"bytes"
"context"
"encoding/binary"

. "github.com/pingcap/check"
Expand All @@ -30,7 +31,7 @@ func (ts ConnTestSuite) TestMalformHandshakeHeader(c *C) {
c.Parallel()
data := []byte{0x00}
var p handshakeResponse41
_, err := parseHandshakeResponseHeader(&p, data)
_, err := parseHandshakeResponseHeader(context.Background(), &p, data)
c.Assert(err, NotNil)
}

Expand All @@ -52,10 +53,10 @@ func (ts ConnTestSuite) TestParseHandshakeResponse(c *C) {
0x6f, 0x6f, 0x03, 0x62, 0x61, 0x72,
}
var p handshakeResponse41
offset, err := parseHandshakeResponseHeader(&p, data)
offset, err := parseHandshakeResponseHeader(context.Background(), &p, data)
c.Assert(err, IsNil)
c.Assert(p.Capability&mysql.ClientConnectAtts, Equals, mysql.ClientConnectAtts)
err = parseHandshakeResponseBody(&p, data, offset)
err = parseHandshakeResponseBody(context.Background(), &p, data, offset)
c.Assert(err, IsNil)
eq := mapIdentical(p.Attrs, map[string]string{
"_client_version": "5.6.6-m9",
Expand All @@ -75,14 +76,14 @@ func (ts ConnTestSuite) TestParseHandshakeResponse(c *C) {
0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x00,
}
p = handshakeResponse41{}
offset, err = parseHandshakeResponseHeader(&p, data)
offset, err = parseHandshakeResponseHeader(context.Background(), &p, data)
c.Assert(err, IsNil)
capability := mysql.ClientProtocol41 |
mysql.ClientPluginAuth |
mysql.ClientSecureConnection |
mysql.ClientConnectWithDB
c.Assert(p.Capability&capability, Equals, capability)
err = parseHandshakeResponseBody(&p, data, offset)
err = parseHandshakeResponseBody(context.Background(), &p, data, offset)
c.Assert(err, IsNil)
c.Assert(p.User, Equals, "pam")
c.Assert(p.DBName, Equals, "test")
Expand All @@ -107,10 +108,10 @@ func (ts ConnTestSuite) TestIssue1768(c *C) {
0x79, 0x73, 0x71, 0x6c,
}
p := handshakeResponse41{}
offset, err := parseHandshakeResponseHeader(&p, data)
offset, err := parseHandshakeResponseHeader(context.Background(), &p, data)
c.Assert(err, IsNil)
c.Assert(p.Capability&mysql.ClientPluginAuthLenencClientData, Equals, mysql.ClientPluginAuthLenencClientData)
err = parseHandshakeResponseBody(&p, data, offset)
err = parseHandshakeResponseBody(context.Background(), &p, data, offset)
c.Assert(err, IsNil)
c.Assert(len(p.Auth) > 0, IsTrue)
}
Expand Down
12 changes: 8 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
package server

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -50,7 +51,9 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -303,25 +306,26 @@ func (s *Server) Close() {
// onConn runs in its own goroutine, handles queries from this connection.
func (s *Server) onConn(c net.Conn) {
conn := s.newConn(c)
if err := conn.handshake(); err != nil {
ctx := logutil.WithConnID(context.Background(), conn.connectionID)
if err := conn.handshake(ctx); err != nil {
// Some keep alive services will send request to TiDB and disconnect immediately.
// So we only record metrics.
metrics.HandShakeErrorCounter.Inc()
err = c.Close()
terror.Log(errors.Trace(err))
return
}
log.Infof("con:%d new connection %s", conn.connectionID, c.RemoteAddr().String())
logutil.Logger(ctx).Info("new connection", zap.String("remoteAddr", c.RemoteAddr().String()))
defer func() {
log.Infof("con:%d close connection", conn.connectionID)
logutil.Logger(ctx).Info("close connection")
}()
s.rwlock.Lock()
s.clients[conn.connectionID] = conn
connections := len(s.clients)
s.rwlock.Unlock()
metrics.ConnGauge.Set(float64(connections))

conn.Run()
conn.Run(ctx)
}

// ShowProcessList implements the SessionManager interface.
Expand Down
Loading

0 comments on commit 43cf3c9

Please sign in to comment.