Skip to content

Commit

Permalink
feat✨: add logger driver sql
Browse files Browse the repository at this point in the history
  • Loading branch information
abulo committed Aug 29, 2024
1 parent dc2bc49 commit e4c66f6
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 35 deletions.
2 changes: 1 addition & 1 deletion core/env/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
ratelVersion = "v3.8.5"
ratelVersion = "v3.8.6"
)

var (
Expand Down
9 changes: 6 additions & 3 deletions core/logger/es/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,32 @@ type ExecCloser interface {

type defaultExec struct {
client *elasticsearch.Client
index string
canClose bool
}

// NewExec create an exec instance
func NewExec(client *elasticsearch.Client) ExecCloser {
func NewExec(client *elasticsearch.Client, index string) ExecCloser {
return &defaultExec{
client: client,
index: index,
canClose: true,
}
}

// NewExecWithURL create an exec instance
func NewExecWithURL(client *elasticsearch.Client) ExecCloser {
func NewExecWithURL(client *elasticsearch.Client, index string) ExecCloser {
return &defaultExec{
client: client,
index: index,
canClose: true,
}
}

// Exec ...
func (e *defaultExec) Exec(entry *entry.Entry) error {
ctx := context.Background()
_, err := e.client.Index().Index("logger_entry").Id(uuid.New().String()).BodyJson(entry).Do(ctx)
_, err := e.client.Index().Index(e.index).Id(uuid.New().String()).BodyJson(entry).Do(ctx)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions core/logger/es/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ func SetOut(out io.Writer) Option {
type Option func(*options)

// Default create a default es hook
func Default(client *elasticsearch.Client, opts ...Option) *Hook {
func Default(client *elasticsearch.Client, index string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExec(client)))
options = append(options, SetExec(NewExec(client, index)))

if err := CreateIndex(client); err != nil {
panic(err)
Expand All @@ -96,10 +96,10 @@ func Default(client *elasticsearch.Client, opts ...Option) *Hook {
}

// DefaultWithURL create a default es hook
func DefaultWithURL(client *elasticsearch.Client, opts ...Option) *Hook {
func DefaultWithURL(client *elasticsearch.Client, index string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExecWithURL(client)))
options = append(options, SetExec(NewExecWithURL(client, index)))
if err := CreateIndex(client); err != nil {
panic(err)
}
Expand Down
21 changes: 12 additions & 9 deletions core/logger/mongo/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@ type ExecCloser interface {
}

type defaultExec struct {
client *mongodb.MongoDB
canClose bool
client *mongodb.MongoDB
collection string
canClose bool
}

// NewExec create an exec instance
func NewExec(client *mongodb.MongoDB) ExecCloser {
func NewExec(client *mongodb.MongoDB, collection string) ExecCloser {
return &defaultExec{
client: client,
canClose: true,
client: client,
collection: collection,
canClose: true,
}
}

// NewExecWithURL create an exec instance
func NewExecWithURL(client *mongodb.MongoDB) ExecCloser {
func NewExecWithURL(client *mongodb.MongoDB, collection string) ExecCloser {
return &defaultExec{
client: client,
canClose: true,
client: client,
collection: collection,
canClose: true,
}
}

Expand All @@ -46,7 +49,7 @@ func (e *defaultExec) Exec(entry *entry.Entry) error {
data := bson.M(entry.Data)
item["data"] = data
ctx := context.Background()
handler, err := e.client.NewCollection("logger_entry")
handler, err := e.client.NewCollection(e.collection)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions core/logger/mongo/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,18 @@ func SetOut(out io.Writer) Option {
type Option func(*options)

// Default create a default mongo hook
func Default(client *mongodb.MongoDB, opts ...Option) *Hook {
func Default(client *mongodb.MongoDB, collection string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExec(client)))
options = append(options, SetExec(NewExec(client, collection)))
return New(options...)
}

// DefaultWithURL create a default mongo hook
func DefaultWithURL(client *mongodb.MongoDB, opts ...Option) *Hook {
func DefaultWithURL(client *mongodb.MongoDB, collection string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExecWithURL(client)))
options = append(options, SetExec(NewExecWithURL(client, collection)))
return New(options...)
}

Expand Down
69 changes: 69 additions & 0 deletions core/logger/sql/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sql

import (
"context"
"encoding/json"

"github.com/abulo/ratel/v3/core/logger/entry"
"github.com/abulo/ratel/v3/stores/null"
"github.com/abulo/ratel/v3/stores/sql"
)

// ExecCloser 将logrus条目写入数据库并关闭数据库
type ExecCloser interface {
Exec(entry *entry.Entry) error
}

type defaultExec struct {
client sql.SqlConn
tableName string
canClose bool
}

// NewExec create an exec instance
func NewExec(client sql.SqlConn, tableName string) ExecCloser {
return &defaultExec{
client: client,
tableName: tableName,
canClose: true,
}
}

// NewExecWithURL create an exec instance
func NewExecWithURL(client sql.SqlConn, tableName string) ExecCloser {
return &defaultExec{
client: client,
tableName: tableName,
canClose: true,
}
}

func (e *defaultExec) Exec(entry *entry.Entry) error {
daoItem := &Dao{}
daoItem.Host = null.StringFrom(entry.Host)
daoItem.File = null.StringFrom(entry.File)
daoItem.Func = null.StringFrom(entry.Func)
daoItem.Message = null.StringFrom(entry.Message)
daoItem.Level = null.StringFrom(entry.Level)
data, _ := json.Marshal(entry.Data)
daoItem.Data = null.JSONFrom(data)
builder := sql.NewBuilder()
query, args, err := builder.Table(e.tableName).Insert(data)
if err != nil {
return err
}
ctx := context.Background()
_, err = e.client.Insert(ctx, query, args...)
return err
}

type Dao struct {
Id *int64 `db:"id,-" json:"id"`
Host null.String `db:"host" json:"host"`
Timestamp null.DateTime `db:"timestamp" json:"timestamp"`
File null.String `db:"file" json:"file"`
Func null.String `db:"func" json:"func"`
Message null.String `db:"message" json:"message"`
Level null.String `db:"level" json:"level"`
Data null.JSON `db:"data" json:"data"`
}
179 changes: 179 additions & 0 deletions core/logger/sql/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package sql

import (
"fmt"
"io"
"os"
"strings"

"github.com/abulo/ratel/v3/core/logger/entry"
"github.com/abulo/ratel/v3/core/logger/queue"
"github.com/abulo/ratel/v3/stores/sql"
"github.com/abulo/ratel/v3/util"
"github.com/sirupsen/logrus"
)

var defaultOptions = options{
maxQueues: 512,
maxWorkers: 2,
levels: []logrus.Level{
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
},
out: os.Stderr,
}

type options struct {
maxQueues int
maxWorkers int
extra map[string]any
exec ExecCloser
levels []logrus.Level
out io.Writer
}

// SetMaxQueues 设置缓冲区的数量
func SetMaxQueues(maxQueues int) Option {
return func(o *options) {
o.maxQueues = maxQueues
}
}

// SetMaxWorkers 设置工作线程数
func SetMaxWorkers(maxWorkers int) Option {
return func(o *options) {
o.maxWorkers = maxWorkers
}
}

// SetExtra 设置扩展参数
func SetExtra(extra map[string]any) Option {
return func(o *options) {
o.extra = extra
}
}

// SetExec 设置Execer接口
func SetExec(exec ExecCloser) Option {
return func(o *options) {
o.exec = exec
}
}

// SetLevels 设置可用的日志级别
func SetLevels(levels ...logrus.Level) Option {
return func(o *options) {
if len(levels) == 0 {
return
}
o.levels = levels
}
}

// SetOut 设置错误输出
func SetOut(out io.Writer) Option {
return func(o *options) {
o.out = out
}
}

// Option 钩子参数选项
type Option func(*options)

// Default create a default mongo hook
func Default(client sql.SqlConn, tableName string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExec(client, tableName)))
return New(options...)
}

// DefaultWithURL create a default mongo hook
func DefaultWithURL(client sql.SqlConn, tableName string, opts ...Option) *Hook {
var options []Option
options = append(options, opts...)
options = append(options, SetExec(NewExecWithURL(client, tableName)))
return New(options...)
}

// New 创建一个要添加到logger实例的钩子
func New(opt ...Option) *Hook {
opts := defaultOptions
for _, o := range opt {
o(&opts)
}

if opts.exec == nil {
// panic("Unknown Execer interface implementation")
logrus.Info("Unknown Execer interface implementation")
}

q := queue.NewQueue(opts.maxQueues, opts.maxWorkers)
q.Run()

return &Hook{
opts: opts,
q: q,
}
}

// Hook 将日志发送到数据库
type Hook struct {
opts options
q *queue.Queue
}

// Levels 返回可用的日志记录级别
func (h *Hook) Levels() []logrus.Level {
return h.opts.levels
}

// Fire 触发日志事件时将调用
func (h *Hook) Fire(entryLogrus *logrus.Entry) error {
var funcVal string
var fileVal string
if entryLogrus.HasCaller() {
funcVal = entryLogrus.Caller.Function
fileVal = fmt.Sprintf("%s:%d", entryLogrus.Caller.File, entryLogrus.Caller.Line)
}
hostName, err := os.Hostname()
if err != nil {
hostName = "unknown"
}
level := entryLogrus.Level.String()
newEntry := &entry.Entry{
Host: hostName,
Timestamp: util.Now(),
File: fileVal,
Func: funcVal,
Message: entryLogrus.Message,
Level: strings.ToUpper(level),
Data: entryLogrus.Data,
}
h.q.Push(queue.NewJob(newEntry, func(v any) {
h.exec(v.(*entry.Entry))
}))
return nil
}

func (h *Hook) exec(entry *entry.Entry) {
if extra := h.opts.extra; extra != nil {
for k, v := range extra {
if _, ok := entry.Data[k]; !ok {
entry.Data[k] = v
}
}
}
err := h.opts.exec.Exec(entry)
if err != nil && h.opts.out != nil {
fmt.Fprintf(h.opts.out, "[Mongo-Hook] Execution error: %s", err.Error())
}
}

// Flush 等待日志队列为空
func (h *Hook) Flush() {
h.q.Terminate()
}
Loading

0 comments on commit e4c66f6

Please sign in to comment.