Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flusher_http queue buffer and async interceptor settings #1203

Merged
merged 4 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/cn/data-pipeline/flusher/flusher-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
| Convert.TagFieldsRename | Map<String,String> | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags`和`time` |
| Concurrency | Int | 否 | 向url发起请求的并发数,默认为`1` |
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否

## 样例

Expand Down
24 changes: 19 additions & 5 deletions plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ type FlusherHTTP struct {
Concurrency int // How many requests can be performed in concurrent
Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use
FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use
AsyncIntercept bool // intercept the event asynchronously
RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings
QueueCapacity int // capacity of channel

varKeys []string

Expand Down Expand Up @@ -127,7 +129,10 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error {
return err
}

f.queue = make(chan interface{})
if f.QueueCapacity <= 0 {
f.QueueCapacity = 1024
}
f.queue = make(chan interface{}, f.QueueCapacity)
for i := 0; i < f.Concurrency; i++ {
go f.runFlushTask()
}
Expand All @@ -148,12 +153,14 @@ func (f *FlusherHTTP) Flush(projectName string, logstoreName string, configName

func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx pipeline.PipelineContext) error {
for _, groupEvents := range groupEventsArray {
if f.interceptor != nil {
if !f.AsyncIntercept && f.interceptor != nil {
groupEvents = f.interceptor.Intercept(groupEvents)
if groupEvents == nil {
// skip groupEvents that is nil or empty.
if groupEvents == nil || len(groupEvents.Events) == 0 {
continue
}
}

f.addTask(groupEvents)
}
return nil
Expand Down Expand Up @@ -269,6 +276,12 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error {
case *protocol.LogGroup:
logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys)
case *models.PipelineGroupEvents:
if f.AsyncIntercept && f.interceptor != nil {
v = f.interceptor.Intercept(v)
if v == nil || len(v.Events) == 0 {
return nil
}
}
logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys)
default:
return fmt.Errorf("unsupport data type")
Expand Down Expand Up @@ -452,8 +465,9 @@ func (f *FlusherHTTP) fillRequestContentType() {
func init() {
pipeline.Flushers["flusher_http"] = func() pipeline.Flusher {
return &FlusherHTTP{
Timeout: defaultTimeout,
Concurrency: 1,
QueueCapacity: 1024,
Timeout: defaultTimeout,
Concurrency: 1,
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
Expand Down
101 changes: 97 additions & 4 deletions plugins/flusher/http/flusher_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,16 +669,86 @@ func TestGetNextRetryDelay(t *testing.T) {
}
}

func TestHttpFlusherFlushWithInterceptor(t *testing.T) {
Convey("Given a http flusher with sync intercepter", t, func() {
mockIntercepter := &mockInterceptor{}
flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolInfluxdb,
Encoding: converter.EncodingCustom,
},
interceptor: mockIntercepter,
AsyncIntercept: false,
Timeout: defaultTimeout,
Concurrency: 1,
queue: make(chan interface{}, 10),
}

Convey("should discard all events", func() {
groupEvents := models.PipelineGroupEvents{
Events: []models.PipelineEvent{&models.Metric{
Name: "cpu.load.short",
Timestamp: 1672321328000000000,
Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"),
Value: &models.MetricSingleValue{Value: 0.64},
}},
}
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
So(flusher.queue, ShouldBeEmpty)
})
})

Convey("Given a http flusher with async intercepter", t, func() {
mockIntercepter := &mockInterceptor{}
flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolInfluxdb,
Encoding: converter.EncodingCustom,
},
interceptor: mockIntercepter,
AsyncIntercept: true,
Timeout: defaultTimeout,
Concurrency: 1,
queue: make(chan interface{}, 10),
}

Convey("should discard all events", func() {
groupEvents := models.PipelineGroupEvents{
Events: []models.PipelineEvent{&models.Metric{
Name: "cpu.load.short",
Timestamp: 1672321328000000000,
Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"),
Value: &models.MetricSingleValue{Value: 0.64},
}},
}
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
So(len(flusher.queue), ShouldEqual, 1)
err = flusher.convertAndFlush(<-flusher.queue)
So(err, ShouldBeNil)
})

})
}

type mockContext struct {
pipeline.Context
basicAuth *basicAuth
basicAuth *basicAuth
interceptor *mockInterceptor
}

func (c mockContext) GetExtension(name string, cfg any) (pipeline.Extension, error) {
if c.basicAuth == nil {
return nil, fmt.Errorf("basicAuth not set")
if c.basicAuth != nil {
return c.basicAuth, nil
}
return c.basicAuth, nil

if c.interceptor != nil {
return c.interceptor, nil
}
return nil, fmt.Errorf("basicAuth not set")
}

func (c mockContext) GetConfigName() string {
Expand Down Expand Up @@ -723,3 +793,26 @@ func (b *basicAuthRoundTripper) RoundTrip(request *http.Request) (*http.Response
request.SetBasicAuth(b.auth.Username, b.auth.Password)
return b.base.RoundTrip(request)
}

type mockInterceptor struct {
}

func (mi *mockInterceptor) Description() string {
return "a filter that discard all events"
}

func (mi *mockInterceptor) Init(context pipeline.Context) error {
return nil
}

func (mi *mockInterceptor) Stop() error {
return nil
}

func (mi *mockInterceptor) Intercept(group *models.PipelineGroupEvents) *models.PipelineGroupEvents {
if group == nil {
return nil
}
group.Events = group.Events[:0]
return group
}
Loading