Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

fix: Calculate goroutines with ulimit #256

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions helpers/limit/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package limit

import (
"github.com/pbnjay/memory"
)

const gbInBytes int = 1024 * 1024 * 1024
roneli marked this conversation as resolved.
Show resolved Hide resolved
const goroutinesPerGB float64 = 250000

func GetMaxGoRoutines() uint64 {
limit := calculateGoRoutines(getMemory())
ulimit, err := getUlimit()
if err != nil || ulimit == 0 {
return limit
}
if ulimit > limit {
return limit
}
roneli marked this conversation as resolved.
Show resolved Hide resolved
return ulimit
}

func getMemory() uint64 {
return memory.TotalMemory()
}

func calculateGoRoutines(totalMemory uint64) uint64 {
if totalMemory == 0 {
// assume we have 2 GB RAM
return uint64(goroutinesPerGB * 2)
}
return uint64(goroutinesPerGB * float64(totalMemory) / float64(gbInBytes))
}
2 changes: 1 addition & 1 deletion helpers/limits_test.go → helpers/limit/limits_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helpers
package limit

import (
"testing"
Expand Down
13 changes: 13 additions & 0 deletions helpers/limit/ulimit_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build darwin || linux

package limit

import (
"syscall"
)

func getUlimit() (uint64, error) {
var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
return rLimit.Max, err
}
7 changes: 7 additions & 0 deletions helpers/limit/ulimit_win.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build windows

package limit

func getUlimit() (uint64, error) {
return 0, nil
}
25 changes: 0 additions & 25 deletions helpers/limits.go

This file was deleted.

7 changes: 4 additions & 3 deletions provider/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"testing"
"time"

"github.com/cloudquery/cq-provider-sdk/helpers"
"github.com/cloudquery/cq-provider-sdk/helpers/limit"

"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/cloudquery/cq-provider-sdk/testlog"
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
if tc.SetupStorage != nil {
storage = tc.SetupStorage(t)
}
limiter := semaphore.NewWeighted(int64(helpers.GetMaxGoRoutines()))
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, tc.ExtraFields, nil, nil, limiter, 10*time.Second)
count, diags := exec.Resolve(context.Background(), executionClient)
assert.Equal(t, tc.ExpectedResourceCount, count)
Expand Down Expand Up @@ -647,7 +648,7 @@ func TestTableExecutor_resolveResourceValues(t *testing.T) {
if tc.SetupStorage != nil {
storage = tc.SetupStorage(t)
}
limiter := semaphore.NewWeighted(int64(helpers.GetMaxGoRoutines()))
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, nil, limiter, 0)

r := schema.NewResourceData(storage.Dialect(), tc.Table, nil, tc.ResourceData, tc.MetaData, exec.executionStart)
Expand Down
4 changes: 3 additions & 1 deletion provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"sync/atomic"

"github.com/cloudquery/cq-provider-sdk/helpers/limit"

"github.com/cloudquery/cq-provider-sdk/database"
"github.com/cloudquery/cq-provider-sdk/migration/migrator"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
Expand Down Expand Up @@ -189,7 +191,7 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes
var goroutinesSem *semaphore.Weighted
maxGoroutines := request.MaxGoroutines
if maxGoroutines == 0 {
maxGoroutines = helpers.GetMaxGoRoutines()
maxGoroutines = limit.GetMaxGoRoutines()
}
goroutinesSem = semaphore.NewWeighted(helpers.Uint64ToInt64(maxGoroutines))

Expand Down