From e4c66f60d2f91e766025b312ca26ac1e18702ce6 Mon Sep 17 00:00:00 2001 From: abulo Date: Thu, 29 Aug 2024 23:39:21 +0800 Subject: [PATCH] =?UTF-8?q?feat=E2=9C=A8:=20add=20logger=20driver=20sql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/env/const.go | 2 +- core/logger/es/exec.go | 9 +- core/logger/es/hook.go | 8 +- core/logger/mongo/exec.go | 21 +++-- core/logger/mongo/hook.go | 8 +- core/logger/sql/exec.go | 69 +++++++++++++++ core/logger/sql/hook.go | 179 ++++++++++++++++++++++++++++++++++++++ go.mod | 29 +++--- 8 files changed, 290 insertions(+), 35 deletions(-) create mode 100644 core/logger/sql/exec.go create mode 100644 core/logger/sql/hook.go diff --git a/core/env/const.go b/core/env/const.go index 72d8752a..0fbf0928 100644 --- a/core/env/const.go +++ b/core/env/const.go @@ -12,7 +12,7 @@ import ( ) const ( - ratelVersion = "v3.8.5" + ratelVersion = "v3.8.6" ) var ( diff --git a/core/logger/es/exec.go b/core/logger/es/exec.go index ca4aa24b..705715f0 100644 --- a/core/logger/es/exec.go +++ b/core/logger/es/exec.go @@ -15,21 +15,24 @@ type ExecCloser interface { type defaultExec struct { client *elasticsearch.Client + index string canClose bool } // NewExec create an exec instance -func NewExec(client *elasticsearch.Client) ExecCloser { +func NewExec(client *elasticsearch.Client, index string) ExecCloser { return &defaultExec{ client: client, + index: index, canClose: true, } } // NewExecWithURL create an exec instance -func NewExecWithURL(client *elasticsearch.Client) ExecCloser { +func NewExecWithURL(client *elasticsearch.Client, index string) ExecCloser { return &defaultExec{ client: client, + index: index, canClose: true, } } @@ -37,7 +40,7 @@ func NewExecWithURL(client *elasticsearch.Client) ExecCloser { // Exec ... func (e *defaultExec) Exec(entry *entry.Entry) error { ctx := context.Background() - _, err := e.client.Index().Index("logger_entry").Id(uuid.New().String()).BodyJson(entry).Do(ctx) + _, err := e.client.Index().Index(e.index).Id(uuid.New().String()).BodyJson(entry).Do(ctx) if err != nil { return err } diff --git a/core/logger/es/hook.go b/core/logger/es/hook.go index cde48222..6ae9ece0 100644 --- a/core/logger/es/hook.go +++ b/core/logger/es/hook.go @@ -84,10 +84,10 @@ func SetOut(out io.Writer) Option { type Option func(*options) // Default create a default es hook -func Default(client *elasticsearch.Client, opts ...Option) *Hook { +func Default(client *elasticsearch.Client, index string, opts ...Option) *Hook { var options []Option options = append(options, opts...) - options = append(options, SetExec(NewExec(client))) + options = append(options, SetExec(NewExec(client, index))) if err := CreateIndex(client); err != nil { panic(err) @@ -96,10 +96,10 @@ func Default(client *elasticsearch.Client, opts ...Option) *Hook { } // DefaultWithURL create a default es hook -func DefaultWithURL(client *elasticsearch.Client, opts ...Option) *Hook { +func DefaultWithURL(client *elasticsearch.Client, index string, opts ...Option) *Hook { var options []Option options = append(options, opts...) - options = append(options, SetExec(NewExecWithURL(client))) + options = append(options, SetExec(NewExecWithURL(client, index))) if err := CreateIndex(client); err != nil { panic(err) } diff --git a/core/logger/mongo/exec.go b/core/logger/mongo/exec.go index b4bfff5d..9b22b3e1 100644 --- a/core/logger/mongo/exec.go +++ b/core/logger/mongo/exec.go @@ -14,23 +14,26 @@ type ExecCloser interface { } type defaultExec struct { - client *mongodb.MongoDB - canClose bool + client *mongodb.MongoDB + collection string + canClose bool } // NewExec create an exec instance -func NewExec(client *mongodb.MongoDB) ExecCloser { +func NewExec(client *mongodb.MongoDB, collection string) ExecCloser { return &defaultExec{ - client: client, - canClose: true, + client: client, + collection: collection, + canClose: true, } } // NewExecWithURL create an exec instance -func NewExecWithURL(client *mongodb.MongoDB) ExecCloser { +func NewExecWithURL(client *mongodb.MongoDB, collection string) ExecCloser { return &defaultExec{ - client: client, - canClose: true, + client: client, + collection: collection, + canClose: true, } } @@ -46,7 +49,7 @@ func (e *defaultExec) Exec(entry *entry.Entry) error { data := bson.M(entry.Data) item["data"] = data ctx := context.Background() - handler, err := e.client.NewCollection("logger_entry") + handler, err := e.client.NewCollection(e.collection) if err != nil { return err } diff --git a/core/logger/mongo/hook.go b/core/logger/mongo/hook.go index 9abc6f24..8ba4f214 100644 --- a/core/logger/mongo/hook.go +++ b/core/logger/mongo/hook.go @@ -84,18 +84,18 @@ func SetOut(out io.Writer) Option { type Option func(*options) // Default create a default mongo hook -func Default(client *mongodb.MongoDB, opts ...Option) *Hook { +func Default(client *mongodb.MongoDB, collection string, opts ...Option) *Hook { var options []Option options = append(options, opts...) - options = append(options, SetExec(NewExec(client))) + options = append(options, SetExec(NewExec(client, collection))) return New(options...) } // DefaultWithURL create a default mongo hook -func DefaultWithURL(client *mongodb.MongoDB, opts ...Option) *Hook { +func DefaultWithURL(client *mongodb.MongoDB, collection string, opts ...Option) *Hook { var options []Option options = append(options, opts...) - options = append(options, SetExec(NewExecWithURL(client))) + options = append(options, SetExec(NewExecWithURL(client, collection))) return New(options...) } diff --git a/core/logger/sql/exec.go b/core/logger/sql/exec.go new file mode 100644 index 00000000..82387a06 --- /dev/null +++ b/core/logger/sql/exec.go @@ -0,0 +1,69 @@ +package sql + +import ( + "context" + "encoding/json" + + "github.com/abulo/ratel/v3/core/logger/entry" + "github.com/abulo/ratel/v3/stores/null" + "github.com/abulo/ratel/v3/stores/sql" +) + +// ExecCloser 将logrus条目写入数据库并关闭数据库 +type ExecCloser interface { + Exec(entry *entry.Entry) error +} + +type defaultExec struct { + client sql.SqlConn + tableName string + canClose bool +} + +// NewExec create an exec instance +func NewExec(client sql.SqlConn, tableName string) ExecCloser { + return &defaultExec{ + client: client, + tableName: tableName, + canClose: true, + } +} + +// NewExecWithURL create an exec instance +func NewExecWithURL(client sql.SqlConn, tableName string) ExecCloser { + return &defaultExec{ + client: client, + tableName: tableName, + canClose: true, + } +} + +func (e *defaultExec) Exec(entry *entry.Entry) error { + daoItem := &Dao{} + daoItem.Host = null.StringFrom(entry.Host) + daoItem.File = null.StringFrom(entry.File) + daoItem.Func = null.StringFrom(entry.Func) + daoItem.Message = null.StringFrom(entry.Message) + daoItem.Level = null.StringFrom(entry.Level) + data, _ := json.Marshal(entry.Data) + daoItem.Data = null.JSONFrom(data) + builder := sql.NewBuilder() + query, args, err := builder.Table(e.tableName).Insert(data) + if err != nil { + return err + } + ctx := context.Background() + _, err = e.client.Insert(ctx, query, args...) + return err +} + +type Dao struct { + Id *int64 `db:"id,-" json:"id"` + Host null.String `db:"host" json:"host"` + Timestamp null.DateTime `db:"timestamp" json:"timestamp"` + File null.String `db:"file" json:"file"` + Func null.String `db:"func" json:"func"` + Message null.String `db:"message" json:"message"` + Level null.String `db:"level" json:"level"` + Data null.JSON `db:"data" json:"data"` +} diff --git a/core/logger/sql/hook.go b/core/logger/sql/hook.go new file mode 100644 index 00000000..3419db66 --- /dev/null +++ b/core/logger/sql/hook.go @@ -0,0 +1,179 @@ +package sql + +import ( + "fmt" + "io" + "os" + "strings" + + "github.com/abulo/ratel/v3/core/logger/entry" + "github.com/abulo/ratel/v3/core/logger/queue" + "github.com/abulo/ratel/v3/stores/sql" + "github.com/abulo/ratel/v3/util" + "github.com/sirupsen/logrus" +) + +var defaultOptions = options{ + maxQueues: 512, + maxWorkers: 2, + levels: []logrus.Level{ + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + logrus.InfoLevel, + logrus.DebugLevel, + }, + out: os.Stderr, +} + +type options struct { + maxQueues int + maxWorkers int + extra map[string]any + exec ExecCloser + levels []logrus.Level + out io.Writer +} + +// SetMaxQueues 设置缓冲区的数量 +func SetMaxQueues(maxQueues int) Option { + return func(o *options) { + o.maxQueues = maxQueues + } +} + +// SetMaxWorkers 设置工作线程数 +func SetMaxWorkers(maxWorkers int) Option { + return func(o *options) { + o.maxWorkers = maxWorkers + } +} + +// SetExtra 设置扩展参数 +func SetExtra(extra map[string]any) Option { + return func(o *options) { + o.extra = extra + } +} + +// SetExec 设置Execer接口 +func SetExec(exec ExecCloser) Option { + return func(o *options) { + o.exec = exec + } +} + +// SetLevels 设置可用的日志级别 +func SetLevels(levels ...logrus.Level) Option { + return func(o *options) { + if len(levels) == 0 { + return + } + o.levels = levels + } +} + +// SetOut 设置错误输出 +func SetOut(out io.Writer) Option { + return func(o *options) { + o.out = out + } +} + +// Option 钩子参数选项 +type Option func(*options) + +// Default create a default mongo hook +func Default(client sql.SqlConn, tableName string, opts ...Option) *Hook { + var options []Option + options = append(options, opts...) + options = append(options, SetExec(NewExec(client, tableName))) + return New(options...) +} + +// DefaultWithURL create a default mongo hook +func DefaultWithURL(client sql.SqlConn, tableName string, opts ...Option) *Hook { + var options []Option + options = append(options, opts...) + options = append(options, SetExec(NewExecWithURL(client, tableName))) + return New(options...) +} + +// New 创建一个要添加到logger实例的钩子 +func New(opt ...Option) *Hook { + opts := defaultOptions + for _, o := range opt { + o(&opts) + } + + if opts.exec == nil { + // panic("Unknown Execer interface implementation") + logrus.Info("Unknown Execer interface implementation") + } + + q := queue.NewQueue(opts.maxQueues, opts.maxWorkers) + q.Run() + + return &Hook{ + opts: opts, + q: q, + } +} + +// Hook 将日志发送到数据库 +type Hook struct { + opts options + q *queue.Queue +} + +// Levels 返回可用的日志记录级别 +func (h *Hook) Levels() []logrus.Level { + return h.opts.levels +} + +// Fire 触发日志事件时将调用 +func (h *Hook) Fire(entryLogrus *logrus.Entry) error { + var funcVal string + var fileVal string + if entryLogrus.HasCaller() { + funcVal = entryLogrus.Caller.Function + fileVal = fmt.Sprintf("%s:%d", entryLogrus.Caller.File, entryLogrus.Caller.Line) + } + hostName, err := os.Hostname() + if err != nil { + hostName = "unknown" + } + level := entryLogrus.Level.String() + newEntry := &entry.Entry{ + Host: hostName, + Timestamp: util.Now(), + File: fileVal, + Func: funcVal, + Message: entryLogrus.Message, + Level: strings.ToUpper(level), + Data: entryLogrus.Data, + } + h.q.Push(queue.NewJob(newEntry, func(v any) { + h.exec(v.(*entry.Entry)) + })) + return nil +} + +func (h *Hook) exec(entry *entry.Entry) { + if extra := h.opts.extra; extra != nil { + for k, v := range extra { + if _, ok := entry.Data[k]; !ok { + entry.Data[k] = v + } + } + } + err := h.opts.exec.Exec(entry) + if err != nil && h.opts.out != nil { + fmt.Fprintf(h.opts.out, "[Mongo-Hook] Execution error: %s", err.Error()) + } +} + +// Flush 等待日志队列为空 +func (h *Hook) Flush() { + h.q.Terminate() +} diff --git a/go.mod b/go.mod index 106735da..db214a9a 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/abulo/ratel/v3 go 1.23 require ( - dario.cat/mergo v1.0.0 + dario.cat/mergo v1.0.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/BurntSushi/toml v1.4.0 - github.com/ClickHouse/clickhouse-go/v2 v2.27.1 + github.com/ClickHouse/clickhouse-go/v2 v2.28.1 github.com/cloudwego/hertz v0.9.2 github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 github.com/disintegration/imaging v1.6.2 @@ -24,8 +24,8 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/h2non/bimg v1.1.9 github.com/hashicorp/hcl v1.0.0 - github.com/hashicorp/hcl/v2 v2.21.0 - github.com/issue9/term/v3 v3.2.9 + github.com/hashicorp/hcl/v2 v2.22.0 + github.com/issue9/term/v3 v3.3.0 github.com/jlaffaye/ftp v0.2.0 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.10.9 @@ -35,7 +35,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/pkg/sftp v1.13.6 - github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_golang v1.20.2 github.com/redis/go-redis/v9 v9.6.1 github.com/sirupsen/logrus v1.9.3 github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a @@ -55,8 +55,8 @@ require ( golang.org/x/mod v0.20.0 golang.org/x/sync v0.8.0 golang.org/x/text v0.17.0 - google.golang.org/genproto/googleapis/rpc v0.0.0-20240812133136-8ffd90a71988 - google.golang.org/grpc v1.65.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed + google.golang.org/grpc v1.66.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -103,7 +103,7 @@ require ( github.com/issue9/errwrap v0.3.2 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/fs v0.1.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -115,14 +115,15 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.7.1 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nyaruka/phonenumbers v1.0.55 // indirect github.com/paulmach/orb v0.11.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.48.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -145,11 +146,11 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.23.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect golang.org/x/term v0.23.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect ) replace github.com/bytedance/sonic/loader => github.com/bytedance/sonic/loader v0.2.0