Skip to content

Commit

Permalink
plugin: add audit plugin extension point (pingcap#9136) (pingcap#9954)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and zz-jason committed Apr 4, 2019
1 parent 1dba727 commit 0896d15
Show file tree
Hide file tree
Showing 22 changed files with 541 additions and 71 deletions.
15 changes: 12 additions & 3 deletions cmd/pluginpkg/pluginpkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,18 @@ func PluginManifest() *plugin.Manifest {
},
{{end}}
},
Validate: {{.validate}},
OnInit: {{.onInit}},
OnShutdown: {{.onShutdown}},
{{if .validate }}
Validate: {{.validate}},
{{end}}
{{if .onInit }}
OnInit: {{.onInit}},
{{end}}
{{if .onShutdown }}
OnShutdown: {{.onShutdown}},
{{end}}
{{if .onFlush }}
OnFlush: {{.onFlush}},
{{end}}
},
{{range .export}}
{{.extPoint}}: {{.impl}},
Expand Down
22 changes: 22 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -124,6 +126,7 @@ func (a *recordSet) Close() error {
if a.processinfo != nil {
a.processinfo.SetProcessInfo("")
}
a.stmt.logAudit()
return errors.Trace(err)
}

Expand Down Expand Up @@ -286,6 +289,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
}
}
a.LogSlowQuery(txnTS, err == nil)
a.logAudit()
}()

err = e.Next(ctx, e.newFirstChunk())
Expand Down Expand Up @@ -349,6 +353,24 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
// QueryReplacer replaces new line and tab for grep result including query string.
var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")

func (a *ExecStmt) logAudit() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.InRestrictedSQL {
return
}
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
if audit.OnGeneralEvent != nil {
cmd := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))]
audit.OnGeneralEvent(context.Background(), sessVars, plugin.Log, cmd)
}
return nil
})
if err != nil {
logutil.Logger(context.Background()).Error("log audit log failure", zap.Error(err))
}
}

// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
level := log.GetLevel()
Expand Down
11 changes: 11 additions & 0 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -139,6 +140,16 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
if err != nil {
return errors.Trace(err)
}
err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
auditPlugin := plugin.DeclareAuditManifest(p.Manifest)
if auditPlugin.OnGlobalVariableEvent != nil {
auditPlugin.OnGlobalVariableEvent(context.Background(), e.ctx.GetSessionVars(), name, svalue)
}
return nil
})
if err != nil {
return err
}
} else {
// Set session scope system variable.
if sysVar.Scope&variable.ScopeSession == 0 {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (
return nil, errors.Trace(err)
}

ctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()

// Maybe it's better to move this to Preprocess, but check privilege need table
// information, which is collected into visitInfo during logical plan builder.
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
Expand Down
21 changes: 21 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -324,6 +325,26 @@ func (b *planBuilder) detectSelectAgg(sel *ast.SelectStmt) bool {
return false
}

// GetDBTableInfo gets the accessed dbs and tables info.
func (b *planBuilder) GetDBTableInfo() []stmtctx.TableEntry {
var tables []stmtctx.TableEntry
existsFunc := func(tbls []stmtctx.TableEntry, tbl *stmtctx.TableEntry) bool {
for _, t := range tbls {
if t == *tbl {
return true
}
}
return false
}
for _, v := range b.visitInfo {
tbl := &stmtctx.TableEntry{DB: v.db, Table: v.table}
if !existsFunc(tables, tbl) {
tables = append(tables, *tbl)
}
}
return tables
}

func getPathByIndexName(paths []*accessPath, idxName model.CIStr, tblInfo *model.TableInfo) *accessPath {
var tablePath *accessPath
for _, path := range paths {
Expand Down
87 changes: 87 additions & 0 deletions plugin/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plugin

import (
"context"

"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/variable"
)

// GeneralEvent presents TiDB generate event.
type GeneralEvent byte

const (
// Log presents log event.
Log GeneralEvent = iota
// Error presents error event.
Error
// Result presents result event.
Result
// Status presents status event.
Status
)

// ConnectionEvent presents TiDB connection event.
type ConnectionEvent byte

const (
// Connected presents new connection establish event(finish auth).
Connected ConnectionEvent = iota
// Disconnect presents disconnect event.
Disconnect
// ChangeUser presents change user.
ChangeUser
// PreAuth presents event before start auth.
PreAuth
)

func (c ConnectionEvent) String() string {
switch c {
case Connected:
return "Connected"
case Disconnect:
return "Disconnect"
case ChangeUser:
return "ChangeUser"
case PreAuth:
return "PreAuth"
}
return ""
}

// ParseEvent presents events happen around parser.
type ParseEvent byte

const (
// PreParse presents event before parse.
PreParse ParseEvent = 1 + iota
// PostParse presents event after parse.
PostParse
)

// AuditManifest presents a sub-manifest that every audit plugin must provide.
type AuditManifest struct {
Manifest
// OnConnectionEvent will be called when TiDB receive or disconnect from client.
// return error will ignore and close current connection.
OnConnectionEvent func(ctx context.Context, identity *auth.UserIdentity, event ConnectionEvent, info *variable.ConnectionInfo) error
// OnGeneralEvent will be called during TiDB execution.
OnGeneralEvent func(ctx context.Context, sctx *variable.SessionVars, event GeneralEvent, cmd string)
// OnGlobalVariableEvent will be called when Change GlobalVariable.
OnGlobalVariableEvent func(ctx context.Context, sctx *variable.SessionVars, varName, varValue string)
// OnParseEvent will be called around parse logic.
OnParseEvent func(ctx context.Context, sctx *variable.SessionVars, event ParseEvent) error
}
4 changes: 2 additions & 2 deletions plugin/conn_ip_example/conn_ip_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func OnShutdown(ctx context.Context, manifest *plugin.Manifest) error {
return nil
}

// NotifyEvent implements TiDB Audit plugin's NotifyEvent SPI.
func NotifyEvent(ctx context.Context) error {
// OnGeneralEvent implements TiDB Audit plugin's OnGeneralEvent SPI.
func OnGeneralEvent(ctx context.Context, sctx *variable.SessionVars, event plugin.GeneralEvent, cmd byte, stmt string) error {
fmt.Println("conn_ip_example notifiy called")
fmt.Println("variable test: ", variable.GetSysVar("conn_ip_example_test_variable").Value)
fmt.Printf("new connection by %s\n", ctx.Value("ip"))
Expand Down
20 changes: 13 additions & 7 deletions plugin/conn_ip_example/conn_ip_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,23 @@ func Example_LoadRunShutdownPlugin() {
PluginVarNames: &pluginVarNames,
}

err := plugin.Init(ctx, cfg)
err := plugin.Load(ctx, cfg)
if err != nil {
panic(err)
}

ps := plugin.GetByKind(plugin.Audit)
for _, auditPlugin := range ps {
if auditPlugin.State != plugin.Ready {
continue
}
plugin.DeclareAuditManifest(auditPlugin.Manifest).NotifyEvent(context.Background(), nil)
// load and start TiDB domain.
err = plugin.Init(ctx, cfg)
if err != nil {
panic(err)
}

err = plugin.ForeachPlugin(plugin.Audit, func(auditPlugin *plugin.Plugin) error {
plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY")
return nil
})
if err != nil {
panic(err)
}

plugin.Shutdown(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion plugin/conn_ip_example/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ validate = "Validate"
onInit = "OnInit"
onShutdown = "OnShutdown"
export = [
{extPoint="NotifyEvent", impl="NotifyEvent"}
{extPoint="OnGeneralEvent", impl="OnGeneralEvent"}
]
Loading

0 comments on commit 0896d15

Please sign in to comment.