From 337176b6c41d275c03c334ca8aef836bbfd5aacf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20Garc=C3=ADa=20Veytia=20=28Puerco=29?= Date: Wed, 24 Jul 2024 17:54:40 -0600 Subject: [PATCH] Revert fork of nozzle/throttler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit reverts the import of a fork of nozzle/throttler. We mistakenly assumed the project was not maintained but happily found out it is not. Signed-off-by: Adolfo GarcĂ­a Veytia (Puerco) --- throttler/README.md | 78 --------- throttler/example_test.go | 108 ------------- throttler/throttler.go | 190 ---------------------- throttler/throttler_test.go | 304 ------------------------------------ 4 files changed, 680 deletions(-) delete mode 100644 throttler/README.md delete mode 100644 throttler/example_test.go delete mode 100644 throttler/throttler.go delete mode 100644 throttler/throttler_test.go diff --git a/throttler/README.md b/throttler/README.md deleted file mode 100644 index bd1379a..0000000 --- a/throttler/README.md +++ /dev/null @@ -1,78 +0,0 @@ -# Throttler - Intelligent WaitGroups - -[![GoDoc](https://pkg.go.dev/sigs.k8s.io/release-utils/throttler?status.svg)](https://pkg.go.dev/sigs.k8s.io/release-utils/throttler?status.svg) - -__Note:__ This package was adopted by the Kubernetes RelEng team to continue its -maintenance, it was forked from github.com/nozzle/throttle at -[2ea9822](https://github.com/nozzle/throttler/commit/2ea982251481626167b7f83be1434b5c42540c1a). - -Throttler fills the gap between sync.WaitGroup and manually monitoring your -goroutines with channels. The API is almost identical to Wait Groups, but it -allows you to set a max number of workers that can be running simultaneously. -It uses channels internally to block until a job completes by calling Done() or -until all jobs have been completed. It also provides a built in error channel -that captures your goroutine errors and provides access to them as `[]error` -after you exit the loop. - -See a fully functional example of the original module on the playground at http://bit.ly/throttler-v3 - -Compare the Throttler example to the sync.WaitGroup example from http://golang.org/pkg/sync/#example_WaitGroup - -### How to use Throttler - -```golang -// This example fetches several URLs concurrently, -// using a Throttler to block until all the fetches are complete. -// Compare to http://golang.org/pkg/sync/#example_WaitGroup -func ExampleThrottler() { - var urls = []string{ - "http://www.golang.org/", - "http://www.google.com/", - "http://www.somestupidname.com/", - } - // Create a new Throttler that will get 2 urls at a time - t := throttler.New(2, len(urls)) - for _, url := range urls { - // Launch a goroutine to fetch the URL. - go func(url string) { - // Fetch the URL. - err := http.Get(url) - // Let Throttler know when the goroutine completes - // so it can dispatch another worker - t.Done(err) - }(url) - // Pauses until a worker is available or all jobs have been completed - // Returning the total number of goroutines that have errored - // lets you choose to break out of the loop without starting any more - errorCount := t.Throttle() - } -} -``` - -### vs How to use a sync.WaitGroup - -```golang -// This example fetches several URLs concurrently, -// using a WaitGroup to block until all the fetches are complete. -func ExampleWaitGroup() { - var wg sync.WaitGroup - var urls = []string{ - "http://www.golang.org/", - "http://www.google.com/", - "http://www.somestupidname.com/", - } - for _, url := range urls { - // Increment the WaitGroup counter. - wg.Add(1) - // Launch a goroutine to fetch the URL. - go func(url string) { - // Decrement the counter when the goroutine completes. - defer wg.Done() - // Fetch the URL. - http.Get(url) - }(url) - } - // Wait for all HTTP fetches to complete. - wg.Wait() -} -``` diff --git a/throttler/example_test.go b/throttler/example_test.go deleted file mode 100644 index 45fe7b7..0000000 --- a/throttler/example_test.go +++ /dev/null @@ -1,108 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This package was forked and adapted from the original at -// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a -// full commit history has been preserved. - -package throttler - -import ( - "fmt" - "os" -) - -type httpPkg struct{} - -func (httpPkg) Get(_ string) error { return nil } - -var http httpPkg - -// This example fetches several URLs concurrently, -// using a WaitGroup to block until all the fetches are complete. -// -//nolint:testableexamples // TODO - Rewrite examples -func ExampleWaitGroup() { -} - -// This example fetches several URLs concurrently, -// using a Throttler to block until all the fetches are complete. -// Compare to http://golang.org/pkg/sync/#example_WaitGroup -// -//nolint:testableexamples // TODO - Rewrite examples -func ExampleThrottler() { - urls := []string{ - "http://www.golang.org/", - "http://www.google.com/", - "http://www.somestupidname.com/", - } - // Create a new Throttler that will get 2 urls at a time - t := New(2, len(urls)) - for _, url := range urls { - // Launch a goroutine to fetch the URL. - go func(url string) { - // Fetch the URL. - err := http.Get(url) - // Let Throttler know when the goroutine completes - // so it can dispatch another worker - t.Done(err) - }(url) - // Pauses until a worker is available or all jobs have been completed - // Returning the total number of goroutines that have errored - // lets you choose to break out of the loop without starting any more - errorCount := t.Throttle() - if errorCount > 0 { - break - } - } -} - -// This example fetches several URLs concurrently, -// using a Throttler to block until all the fetches are complete -// and checks the errors returned. -// Compare to http://golang.org/pkg/sync/#example_WaitGroup -// -//nolint:testableexamples // TODO - Rewrite examples -func ExampleThrottler_errors() { - urls := []string{ - "http://www.golang.org/", - "http://www.google.com/", - "http://www.somestupidname.com/", - } - // Create a new Throttler that will get 2 urls at a time - t := New(2, len(urls)) - for _, url := range urls { - // Launch a goroutine to fetch the URL. - go func(url string) { - // Let Throttler know when the goroutine completes - // so it can dispatch another worker - defer t.Done(nil) - // Fetch the URL. - if err := http.Get(url); err != nil { - fmt.Fprintf(os.Stderr, "error fetching %q: %v", url, err) - } - }(url) - // Pauses until a worker is available or all jobs have been completed - t.Throttle() - } - - if t.Err() != nil { - // Loop through the errors to see the details - for i, err := range t.Errs() { - fmt.Printf("error #%d: %s", i, err) - } - } -} diff --git a/throttler/throttler.go b/throttler/throttler.go deleted file mode 100644 index c159edc..0000000 --- a/throttler/throttler.go +++ /dev/null @@ -1,190 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This package was forked and adapted from the original at -// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a -// full commit history has been preserved. - -// Package throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines -// with channels. The API is almost identical to Wait Groups, but it allows you to set -// a max number of workers that can be running simultaneously. It uses channels internally -// to block until a job completes by calling Done(err) or until all jobs have been completed. -// -// After exiting the loop where you are using Throttler, you can call the `Err` or `Errs` method to check -// for errors. `Err` will return a single error representative of all the errors Throttler caught. The -// `Errs` method will return all the errors as a slice of errors (`[]error`). -// -// Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup -// -// This package was forked and adapted from the original at -// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a -// full commit history has been preserved. -// -// See a fully functional example of the original package on the playground at http://bit.ly/throttler-v3 -package throttler - -import ( - "fmt" - "math" - "sync" - "sync/atomic" -) - -// Throttler stores all the information about the number of workers, the active -// workers and error information. -type Throttler struct { - maxWorkers int32 - workerCount int32 - batchingTotal int32 - batchSize int32 - totalJobs int32 - jobsStarted int32 - jobsCompleted int32 - doneChan chan struct{} - errsMutex *sync.Mutex - errs []error - errorCount int32 -} - -// New returns a Throttler that will govern the max number of workers and will -// work with the total number of jobs. It panics if maxWorkers < 1. -func New(maxWorkers, totalJobs int) *Throttler { - if maxWorkers < 1 { - panic("maxWorkers has to be at least 1") - } - return &Throttler{ - maxWorkers: int32(maxWorkers), - batchSize: 1, - totalJobs: int32(totalJobs), - doneChan: make(chan struct{}, totalJobs), - errsMutex: &sync.Mutex{}, - } -} - -// NewBatchedThrottler returns a Throttler (just like New), but also enables batching. -func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler { - totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize))) - t := New(maxWorkers, totalJobs) - t.batchSize = int32(batchSize) - t.batchingTotal = int32(batchingTotal) - return t -} - -// SetMaxWorkers lets you change the total number of workers that can run concurrently. NOTE: If -// all workers are currently running, this setting is not guaranteed to take effect until one of them -// completes and Throttle() is called again. -func (t *Throttler) SetMaxWorkers(maxWorkers int) { - if maxWorkers < 1 { - panic("maxWorkers has to be at least 1") - } - atomic.StoreInt32(&t.maxWorkers, int32(maxWorkers)) -} - -// Throttle works similarly to sync.WaitGroup, except inside your goroutine dispatch -// loop rather than after. It will not block until the number of active workers -// matches the max number of workers designated in the call to NewThrottler or -// all of the jobs have been dispatched. It stops blocking when Done has been called -// as many times as totalJobs. -func (t *Throttler) Throttle() int { - if atomic.LoadInt32(&t.totalJobs) < 1 { - return int(atomic.LoadInt32(&t.errorCount)) - } - atomic.AddInt32(&t.jobsStarted, 1) - atomic.AddInt32(&t.workerCount, 1) - - // check to see if the current number of workers equals the max number of workers - // if they are equal, wait for one to finish before continuing. - if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) { - atomic.AddInt32(&t.jobsCompleted, 1) - atomic.AddInt32(&t.workerCount, -1) - <-t.doneChan - } - - // check to see if all of the jobs have been started, and if so, wait until all - // jobs have been completed before continuing. - if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) { - for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) { - atomic.AddInt32(&t.jobsCompleted, 1) - <-t.doneChan - } - } - - return int(atomic.LoadInt32(&t.errorCount)) -} - -// Done lets Throttler know that a job has been completed so that another worker -// can be activated. If Done is called less times than totalJobs, -// Throttle will block forever. -func (t *Throttler) Done(err error) { - if err != nil { - t.errsMutex.Lock() - t.errs = append(t.errs, err) - atomic.AddInt32(&t.errorCount, 1) - t.errsMutex.Unlock() - } - t.doneChan <- struct{}{} -} - -// Err returns an error representative of all errors caught by throttler. -func (t *Throttler) Err() error { - t.errsMutex.Lock() - defer t.errsMutex.Unlock() - if atomic.LoadInt32(&t.errorCount) == 0 { - return nil - } - return multiErrors(t.errs) -} - -// Errs returns a slice of any errors that were received from calling Done(). -func (t *Throttler) Errs() []error { - t.errsMutex.Lock() - defer t.errsMutex.Unlock() - return t.errs -} - -type multiErrors []error - -func (te multiErrors) Error() string { - errString := te[0].Error() - if len(te) > 1 { - errString += fmt.Sprintf(" (and %d more errors)", len(te)-1) - } - return errString -} - -// BatchStartIndex returns the starting index for the next batch. The job count isn't modified -// until th.Throttle() is called, so if you don't call Throttle before executing this -// again, it will return the same index as before. -func (t *Throttler) BatchStartIndex() int { - return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize)) -} - -// BatchEndIndex returns the ending index for the next batch. It either returns the full batch size -// or the remaining amount of jobs. The job count isn't modified -// until th.Throttle() is called, so if you don't call Throttle before executing this -// again, it will return the same index as before. -func (t *Throttler) BatchEndIndex() int { - end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize) - if end > atomic.LoadInt32(&t.batchingTotal) { - end = atomic.LoadInt32(&t.batchingTotal) - } - return int(end) -} - -// TotalJobs returns the total number of jobs throttler is performing. -func (t *Throttler) TotalJobs() int { - return int(atomic.LoadInt32(&t.totalJobs)) -} diff --git a/throttler/throttler_test.go b/throttler/throttler_test.go deleted file mode 100644 index 9255f54..0000000 --- a/throttler/throttler_test.go +++ /dev/null @@ -1,304 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This package was forked and adapted from the original at -// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a -// full commit history has been preserved. - -package throttler - -import ( - "fmt" - "math/rand" - "reflect" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestThrottle(t *testing.T) { - tests := []struct { - Desc string - Jobs []string - MaxWorkers int - TotalJobs int - }{ - { - "Standard implementation", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 5, - -1, - }, { - "Incorrectly has 0 as TotalWorkers", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 5, - 0, - }, { - "More workers than jobs", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 50000, - -1, - }, - } - - for _, test := range tests { - totalJobs := len(test.Jobs) - if test.TotalJobs != -1 { - totalJobs = test.TotalJobs - } - th := New(test.MaxWorkers, totalJobs) - for range test.Jobs { - go func(th *Throttler) { - defer th.Done(nil) - //nolint: gosec // Bump to rand/v2 when the library moves to go 1.22+ - time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond) - }(th) - th.Throttle() - } - require.NoError(t, th.Err()) - } -} - -func TestThrottleWithErrors(t *testing.T) { - tests := []struct { - Desc string - Jobs []string - MaxWorkers int - TotalJobs int - }{ - { - "Standard implementation", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 5, - -1, - }, { - "Standard implementation", - []string{"job01", "job02"}, - 5, - -1, - }, - } - - for _, test := range tests { - totalJobs := len(test.Jobs) - if test.TotalJobs != -1 { - totalJobs = test.TotalJobs - } - th := New(test.MaxWorkers, totalJobs) - for _, job := range test.Jobs { - go func(job string, th *Throttler) { - var err error - jobNum, err := strconv.ParseInt(job[len(job)-2:], 10, 8) - if jobNum%2 != 0 { - err = fmt.Errorf("Error on %s", job) - } - defer th.Done(err) - //nolint: gosec // Bump to rand/v2 when the library moves to go 1.22+ - time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond) - }(job, th) - th.Throttle() - } - if len(th.Errs()) != totalJobs/2 { - t.Fatal("The wrong number of errors were returned") - } - if th.Err() != nil { - fmt.Println("err:", th.Err()) - } - } -} - -func TestThrottlePanic(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Fatal("Test failed to panic") - } - }() - New(0, 100) -} - -func TestBatchedThrottler(t *testing.T) { - tests := []struct { - Desc string - ToBeBatched []string - MaxWorkers int - BatchSize int - ExpectedBatchedSlices [][]string - }{ - { - "Standard implementation", - []string{ - "item01", "item02", "item03", "item04", "item05", "item06", "item07", "item08", "item09", "item10", - "item11", "item12", "item13", "item14", "item15", "item16", "item17", "item18", "item19", "item20", - "item21", "item22", "item23", "item24", "item25", "item26", "item27", "item28", "item29", "item30", - "item31", "item32", "item33", "item34", "item35", "item36", "item37", "item38", "item39", "item40", - "item41", "item42", "item43", "item44", "item45", "item46", "item47", "item48", "item49", - }, - 10, - 2, - [][]string{ - {"item01", "item02"}, - {"item03", "item04"}, - {"item05", "item06"}, - {"item07", "item08"}, - {"item09", "item10"}, - {"item11", "item12"}, - {"item13", "item14"}, - {"item15", "item16"}, - {"item17", "item18"}, - {"item19", "item20"}, - {"item21", "item22"}, - {"item23", "item24"}, - {"item25", "item26"}, - {"item27", "item28"}, - {"item29", "item30"}, - {"item31", "item32"}, - {"item33", "item34"}, - {"item35", "item36"}, - {"item37", "item38"}, - {"item39", "item40"}, - {"item41", "item42"}, - {"item43", "item44"}, - {"item45", "item46"}, - {"item47", "item48"}, - {"item49"}, - }, - }, - } - - for _, test := range tests { - th := NewBatchedThrottler(test.MaxWorkers, len(test.ToBeBatched), test.BatchSize) - for i := 0; i < th.TotalJobs(); i++ { - go func(tbbSlice []string, expectedSlice []string) { - var err error - if !reflect.DeepEqual(tbbSlice, expectedSlice) { - err = fmt.Errorf("wanted: %#v | got: %#v", expectedSlice, tbbSlice) - } - th.Done(err) - }(test.ToBeBatched[th.BatchStartIndex():th.BatchEndIndex()], test.ExpectedBatchedSlices[i]) - if errCount := th.Throttle(); errCount > 0 { - break - } - } - - if th.Err() != nil { - t.Fatal(th.Err()) - } - } -} - -func TestSetMaxWorkers(t *testing.T) { - tests := []struct { - Desc string - Jobs []string - InitialMaxWorkers int - EndMaxWorkers int - TotalJobs int - }{ - { - "Standard implementation", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 1, - 5, - -1, - }, { - "Incorrectly has 0 as TotalWorkers", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 1, - 5, - 0, - }, { - "More workers than jobs", - []string{ - "job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10", - "job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20", - "job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30", - "job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40", - "job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50", - }, - 1, - 50000, - -1, - }, - } - - for _, test := range tests { - totalJobs := len(test.Jobs) - if test.TotalJobs != -1 { - totalJobs = test.TotalJobs - } - th := New(test.InitialMaxWorkers, totalJobs) - for i := range test.Jobs { - if i == test.InitialMaxWorkers+1 { - th.SetMaxWorkers(test.EndMaxWorkers) - } - go func(th *Throttler) { - defer th.Done(nil) - //nolint: gosec // Bump to rand/v2 when the library moves to go 1.22+ - time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond) - }(th) - th.Throttle() - } - require.NoError(t, th.Err()) - } -} - -func TestSetMaxWorkersPanic(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Fatal("Test failed to panic") - } - }() - th := New(1, 10) - th.SetMaxWorkers(-1) -}