Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 支持主配置变更重加载 #109

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ package beater

import (
"context"
"encoding/json"
"fmt"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

注意代码格式化。

"time"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
Expand All @@ -40,9 +42,12 @@ import (
_ "github.com/TencentBlueKing/bkunifylogbeat/include"
"github.com/TencentBlueKing/bkunifylogbeat/registrar"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/cfgfile"
)

var Registrar *registrar.Registrar
var lastTaskHash string

// LogBeat package cadvisor
type LogBeat struct {
Expand Down Expand Up @@ -118,6 +123,7 @@ func (bt *LogBeat) Run() error {
}

reloadTicker := time.NewTicker(10 * time.Second)
diffTaskTicker := time.NewTicker(60 * time.Second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 ticker 也需要 defer stop

defer reloadTicker.Stop()
for {
select {
Expand All @@ -130,6 +136,11 @@ func (bt *LogBeat) Run() error {
bt.Reload(config)
}
}
// 处理采集器主配置是否变更,变更则发送重加载信号
case <-diffTaskTicker.C:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议按需开启,而不是成为一种默认行为。

if err = bt.checkNeedReload(); err != nil {
logp.L.Error(err)
}
case <-beat.ReloadChan:
bt.isReload = true
// 处理采集器框架发送的结束采集器的信号(常由SIGINT引起),关闭采集器
Expand All @@ -148,6 +159,42 @@ func (bt *LogBeat) Stop() {
close(bt.done)
}

// Main config diff check
func (bt *LogBeat) checkNeedReload() error {
rawConfig, err := cfgfile.Load("", nil)
if err != nil {
return err
}
if !rawConfig.HasField("bkunifylogbeat") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个建议可以写成常量。

return errors.New("no bkunifylogbeat field found")
}

beatConfig, err := rawConfig.Child("bkunifylogbeat", -1)
if err != nil {
return err
}
config, err := cfg.Parse(beatConfig)
if err != nil {
return err
}
b, err := json.Marshal(config)
if err != nil {
return err
}

currentTaskHash := utils.Md5(string(b))
if len(lastTaskHash) == 0 {
lastTaskHash = currentTaskHash
}
if lastTaskHash != currentTaskHash {
lastTaskHash = currentTaskHash
bt.Reload(beatConfig)
logp.L.Info("Reload main config task.")
}

return nil
}

// Close cadvisor storage interface
func (bt *LogBeat) Close() error {
return nil
Expand Down
Loading