Skip to content

Commit

Permalink
Merge pull request #226 from buildkite/refactor-collector
Browse files Browse the repository at this point in the history
Split Collect
  • Loading branch information
DrJosh9000 authored Nov 8, 2023
2 parents 4eff383 + 637b602 commit e2af9bf
Showing 1 changed file with 172 additions and 157 deletions.
329 changes: 172 additions & 157 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,206 +98,221 @@ func (c *Collector) Collect() (*Result, error) {
}

if len(c.Queues) == 0 {
log.Println("Collecting agent metrics for all queues")

endpoint, err := url.Parse(c.Endpoint)
if err != nil {
if err := c.collectAllQueues(httpClient, result); err != nil {
return nil, err
}
} else {
for _, queue := range c.Queues {
if err := c.collectQueue(httpClient, result, queue); err != nil {
return nil, err
}
}
}

endpoint.Path += "/metrics"
if !c.Quiet {
result.Dump()
}

req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
return nil, err
}
return result, nil
}

req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Authorization", fmt.Sprintf("Token %s", c.Token))
func (c *Collector) collectAllQueues(httpClient *http.Client, result *Result) error {
log.Println("Collecting agent metrics for all queues")

if c.DebugHttp {
if dump, err := httputil.DumpRequest(req, true); err == nil {
log.Printf("DEBUG request uri=%s\n%s\n", req.URL, dump)
}
}
endpoint, err := url.Parse(c.Endpoint)
if err != nil {
return err
}

res, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
endpoint.Path += "/metrics"

if res.StatusCode == 401 {
return nil, fmt.Errorf("http 401 response received %w", ErrUnauthorized)
}
req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
return err
}

