Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(opentsdb): 数据拉取以ident分发,并把list方式改为chan方式,提高消费效率。 #906

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/server/router/router_opentsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func handleOpenTSDB(c *gin.Context) {
succ int
fail int
msg = "data pushed to queue"
list = make([]interface{}, 0, len(arr))
ts = time.Now().Unix()
ids = make(map[string]interface{})
)
Expand All @@ -191,17 +190,19 @@ func handleOpenTSDB(c *gin.Context) {
if has {
common.AppendLabels(pt, target)
}
// 更改分发方式,通过ident分发
writer.Writers.PushIdentChan(host, pt)
} else {
// 如果没有则默认放入指标名前缀的chan中
ident := arr[i].Metric[0:strings.Index(arr[i].Metric, "_")]
writer.Writers.PushIdentChan(ident, pt)
}

list = append(list, pt)
succ++
}

if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "writer queue full"
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ))

idents.Idents.MSet(ids)
}
Expand Down
150 changes: 145 additions & 5 deletions src/server/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"fmt"
cmap "github.com/orcaman/concurrent-map"
"net"
"net/http"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -44,6 +46,8 @@ type WriterType struct {
Client api.Client
}

var lock = sync.RWMutex{}

func (w WriterType) Write(items []*prompb.TimeSeries) {
if len(items) == 0 {
return
Expand All @@ -59,13 +63,34 @@ func (w WriterType) Write(items []*prompb.TimeSeries) {
return
}

if err := w.Post(snappy.Encode(nil, data)); err != nil {
if err := w.Post(snappy.Encode(nil, data), nil); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}

func (w WriterType) WriteWithHeader(items []*prompb.TimeSeries, headerMap map[string]string) {
if len(items) == 0 {
return
}

req := &prompb.WriteRequest{
Timeseries: items,
}

data, err := proto.Marshal(req)
if err != nil {
logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
return
}

if err := w.Post(snappy.Encode(nil, data), headerMap); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}

func (w WriterType) Post(req []byte) error {
func (w WriterType) Post(req []byte, headerMap map[string]string) error {
httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req))
if err != nil {
logger.Warningf("create remote write request got error: %s", err.Error())
Expand All @@ -77,6 +102,12 @@ func (w WriterType) Post(req []byte) error {
httpReq.Header.Set("User-Agent", "n9e")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

if headerMap != nil {
for k, v := range headerMap {
httpReq.Header.Set(k, v)
}
}

if w.Opts.BasicAuthUser != "" {
httpReq.SetBasicAuth(w.Opts.BasicAuthUser, w.Opts.BasicAuthPass)
}
Expand All @@ -96,9 +127,10 @@ func (w WriterType) Post(req []byte) error {
}

type WritersType struct {
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
IdentChanMap cmap.ConcurrentMap
}

func (ws *WritersType) Put(name string, writer WriterType) {
Expand All @@ -109,6 +141,114 @@ func (ws *WritersType) PushQueue(vs []interface{}) bool {
return ws.queue.PushFrontBatch(vs)
}

//
// PushIdentChan 放入chan, 以ident分发
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param vs
//
func (ws *WritersType) PushIdentChan(ident string, vs interface{}) {
if !ws.IdentChanMap.Has(ident) {
lock.Lock()
if !ws.IdentChanMap.Has(ident) {
c := make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
ws.IdentChanMap.Set(ident, c)
go func() {
ws.InitIdentChanWorker(ident, c)
}()
}
lock.Unlock()
}
// 往chan扔会导致内存不断增大,如果写入阻塞了,需要提示
c, ok := ws.IdentChanMap.Get(ident)
ch := c.(chan *prompb.TimeSeries)
if ok {
select {
case ch <- vs.(*prompb.TimeSeries):
case <-time.After(time.Duration(200) * time.Millisecond):
logger.Warningf("[%s] Write IdentChanMap Full, DropSize: %d", ident, len(ch))
}
}
}

//
// InitIdentChanWorker 初始化ident消费者
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param data
//
func (ws *WritersType) InitIdentChanWorker(ident string, data chan *prompb.TimeSeries) {
popCounter := 0
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {
batch = 1000
}
logger.Infof("[%s] Start Ident Chan Worker, MaxSize:%d, batchSize:%d", ident, ws.globalOpt.QueueMaxSize, batch)
series := make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter := 0
for {
select {
case item := <-data:
closePrepareCounter = 0
series = append(series, item)
popCounter++
if popCounter >= ws.globalOpt.QueuePopSize {
popCounter = 0
// 发送到prometheus
ws.postPrometheus(ident, series)
series = make([]*prompb.TimeSeries, 0, batch)
}
case <-time.After(10 * time.Second):
// 10秒清空一下,如果有数据的话
if len(series) > 0 {
ws.postPrometheus(ident, series)
series = make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter = 0
} else {
closePrepareCounter++
}
// 一小时没数据,就关闭chan
if closePrepareCounter > 6*60 {
logger.Infof("[%s] Ident Chan Closing. Reason: No Data For An Hour.", ident)
lock.Lock()
close(data)
// 移除
ws.IdentChanMap.Remove(ident)
lock.Unlock()
logger.Infof("[%s] Ident Chan Closed Success.", ident)
return
}
}
}
}

//
// postPrometheus 发送数据至prometheus
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param series
//
func (ws *WritersType) postPrometheus(ident string, series []*prompb.TimeSeries) {
// 发送至prom
wg := sync.WaitGroup{}
wg.Add(len(ws.m))
headerMap := make(map[string]string, 1)
headerMap["ident"] = ident
for key := range ws.m {
go func(key string) {
defer wg.Done()
ws.m[key].WriteWithHeader(series, headerMap)
}(key)
}
wg.Wait()
}

func (ws *WritersType) Writes() {
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {
Expand Down