diff --git a/aggregator/data.go b/aggregator/data.go index a1ab5af..c4ad802 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -175,7 +175,7 @@ func NewAggregator(parentCtx context.Context, ct *cri.CRITool, k8sChan chan inte go a.clearSocketLines(ctx) go func() { - t := time.NewTicker(2 * time.Minute) + t := time.NewTicker(1 * time.Minute) for range t.C { log.Logger.Debug(). diff --git a/config/db.go b/config/db.go index 3475ff4..df53980 100644 --- a/config/db.go +++ b/config/db.go @@ -14,7 +14,7 @@ type BackendDSConfig struct { GpuMetricsExport bool MetricsExportInterval int // in seconds - ReqBufferSize int - ConnBufferSize int - KafkaEventBufferSize int + ReqBufferSize uint64 + ConnBufferSize uint64 + KafkaEventBufferSize uint64 } diff --git a/datastore/backend.go b/datastore/backend.go index dbc7928..ecf1157 100644 --- a/datastore/backend.go +++ b/datastore/backend.go @@ -134,7 +134,7 @@ type BackendDS struct { ctx context.Context host string port string - c *http.Client + c *retryablehttp.Client batchSize uint64 reqChanBuffer chan *ReqInfo @@ -162,6 +162,7 @@ type BackendDS struct { containerEventChan chan interface{} // *ContainerEvent dsEventChan chan interface{} // *DaemonSetEvent ssEventChan chan interface{} // *StatefulSetEvent + conf config.BackendDSConfig // TODO add: // job @@ -211,7 +212,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe retryClient.Logger = LeveledLogger{l: log.Logger.With().Str("component", "retryablehttp").Logger()} retryClient.Backoff = retryablehttp.DefaultBackoff retryClient.RetryWaitMin = 1 * time.Second - retryClient.RetryWaitMax = 5 * time.Second + retryClient.RetryWaitMax = 2 * time.Second retryClient.RetryMax = 2 retryClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { @@ -275,7 +276,6 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe MaxConnsPerHost: 500, // 500 connection per host } retryClient.HTTPClient.Timeout = 10 * time.Second // Set a timeout for the request - client := retryClient.StandardClient() var defaultBatchSize uint64 = 1000 @@ -288,7 +288,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe ds := &BackendDS{ ctx: ctx, host: conf.Host, - c: client, + c: retryClient, batchSize: bs, reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}), aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}), @@ -307,6 +307,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe metricsExport: conf.MetricsExport, gpuMetricsExport: conf.GpuMetricsExport, metricsExportInterval: conf.MetricsExportInterval, + conf: conf, // traceEventQueue: list.New(), // traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}), } @@ -427,11 +428,11 @@ func (ds *BackendDS) Start() { // return batch // } -func (b *BackendDS) DoRequest(req *http.Request) error { +func (b *BackendDS) DoRequest(req *retryablehttp.Request) error { req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") - ctx, cancel := context.WithTimeout(b.ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(b.ctx, 10*time.Second) defer cancel() resp, err := b.c.Do(req.WithContext(ctx)) @@ -501,7 +502,7 @@ func convertConnsToPayload(batch []*ConnInfo) ConnInfoPayload { // } func (b *BackendDS) sendMetricsToBackend(r io.Reader) { - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r) + req, err := retryablehttp.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r) if err != nil { log.Logger.Error().Msgf("error creating metrics request: %v", err) return @@ -539,8 +540,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s log.Logger.Error().Msgf("error marshalling batch: %v", err) return } - - httpReq, err := http.NewRequest(method, b.host+endpoint, bytes.NewBuffer(payloadBytes)) + httpReq, err := retryablehttp.NewRequest(method, b.host+endpoint, bytes.NewBuffer(payloadBytes)) if err != nil { log.Logger.Error().Msgf("error creating http request: %v", err) return @@ -589,7 +589,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s // } func (b *BackendDS) sendReqsInBatch(batchSize uint64) { - t := time.NewTicker(5 * time.Second) + t := time.NewTicker(1 * time.Second) defer t.Stop() send := func() { @@ -610,7 +610,19 @@ func (b *BackendDS) sendReqsInBatch(batchSize uint64) { } reqsPayload := convertReqsToPayload(batch) - log.Logger.Debug().Int("len", len(batch)).Msg("reqs batch len") + + // dynamically configure batchSize to avoid blocking on reqChanBuffer + lenBatch := uint64(len(batch)) + if batchSize == lenBatch { + // increase batchSize + batchSize *= 2 + if batchSize > b.conf.ReqBufferSize { + batchSize = b.conf.ReqBufferSize // max value + } + } else if lenBatch < batchSize/10 { + batchSize /= 2 // decrease batchSize + } + go b.sendToBackend(http.MethodPost, reqsPayload, reqEndpoint) // return reqInfoss to the pool @@ -998,7 +1010,7 @@ func (b *BackendDS) SendHealthCheck(tracing bool, metrics bool, logs bool, nsFil return } - req, err := http.NewRequest(http.MethodPut, b.host+healthCheckEndpoint, bytes.NewBuffer(payloadBytes)) + req, err := retryablehttp.NewRequest(http.MethodPut, b.host+healthCheckEndpoint, bytes.NewBuffer(payloadBytes)) if err != nil { log.Logger.Error().Msgf("error creating http request: %v", err) return diff --git a/go.mod b/go.mod index 75e9600..a5ecd01 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/fsnotify/fsnotify v1.7.0 github.com/go-kit/log v0.2.1 github.com/golang/protobuf v1.5.3 - github.com/hashicorp/go-retryablehttp v0.7.4 + github.com/hashicorp/go-retryablehttp v0.7.7 github.com/klauspost/compress v1.16.5 github.com/pierrec/lz4/v4 v4.1.18 github.com/prometheus/client_golang v1.19.0 @@ -20,7 +20,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.8.4 golang.org/x/arch v0.5.0 - golang.org/x/mod v0.12.0 + golang.org/x/mod v0.14.0 inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a k8s.io/api v0.27.2 k8s.io/apimachinery v0.27.2 @@ -79,7 +79,6 @@ require ( github.com/google/uuid v1.3.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-envparse v0.1.0 // indirect - github.com/hashicorp/go-hclog v1.2.0 // indirect github.com/hodgesds/perf-utils v0.7.0 // indirect github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect github.com/imdario/mergo v0.3.16 // indirect @@ -90,7 +89,7 @@ require ( github.com/lufia/iostat v1.2.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-xmlrpc v0.0.3 // indirect github.com/mdlayher/ethtool v0.0.0-20221212131811-ba3b4bc2e02c // indirect github.com/mdlayher/genetlink v1.3.2 // indirect @@ -113,16 +112,16 @@ require ( go.uber.org/multierr v1.11.0 // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect - golang.org/x/crypto v0.18.0 // indirect + golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect - golang.org/x/net v0.20.0 + golang.org/x/net v0.23.0 golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.16.0 // indirect + golang.org/x/sync v0.5.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 - golang.org/x/tools v0.12.0 // indirect + golang.org/x/tools v0.16.1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -162,7 +161,7 @@ replace ( k8s.io/kube-scheduler => k8s.io/kubernetes/staging/src/k8s.io/kube-scheduler v0.0.0-20231213084502-3f7a50f38688 k8s.io/kubectl => k8s.io/kubernetes/staging/src/k8s.io/kubectl v0.0.0-20231213084502-3f7a50f38688 k8s.io/kubelet => k8s.io/kubernetes/staging/src/k8s.io/kubelet v0.0.0-20231213084502-3f7a50f38688 - k8s.io/kubernetes => k8s.io/kubernetes v1.29.0 + k8s.io/kubernetes => k8s.io/kubernetes v1.29.1 k8s.io/legacy-cloud-providers => k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers v0.0.0-20231213084502-3f7a50f38688 k8s.io/metrics => k8s.io/kubernetes/staging/src/k8s.io/metrics v0.0.0-20231213084502-3f7a50f38688 k8s.io/mount-utils => k8s.io/kubernetes/staging/src/k8s.io/mount-utils v0.0.0-20231213084502-3f7a50f38688 diff --git a/go.sum b/go.sum index 234e5b9..ecaf35f 100644 --- a/go.sum +++ b/go.sum @@ -45,9 +45,8 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= -github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= @@ -104,11 +103,10 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdmPSDFPY= github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc= -github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-hclog v1.2.0 h1:La19f8d7WIlm4ogzNHB0JGqs5AUDAZ2UfCY4sJXcJdM= -github.com/hashicorp/go-hclog v1.2.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= -github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= -github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hodgesds/perf-utils v0.7.0 h1:7KlHGMuig4FRH5fNw68PV6xLmgTe7jKs9hgAcEAbioU= github.com/hodgesds/perf-utils v0.7.0/go.mod h1:LAklqfDadNKpkxoAJNHpD5tkY0rkZEVdnCEWN5k4QJY= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ= @@ -139,14 +137,12 @@ github.com/lufia/iostat v1.2.1 h1:tnCdZBIglgxD47RyD55kfWQcJMGzO+1QBziSQfesf2k= github.com/lufia/iostat v1.2.1/go.mod h1:rEPNA0xXgjHQjuI5Cy05sLlS2oRcSlWHRLrvh/AQ+Pg= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-xmlrpc v0.0.3 h1:Y6WEMLEsqs3RviBrAa1/7qmbGB7DVD3brZIbqMbQdGY= github.com/mattn/go-xmlrpc v0.0.3/go.mod h1:mqc2dz7tP5x5BKlCahN/n+hs7OSZKJkS9JsHNBRlrxA= github.com/mdlayher/ethtool v0.0.0-20221212131811-ba3b4bc2e02c h1:Y7LoKqIgD7vmqJ7+6ZVnADuwUO+m3tGXbf2lK0OvjIw= @@ -205,7 +201,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -253,42 +248,40 @@ golang.org/x/arch v0.5.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20211031064116-611d5d643895/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= -golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -301,8 +294,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= -golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= +golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -342,8 +335,8 @@ k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= -k8s.io/kubernetes v1.29.0 h1:DOLN7g8+nnAYBi8JHoW0+/MCrZKDPIqAxzLCXDXd0cg= -k8s.io/kubernetes v1.29.0/go.mod h1:9kztbUQf9stVDcIYXx+BX3nuGCsAQDsuClkGMpPs3pA= +k8s.io/kubernetes v1.29.1 h1:fxJFVb8uqbYZDYHpwIsAndBQs360cQGb0xa1gYFh3fo= +k8s.io/kubernetes v1.29.1/go.mod h1:xZPKU0yO0CBbLTnbd+XGyRmmtmaVuJykDb8gNCkeeUE= k8s.io/kubernetes/staging/src/k8s.io/api v0.0.0-20231213084502-3f7a50f38688 h1:aHxeudzvOIHF0u3thzFxoj7iKORzaLBqdviFf8Wcj6w= k8s.io/kubernetes/staging/src/k8s.io/api v0.0.0-20231213084502-3f7a50f38688/go.mod h1:Sa7iFwhU0MWt7d5YHQbifzwth8uftQ57AoHi/qwxf/Y= k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver v0.0.0-20231213084502-3f7a50f38688 h1:U2x8tjEmbYnx8gG4FBuwtTCotOSql0DzQC+bEGU1aBc= diff --git a/logstreamer/stream.go b/logstreamer/stream.go index c0aeac8..0e971ea 100644 --- a/logstreamer/stream.go +++ b/logstreamer/stream.go @@ -487,11 +487,12 @@ func (ls *LogStreamer) StreamLogs() error { fsNotifyWorker := func(restartCh chan struct{}) { timeWindow := initialTimeWindow var lastChanFullTime time.Time + workerExitCh := make(chan struct{}, 1) go func() { t := time.NewTicker(10 * time.Second) for { select { - case <-ls.ctx.Done(): + case <-workerExitCh: return case <-t.C: if time.Since(lastChanFullTime) > 1*time.Minute && timeWindow > initialTimeWindow { @@ -507,6 +508,7 @@ func (ls *LogStreamer) StreamLogs() error { return case event, ok := <-ls.watcher.Events: if !ok { + workerExitCh <- struct{}{} // signal goroutine above to exit log.Logger.Info().Msg("fsnotify events channel closed") return } @@ -588,6 +590,7 @@ func (ls *LogStreamer) StreamLogs() error { case err, ok := <-ls.watcher.Errors: if !ok { log.Logger.Info().Msg("fsnotify errors channel closed") + workerExitCh <- struct{}{} // signal goroutine above to exit return } log.Logger.Error().Err(err).Msgf("fsnotify error")