From b0c78323a54edcce575f28e11018e7d6cfccd1d1 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 25 Oct 2023 21:34:37 +0800 Subject: [PATCH 1/4] set buffer size of the queue and allow to use interceptor asynchronously --- plugins/flusher/http/flusher_http.go | 26 ++++-- plugins/flusher/http/flusher_http_test.go | 101 +++++++++++++++++++++- 2 files changed, 117 insertions(+), 10 deletions(-) diff --git a/plugins/flusher/http/flusher_http.go b/plugins/flusher/http/flusher_http.go index e2641448db..7672f612c0 100644 --- a/plugins/flusher/http/flusher_http.go +++ b/plugins/flusher/http/flusher_http.go @@ -67,7 +67,9 @@ type FlusherHTTP struct { Concurrency int // How many requests can be performed in concurrent Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use + AsyncIntercept bool // intercept the event asynchronously RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings + QueueCapacity int // capacity of channel varKeys []string @@ -127,7 +129,10 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error { return err } - f.queue = make(chan interface{}) + if f.QueueCapacity <= 0 { + f.QueueCapacity = 1024 + } + f.queue = make(chan interface{}, f.QueueCapacity) for i := 0; i < f.Concurrency; i++ { go f.runFlushTask() } @@ -135,7 +140,7 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error { f.buildVarKeys() f.fillRequestContentType() - logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized") + logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized", "async intercept events", f.AsyncIntercept) return nil } @@ -148,12 +153,14 @@ func (f *FlusherHTTP) Flush(projectName string, logstoreName string, configName func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx 
pipeline.PipelineContext) error { for _, groupEvents := range groupEventsArray { - if f.interceptor != nil { + if !f.AsyncIntercept && f.interceptor != nil { groupEvents = f.interceptor.Intercept(groupEvents) - if groupEvents == nil { + // skip groupEvents that is nil or emtpy. + if groupEvents == nil || len(groupEvents.Events) == 0 { continue } } + f.addTask(groupEvents) } return nil @@ -269,6 +276,12 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error { case *protocol.LogGroup: logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys) case *models.PipelineGroupEvents: + if f.AsyncIntercept && f.interceptor != nil { + v = f.interceptor.Intercept(v) + if v == nil || len(v.Events) == 0 { + return nil + } + } logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys) default: return fmt.Errorf("unsupport data type") @@ -452,8 +465,9 @@ func (f *FlusherHTTP) fillRequestContentType() { func init() { pipeline.Flushers["flusher_http"] = func() pipeline.Flusher { return &FlusherHTTP{ - Timeout: defaultTimeout, - Concurrency: 1, + QueueCapacity: 1024, + Timeout: defaultTimeout, + Concurrency: 1, Convert: helper.ConvertConfig{ Protocol: converter.ProtocolCustomSingle, Encoding: converter.EncodingJSON, diff --git a/plugins/flusher/http/flusher_http_test.go b/plugins/flusher/http/flusher_http_test.go index d1a36baf7c..46233d1dba 100644 --- a/plugins/flusher/http/flusher_http_test.go +++ b/plugins/flusher/http/flusher_http_test.go @@ -669,16 +669,86 @@ func TestGetNextRetryDelay(t *testing.T) { } } +func TestHttpFlusherFlushWithInterceptor(t *testing.T) { + Convey("Given a http flusher with sync intercepter", t, func() { + mockIntercepter := &mockInterceptor{} + flusher := &FlusherHTTP{ + RemoteURL: "http://test.com/write", + Convert: helper.ConvertConfig{ + Protocol: converter.ProtocolInfluxdb, + Encoding: converter.EncodingCustom, + }, + interceptor: mockIntercepter, + AsyncIntercept: false, + Timeout: defaultTimeout, 
+ Concurrency: 1, + queue: make(chan interface{}, 10), + } + + Convey("should discard all events", func() { + groupEvents := models.PipelineGroupEvents{ + Events: []models.PipelineEvent{&models.Metric{ + Name: "cpu.load.short", + Timestamp: 1672321328000000000, + Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"), + Value: &models.MetricSingleValue{Value: 0.64}, + }}, + } + err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil) + So(err, ShouldBeNil) + So(flusher.queue, ShouldBeEmpty) + }) + }) + + Convey("Given a http flusher with async intercepter", t, func() { + mockIntercepter := &mockInterceptor{} + flusher := &FlusherHTTP{ + RemoteURL: "http://test.com/write", + Convert: helper.ConvertConfig{ + Protocol: converter.ProtocolInfluxdb, + Encoding: converter.EncodingCustom, + }, + interceptor: mockIntercepter, + AsyncIntercept: true, + Timeout: defaultTimeout, + Concurrency: 1, + queue: make(chan interface{}, 10), + } + + Convey("should discard all events", func() { + groupEvents := models.PipelineGroupEvents{ + Events: []models.PipelineEvent{&models.Metric{ + Name: "cpu.load.short", + Timestamp: 1672321328000000000, + Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"), + Value: &models.MetricSingleValue{Value: 0.64}, + }}, + } + err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil) + So(err, ShouldBeNil) + So(len(flusher.queue), ShouldEqual, 1) + err = flusher.convertAndFlush(<-flusher.queue) + So(err, ShouldBeNil) + }) + + }) +} + type mockContext struct { pipeline.Context - basicAuth *basicAuth + basicAuth *basicAuth + interceptor *mockInterceptor } func (c mockContext) GetExtension(name string, cfg any) (pipeline.Extension, error) { - if c.basicAuth == nil { - return nil, fmt.Errorf("basicAuth not set") + if c.basicAuth != nil { + return c.basicAuth, nil } - return c.basicAuth, nil + + if c.interceptor != nil { + return c.interceptor, nil + } + return nil, fmt.Errorf("basicAuth not set") 
} func (c mockContext) GetConfigName() string { @@ -723,3 +793,26 @@ func (b *basicAuthRoundTripper) RoundTrip(request *http.Request) (*http.Response request.SetBasicAuth(b.auth.Username, b.auth.Password) return b.base.RoundTrip(request) } + +type mockInterceptor struct { +} + +func (b *mockInterceptor) Description() string { + return "a filter that discard all events" +} + +func (b *mockInterceptor) Init(context pipeline.Context) error { + return nil +} + +func (b *mockInterceptor) Stop() error { + return nil +} + +func (mi *mockInterceptor) Intercept(group *models.PipelineGroupEvents) *models.PipelineGroupEvents { + if group == nil { + return nil + } + group.Events = group.Events[:0] + return group +} From 84435ae0ac816049083e9fd99678e252dccc8445 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 25 Oct 2023 21:38:05 +0800 Subject: [PATCH 2/4] update doc --- docs/cn/data-pipeline/flusher/flusher-http.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/cn/data-pipeline/flusher/flusher-http.md b/docs/cn/data-pipeline/flusher/flusher-http.md index 699485236b..e175810e85 100644 --- a/docs/cn/data-pipeline/flusher/flusher-http.md +++ b/docs/cn/data-pipeline/flusher/flusher-http.md @@ -29,6 +29,8 @@ | Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 | | Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags`和`time` | | Concurrency | Int | 否 | 向url发起请求的并发数,默认为`1` | +| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024 | +| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否 | ## 样例 From a50da8f1d867e2cda83ea8b91897b0581e102c7f Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 25 Oct 2023 21:42:01 +0800 Subject: [PATCH 3/4] remove log --- plugins/flusher/http/flusher_http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flusher/http/flusher_http.go b/plugins/flusher/http/flusher_http.go index 7672f612c0..4211a9922d 100644 --- a/plugins/flusher/http/flusher_http.go +++ b/plugins/flusher/http/flusher_http.go
@@ -140,7 +140,7 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error { f.buildVarKeys() f.fillRequestContentType() - logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized", "async intercept events", f.AsyncIntercept) + logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized") return nil } From ff95b27a4b33f0a9ba1c4d9b119f033ddcc401f9 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 25 Oct 2023 23:13:59 +0800 Subject: [PATCH 4/4] fix lint issue --- plugins/flusher/http/flusher_http.go | 2 +- plugins/flusher/http/flusher_http_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/flusher/http/flusher_http.go b/plugins/flusher/http/flusher_http.go index 4211a9922d..70f81a9b4d 100644 --- a/plugins/flusher/http/flusher_http.go +++ b/plugins/flusher/http/flusher_http.go @@ -155,7 +155,7 @@ func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx for _, groupEvents := range groupEventsArray { if !f.AsyncIntercept && f.interceptor != nil { groupEvents = f.interceptor.Intercept(groupEvents) - // skip groupEvents that is nil or emtpy. + // skip groupEvents that is nil or empty. 
if groupEvents == nil || len(groupEvents.Events) == 0 { continue } diff --git a/plugins/flusher/http/flusher_http_test.go b/plugins/flusher/http/flusher_http_test.go index 46233d1dba..564042819b 100644 --- a/plugins/flusher/http/flusher_http_test.go +++ b/plugins/flusher/http/flusher_http_test.go @@ -797,15 +797,15 @@ func (b *basicAuthRoundTripper) RoundTrip(request *http.Request) (*http.Response type mockInterceptor struct { } -func (b *mockInterceptor) Description() string { +func (mi *mockInterceptor) Description() string { return "a filter that discard all events" } -func (b *mockInterceptor) Init(context pipeline.Context) error { +func (mi *mockInterceptor) Init(context pipeline.Context) error { return nil } -func (b *mockInterceptor) Stop() error { +func (mi *mockInterceptor) Stop() error { return nil }