if c.DebugHttp {
if dump, err := httputil.DumpResponse(res, true); err == nil {
log.Printf("DEBUG response uri=%s\n%s\n", req.URL, dump)
}
req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Authorization", fmt.Sprintf("Token %s", c.Token))

if c.DebugHttp {
if dump, err := httputil.DumpRequest(req, true); err == nil {
log.Printf("DEBUG request uri=%s\n%s\n", req.URL, dump)
}
}

// Handle any errors
if res.StatusCode != http.StatusOK {
// If it's json response, show the error message
if strings.HasPrefix(res.Header.Get("Content-Type"), "application/json") {
var errStruct struct {
Message string `json:"message"`
}
err := json.NewDecoder(res.Body).Decode(&errStruct)
if err == nil {
return nil, errors.New(errStruct.Message)
} else {
log.Printf("Failed to decode error: %v", err)
}
}
res, err := httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

return nil, fmt.Errorf("Request failed with %s (%d)", res.Status, res.StatusCode)
}
if res.StatusCode == 401 {
return fmt.Errorf("http 401 response received %w", ErrUnauthorized)
}

var allMetrics allMetricsResponse
if c.DebugHttp {
if dump, err := httputil.DumpResponse(res, true); err == nil {
log.Printf("DEBUG response uri=%s\n%s\n", req.URL, dump)
}
}

// Check if we get a poll duration header from server
if pollSeconds := res.Header.Get(PollDurationHeader); pollSeconds != "" {
pollSecondsInt, err := strconv.ParseInt(pollSeconds, 10, 64)
if err != nil {
log.Printf("Failed to parse %s header: %v", PollDurationHeader, err)
// Handle any errors
if res.StatusCode != http.StatusOK {
// If it's json response, show the error message
if strings.HasPrefix(res.Header.Get("Content-Type"), "application/json") {
var errStruct struct {
Message string `json:"message"`
}
err := json.NewDecoder(res.Body).Decode(&errStruct)
if err == nil {
return errors.New(errStruct.Message)
} else {
result.PollDuration = time.Duration(pollSecondsInt) * time.Second
log.Printf("Failed to decode error: %v", err)
}
}

err = json.NewDecoder(res.Body).Decode(&allMetrics)
return fmt.Errorf("Request failed with %s (%d)", res.Status, res.StatusCode)
}

var allMetrics allMetricsResponse

// Check if we get a poll duration header from server
if pollSeconds := res.Header.Get(PollDurationHeader); pollSeconds != "" {
pollSecondsInt, err := strconv.ParseInt(pollSeconds, 10, 64)
if err != nil {
return nil, err
log.Printf("Failed to parse %s header: %v", PollDurationHeader, err)
} else {
result.PollDuration = time.Duration(pollSecondsInt) * time.Second
}
}

if allMetrics.Organization.Slug == "" {
return nil, fmt.Errorf("No organization slug was found in the metrics response")
}
err = json.NewDecoder(res.Body).Decode(&allMetrics)
if err != nil {
return err
}

log.Printf("Found organization %q", allMetrics.Organization.Slug)
result.Org = allMetrics.Organization.Slug

result.Totals[ScheduledJobsCount] = allMetrics.Jobs.Scheduled
result.Totals[RunningJobsCount] = allMetrics.Jobs.Running
result.Totals[UnfinishedJobsCount] = allMetrics.Jobs.Total
result.Totals[WaitingJobsCount] = allMetrics.Jobs.Waiting
result.Totals[IdleAgentCount] = allMetrics.Agents.Idle
result.Totals[BusyAgentCount] = allMetrics.Agents.Busy
result.Totals[TotalAgentCount] = allMetrics.Agents.Total
result.Totals[BusyAgentPercentage] = busyAgentPercentage(allMetrics.Agents.metricsAgentsResponse)

for queueName, queueJobMetrics := range allMetrics.Jobs.Queues {
if _, ok := result.Queues[queueName]; !ok {
result.Queues[queueName] = map[string]int{}
}
result.Queues[queueName][ScheduledJobsCount] = queueJobMetrics.Scheduled
result.Queues[queueName][RunningJobsCount] = queueJobMetrics.Running
result.Queues[queueName][UnfinishedJobsCount] = queueJobMetrics.Total
result.Queues[queueName][WaitingJobsCount] = queueJobMetrics.Waiting
if allMetrics.Organization.Slug == "" {
return fmt.Errorf("No organization slug was found in the metrics response")
}

log.Printf("Found organization %q", allMetrics.Organization.Slug)
result.Org = allMetrics.Organization.Slug

result.Totals[ScheduledJobsCount] = allMetrics.Jobs.Scheduled
result.Totals[RunningJobsCount] = allMetrics.Jobs.Running
result.Totals[UnfinishedJobsCount] = allMetrics.Jobs.Total
result.Totals[WaitingJobsCount] = allMetrics.Jobs.Waiting
result.Totals[IdleAgentCount] = allMetrics.Agents.Idle
result.Totals[BusyAgentCount] = allMetrics.Agents.Busy
result.Totals[TotalAgentCount] = allMetrics.Agents.Total
result.Totals[BusyAgentPercentage] = busyAgentPercentage(allMetrics.Agents.metricsAgentsResponse)

for queueName, queueJobMetrics := range allMetrics.Jobs.Queues {
if _, ok := result.Queues[queueName]; !ok {
result.Queues[queueName] = map[string]int{}
}
result.Queues[queueName][ScheduledJobsCount] = queueJobMetrics.Scheduled
result.Queues[queueName][RunningJobsCount] = queueJobMetrics.Running
result.Queues[queueName][UnfinishedJobsCount] = queueJobMetrics.Total
result.Queues[queueName][WaitingJobsCount] = queueJobMetrics.Waiting
}

for queueName, queueAgentMetrics := range allMetrics.Agents.Queues {
if _, ok := result.Queues[queueName]; !ok {
result.Queues[queueName] = map[string]int{}
}
result.Queues[queueName][IdleAgentCount] = queueAgentMetrics.Idle
result.Queues[queueName][BusyAgentCount] = queueAgentMetrics.Busy
result.Queues[queueName][TotalAgentCount] = queueAgentMetrics.Total
result.Queues[queueName][BusyAgentPercentage] = busyAgentPercentage(queueAgentMetrics)
for queueName, queueAgentMetrics := range allMetrics.Agents.Queues {
if _, ok := result.Queues[queueName]; !ok {
result.Queues[queueName] = map[string]int{}
}
} else {
for _, queue := range c.Queues {
log.Printf("Collecting agent metrics for queue '%s'", queue)
result.Queues[queueName][IdleAgentCount] = queueAgentMetrics.Idle
result.Queues[queueName][BusyAgentCount] = queueAgentMetrics.Busy
result.Queues[queueName][TotalAgentCount] = queueAgentMetrics.Total
result.Queues[queueName][BusyAgentPercentage] = busyAgentPercentage(queueAgentMetrics)
}

endpoint, err := url.Parse(c.Endpoint)
if err != nil {
return nil, err
}
return nil
}

endpoint.Path += "/metrics/queue"
endpoint.RawQuery = url.Values{"name": {queue}}.Encode()
func (c *Collector) collectQueue(httpClient *http.Client, result *Result, queue string) error {
log.Printf("Collecting agent metrics for queue '%s'", queue)

req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
return nil, err
}
endpoint, err := url.Parse(c.Endpoint)
if err != nil {
return err
}

req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Authorization", fmt.Sprintf("Token %s", c.Token))
endpoint.Path += "/metrics/queue"
endpoint.RawQuery = url.Values{"name": {queue}}.Encode()

if c.DebugHttp {
if dump, err := httputil.DumpRequest(req, true); err == nil {
log.Printf("DEBUG request uri=%s\n%s\n", req.URL, dump)
}
}
req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
return err
}

res, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
req.Header.Set("User-Agent", c.UserAgent)
req.Header.Set("Authorization", fmt.Sprintf("Token %s", c.Token))

if res.StatusCode == 401 {
return nil, fmt.Errorf("http 401 response received %w", ErrUnauthorized)
}
if c.DebugHttp {
if dump, err := httputil.DumpRequest(req, true); err == nil {
log.Printf("DEBUG request uri=%s\n%s\n", req.URL, dump)
}
}

if c.DebugHttp {
if dump, err := httputil.DumpResponse(res, true); err == nil {
log.Printf("DEBUG response uri=%s\n%s\n", req.URL, dump)
}
}
res, err := httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

// Handle any errors
if res.StatusCode != http.StatusOK {
// If it's json response, show the error message
if strings.HasPrefix(res.Header.Get("Content-Type"), "application/json") {
var errStruct struct {
Message string `json:"message"`
}
err := json.NewDecoder(res.Body).Decode(&errStruct)
if err == nil {
return nil, errors.New(errStruct.Message)
} else {
log.Printf("Failed to decode error: %v", err)
}
}

return nil, fmt.Errorf("Request failed with %s (%d)", res.Status, res.StatusCode)
}
if res.StatusCode == 401 {
return fmt.Errorf("http 401 response received %w", ErrUnauthorized)
}

var queueMetrics queueMetricsResponse
err = json.NewDecoder(res.Body).Decode(&queueMetrics)
if err != nil {
return nil, err
}
if c.DebugHttp {
if dump, err := httputil.DumpResponse(res, true); err == nil {
log.Printf("DEBUG response uri=%s\n%s\n", req.URL, dump)
}
}

if queueMetrics.Organization.Slug == "" {
return nil, fmt.Errorf("No organization slug was found in the metrics response")
// Handle any errors
if res.StatusCode != http.StatusOK {
// If it's json response, show the error message
if strings.HasPrefix(res.Header.Get("Content-Type"), "application/json") {
var errStruct struct {
Message string `json:"message"`
}

log.Printf("Found organization %q", queueMetrics.Organization.Slug)
result.Org = queueMetrics.Organization.Slug

result.Queues[queue] = map[string]int{
ScheduledJobsCount: queueMetrics.Jobs.Scheduled,
RunningJobsCount: queueMetrics.Jobs.Running,
UnfinishedJobsCount: queueMetrics.Jobs.Total,
WaitingJobsCount: queueMetrics.Jobs.Waiting,
IdleAgentCount: queueMetrics.Agents.Idle,
BusyAgentCount: queueMetrics.Agents.Busy,
TotalAgentCount: queueMetrics.Agents.Total,
BusyAgentPercentage: busyAgentPercentage(queueMetrics.Agents),
err := json.NewDecoder(res.Body).Decode(&errStruct)
if err == nil {
return errors.New(errStruct.Message)
} else {
log.Printf("Failed to decode error: %v", err)
}
}

return fmt.Errorf("Request failed with %s (%d)", res.Status, res.StatusCode)
}

if !c.Quiet {
result.Dump()
var queueMetrics queueMetricsResponse
err = json.NewDecoder(res.Body).Decode(&queueMetrics)
if err != nil {
return err
}

return result, nil
if queueMetrics.Organization.Slug == "" {
return fmt.Errorf("No organization slug was found in the metrics response")
}

log.Printf("Found organization %q", queueMetrics.Organization.Slug)
result.Org = queueMetrics.Organization.Slug

result.Queues[queue] = map[string]int{
ScheduledJobsCount: queueMetrics.Jobs.Scheduled,
RunningJobsCount: queueMetrics.Jobs.Running,
UnfinishedJobsCount: queueMetrics.Jobs.Total,
WaitingJobsCount: queueMetrics.Jobs.Waiting,
IdleAgentCount: queueMetrics.Agents.Idle,
BusyAgentCount: queueMetrics.Agents.Busy,
TotalAgentCount: queueMetrics.Agents.Total,
BusyAgentPercentage: busyAgentPercentage(queueMetrics.Agents),
}
return nil
}

func busyAgentPercentage(agents metricsAgentsResponse) int {
Expand Down

0 comments on commit e2af9bf

Please sign in to comment.