Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve event processing #167

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func NewAggregator(parentCtx context.Context, ct *cri.CRITool, k8sChan chan inte
go a.clearSocketLines(ctx)

go func() {
t := time.NewTicker(2 * time.Minute)
t := time.NewTicker(1 * time.Minute)

for range t.C {
log.Logger.Debug().
Expand Down
6 changes: 3 additions & 3 deletions config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type BackendDSConfig struct {
GpuMetricsExport bool
MetricsExportInterval int // in seconds

ReqBufferSize int
ConnBufferSize int
KafkaEventBufferSize int
ReqBufferSize uint64
ConnBufferSize uint64
KafkaEventBufferSize uint64
}
36 changes: 24 additions & 12 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type BackendDS struct {
ctx context.Context
host string
port string
c *http.Client
c *retryablehttp.Client
batchSize uint64

reqChanBuffer chan *ReqInfo
Expand Down Expand Up @@ -162,6 +162,7 @@ type BackendDS struct {
containerEventChan chan interface{} // *ContainerEvent
dsEventChan chan interface{} // *DaemonSetEvent
ssEventChan chan interface{} // *StatefulSetEvent
conf config.BackendDSConfig

// TODO add:
// job
Expand Down Expand Up @@ -211,7 +212,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
retryClient.Logger = LeveledLogger{l: log.Logger.With().Str("component", "retryablehttp").Logger()}
retryClient.Backoff = retryablehttp.DefaultBackoff
retryClient.RetryWaitMin = 1 * time.Second
retryClient.RetryWaitMax = 5 * time.Second
retryClient.RetryWaitMax = 2 * time.Second
retryClient.RetryMax = 2

retryClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
Expand Down Expand Up @@ -275,7 +276,6 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
MaxConnsPerHost: 500, // 500 connection per host
}
retryClient.HTTPClient.Timeout = 10 * time.Second // Set a timeout for the request
client := retryClient.StandardClient()

var defaultBatchSize uint64 = 1000

Expand All @@ -288,7 +288,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
ds := &BackendDS{
ctx: ctx,
host: conf.Host,
c: client,
c: retryClient,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
Expand All @@ -307,6 +307,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
metricsExport: conf.MetricsExport,
gpuMetricsExport: conf.GpuMetricsExport,
metricsExportInterval: conf.MetricsExportInterval,
conf: conf,
// traceEventQueue: list.New(),
// traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
}
Expand Down Expand Up @@ -427,11 +428,11 @@ func (ds *BackendDS) Start() {
// return batch
// }

func (b *BackendDS) DoRequest(req *http.Request) error {
func (b *BackendDS) DoRequest(req *retryablehttp.Request) error {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")

ctx, cancel := context.WithTimeout(b.ctx, 30*time.Second)
ctx, cancel := context.WithTimeout(b.ctx, 10*time.Second)
defer cancel()

resp, err := b.c.Do(req.WithContext(ctx))
Expand Down Expand Up @@ -501,7 +502,7 @@ func convertConnsToPayload(batch []*ConnInfo) ConnInfoPayload {
// }

func (b *BackendDS) sendMetricsToBackend(r io.Reader) {
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r)
req, err := retryablehttp.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r)
if err != nil {
log.Logger.Error().Msgf("error creating metrics request: %v", err)
return
Expand Down Expand Up @@ -539,8 +540,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
log.Logger.Error().Msgf("error marshalling batch: %v", err)
return
}

httpReq, err := http.NewRequest(method, b.host+endpoint, bytes.NewBuffer(payloadBytes))
httpReq, err := retryablehttp.NewRequest(method, b.host+endpoint, bytes.NewBuffer(payloadBytes))
if err != nil {
log.Logger.Error().Msgf("error creating http request: %v", err)
return
Expand Down Expand Up @@ -589,7 +589,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
// }

func (b *BackendDS) sendReqsInBatch(batchSize uint64) {
t := time.NewTicker(5 * time.Second)
t := time.NewTicker(1 * time.Second)
defer t.Stop()

send := func() {
Expand All @@ -610,7 +610,19 @@ func (b *BackendDS) sendReqsInBatch(batchSize uint64) {
}

reqsPayload := convertReqsToPayload(batch)
log.Logger.Debug().Int("len", len(batch)).Msg("reqs batch len")

// dynamically configure batchSize to avoid blocking on reqChanBuffer
lenBatch := uint64(len(batch))
if batchSize == lenBatch {
// increase batchSize
batchSize *= 2
if batchSize > b.conf.ReqBufferSize {
batchSize = b.conf.ReqBufferSize // max value
}
} else if lenBatch < batchSize/10 {
batchSize /= 2 // decrease batchSize
}

go b.sendToBackend(http.MethodPost, reqsPayload, reqEndpoint)

// return reqInfos to the pool
Expand Down Expand Up @@ -998,7 +1010,7 @@ func (b *BackendDS) SendHealthCheck(tracing bool, metrics bool, logs bool, nsFil
return
}

req, err := http.NewRequest(http.MethodPut, b.host+healthCheckEndpoint, bytes.NewBuffer(payloadBytes))
req, err := retryablehttp.NewRequest(http.MethodPut, b.host+healthCheckEndpoint, bytes.NewBuffer(payloadBytes))
if err != nil {
log.Logger.Error().Msgf("error creating http request: %v", err)
return
Expand Down
21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/log v0.2.1
github.com/golang/protobuf v1.5.3
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/klauspost/compress v1.16.5
github.com/pierrec/lz4/v4 v4.1.18
github.com/prometheus/client_golang v1.19.0
Expand All @@ -20,7 +20,7 @@ require (
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.8.4
golang.org/x/arch v0.5.0
golang.org/x/mod v0.12.0
golang.org/x/mod v0.14.0
inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a
k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.2
Expand Down Expand Up @@ -79,7 +79,6 @@ require (
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-envparse v0.1.0 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hodgesds/perf-utils v0.7.0 // indirect
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect
github.com/imdario/mergo v0.3.16 // indirect
Expand All @@ -90,7 +89,7 @@ require (
github.com/lufia/iostat v1.2.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-xmlrpc v0.0.3 // indirect
github.com/mdlayher/ethtool v0.0.0-20221212131811-ba3b4bc2e02c // indirect
github.com/mdlayher/genetlink v1.3.2 // indirect
Expand All @@ -113,16 +112,16 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/net v0.20.0
golang.org/x/net v0.23.0
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0
golang.org/x/tools v0.12.0 // indirect
golang.org/x/tools v0.16.1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down Expand Up @@ -162,7 +161,7 @@ replace (
k8s.io/kube-scheduler => k8s.io/kubernetes/staging/src/k8s.io/kube-scheduler v0.0.0-20231213084502-3f7a50f38688
k8s.io/kubectl => k8s.io/kubernetes/staging/src/k8s.io/kubectl v0.0.0-20231213084502-3f7a50f38688
k8s.io/kubelet => k8s.io/kubernetes/staging/src/k8s.io/kubelet v0.0.0-20231213084502-3f7a50f38688
k8s.io/kubernetes => k8s.io/kubernetes v1.29.0
k8s.io/kubernetes => k8s.io/kubernetes v1.29.1
k8s.io/legacy-cloud-providers => k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers v0.0.0-20231213084502-3f7a50f38688
k8s.io/metrics => k8s.io/kubernetes/staging/src/k8s.io/metrics v0.0.0-20231213084502-3f7a50f38688
k8s.io/mount-utils => k8s.io/kubernetes/staging/src/k8s.io/mount-utils v0.0.0-20231213084502-3f7a50f38688
Expand Down
Loading