diff --git a/docs/cn/data-pipeline/flusher/flusher-http.md b/docs/cn/data-pipeline/flusher/flusher-http.md index 699485236b..e175810e85 100644 --- a/docs/cn/data-pipeline/flusher/flusher-http.md +++ b/docs/cn/data-pipeline/flusher/flusher-http.md @@ -29,6 +29,8 @@ | Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 | | Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags`和`time` | | Concurrency | Int | 否 | 向url发起请求的并发数,默认为`1` | +| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024 +| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否 ## 样例 diff --git a/plugins/flusher/http/flusher_http.go b/plugins/flusher/http/flusher_http.go index e2641448db..70f81a9b4d 100644 --- a/plugins/flusher/http/flusher_http.go +++ b/plugins/flusher/http/flusher_http.go @@ -67,7 +67,9 @@ type FlusherHTTP struct { Concurrency int // How many requests can be performed in concurrent Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use + AsyncIntercept bool // intercept the event asynchronously RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings + QueueCapacity int // capacity of channel varKeys []string @@ -127,7 +129,10 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error { return err } - f.queue = make(chan interface{}) + if f.QueueCapacity <= 0 { + f.QueueCapacity = 1024 + } + f.queue = make(chan interface{}, f.QueueCapacity) for i := 0; i < f.Concurrency; i++ { go f.runFlushTask() } @@ -148,12 +153,14 @@ func (f *FlusherHTTP) Flush(projectName string, logstoreName string, configName func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx pipeline.PipelineContext) error { for _, groupEvents := range groupEventsArray { - if f.interceptor != nil { + if !f.AsyncIntercept && f.interceptor != nil { groupEvents = f.interceptor.Intercept(groupEvents) - if groupEvents == nil { + // skip groupEvents that is nil or empty. + if groupEvents == nil || len(groupEvents.Events) == 0 { continue } } + f.addTask(groupEvents) } return nil @@ -269,6 +276,12 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error { case *protocol.LogGroup: logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys) case *models.PipelineGroupEvents: + if f.AsyncIntercept && f.interceptor != nil { + v = f.interceptor.Intercept(v) + if v == nil || len(v.Events) == 0 { + return nil + } + } logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys) default: return fmt.Errorf("unsupport data type") @@ -452,8 +465,9 @@ func (f *FlusherHTTP) fillRequestContentType() { func init() { pipeline.Flushers["flusher_http"] = func() pipeline.Flusher { return &FlusherHTTP{ - Timeout: defaultTimeout, - Concurrency: 1, + QueueCapacity: 1024, + Timeout: defaultTimeout, + Concurrency: 1, Convert: helper.ConvertConfig{ Protocol: converter.ProtocolCustomSingle, Encoding: converter.EncodingJSON, diff --git a/plugins/flusher/http/flusher_http_test.go b/plugins/flusher/http/flusher_http_test.go index d1a36baf7c..564042819b 100644 --- a/plugins/flusher/http/flusher_http_test.go +++ b/plugins/flusher/http/flusher_http_test.go @@ -669,16 +669,86 @@ func TestGetNextRetryDelay(t *testing.T) { } } +func TestHttpFlusherFlushWithInterceptor(t *testing.T) { + Convey("Given a http flusher with sync intercepter", t, func() { + mockIntercepter := &mockInterceptor{} + flusher := &FlusherHTTP{ + RemoteURL: "http://test.com/write", + Convert: helper.ConvertConfig{ + Protocol: converter.ProtocolInfluxdb, + Encoding: converter.EncodingCustom, + }, + interceptor: mockIntercepter, + AsyncIntercept: false, + Timeout: defaultTimeout, + Concurrency: 1, + queue: make(chan interface{}, 10), + } + + Convey("should discard all events", func() { + groupEvents := models.PipelineGroupEvents{ + Events: []models.PipelineEvent{&models.Metric{ + Name: "cpu.load.short", + Timestamp: 1672321328000000000, + Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"), + Value: &models.MetricSingleValue{Value: 0.64}, + }}, + } + err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil) + So(err, ShouldBeNil) + So(flusher.queue, ShouldBeEmpty) + }) + }) + + Convey("Given a http flusher with async intercepter", t, func() { + mockIntercepter := &mockInterceptor{} + flusher := &FlusherHTTP{ + RemoteURL: "http://test.com/write", + Convert: helper.ConvertConfig{ + Protocol: converter.ProtocolInfluxdb, + Encoding: converter.EncodingCustom, + }, + interceptor: mockIntercepter, + AsyncIntercept: true, + Timeout: defaultTimeout, + Concurrency: 1, + queue: make(chan interface{}, 10), + } + + Convey("should discard all events", func() { + groupEvents := models.PipelineGroupEvents{ + Events: []models.PipelineEvent{&models.Metric{ + Name: "cpu.load.short", + Timestamp: 1672321328000000000, + Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"), + Value: &models.MetricSingleValue{Value: 0.64}, + }}, + } + err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil) + So(err, ShouldBeNil) + So(len(flusher.queue), ShouldEqual, 1) + err = flusher.convertAndFlush(<-flusher.queue) + So(err, ShouldBeNil) + }) + + }) +} + type mockContext struct { pipeline.Context - basicAuth *basicAuth + basicAuth *basicAuth + interceptor *mockInterceptor } func (c mockContext) GetExtension(name string, cfg any) (pipeline.Extension, error) { - if c.basicAuth == nil { - return nil, fmt.Errorf("basicAuth not set") + if c.basicAuth != nil { + return c.basicAuth, nil } - return c.basicAuth, nil + + if c.interceptor != nil { + return c.interceptor, nil + } + return nil, fmt.Errorf("basicAuth not set") } func (c mockContext) GetConfigName() string { @@ -723,3 +793,26 @@ func (b *basicAuthRoundTripper) RoundTrip(request *http.Request) (*http.Response request.SetBasicAuth(b.auth.Username, b.auth.Password) return b.base.RoundTrip(request) } + +type mockInterceptor struct { +} + +func (mi *mockInterceptor) Description() string { + return "a filter that discard all events" +} + +func (mi *mockInterceptor) Init(context pipeline.Context) error { + return nil +} + +func (mi *mockInterceptor) Stop() error { + return nil +} + +func (mi *mockInterceptor) Intercept(group *models.PipelineGroupEvents) *models.PipelineGroupEvents { + if group == nil { + return nil + } + group.Events = group.Events[:0] + return group +}