Skip to content

Commit

Permalink
collect failed dag ins id
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenne committed Nov 8, 2023
1 parent ac83bb4 commit e80f79e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ mock:
for file in `find . -type d \( -path ./.git -o -path ./.github \) -prune -o -name '*.go' -print | xargs grep --files-with-matches -e '//go:generate mockgen'`; do \
go generate $$file; \
done

.PHONY: build
GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build
build:
GOARCH=amd64 GOOS=linux $(GOBUILD) -gcflags "all=-N -l" -o dist/fastflow examples/mysql/main.go
15 changes: 14 additions & 1 deletion pkg/exporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package exporter
import (
"context"
"net/http"
"strings"
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,7 +25,7 @@ var (
failedTaskCountDesc = prometheus.NewDesc(
"fastflow_executor_task_failed_total",
"The count of already failed task.",
[]string{"worker_key"}, nil,
[]string{"worker_key", "dag_ins_ids"}, nil,
)
successTaskCountDesc = prometheus.NewDesc(
"fastflow_executor_task_success_total",
Expand Down Expand Up @@ -60,9 +62,12 @@ var (

// ExecutorCollector
type ExecutorCollector struct {
rwMutex sync.RWMutex

RunningTaskCount int64
SuccessTaskCount uint64
FailedTaskCount uint64
FailedDagInsIDs []string
CompletedTaskCount uint64

ParseElapsedMs int64
Expand All @@ -88,6 +93,9 @@ func (c *ExecutorCollector) Handle(cxt context.Context, e goevent.Event) {
switch completeEvent.TaskIns.Status {
case entity.TaskInstanceStatusFailed:
atomic.AddUint64(&c.FailedTaskCount, 1)
c.rwMutex.Lock()
c.FailedDagInsIDs = append(c.FailedDagInsIDs, completeEvent.TaskIns.DagInsID)
c.rwMutex.Unlock()
case entity.TaskInstanceStatusSuccess:
atomic.AddUint64(&c.SuccessTaskCount, 1)
}
Expand Down Expand Up @@ -120,11 +128,16 @@ func (c *ExecutorCollector) Collect(ch chan<- prometheus.Metric) {
float64(c.CompletedTaskCount),
mod.GetKeeper().WorkerKey(),
)
c.rwMutex.Lock()
dagInsIdStr := strings.Join(c.FailedDagInsIDs, ",")
c.FailedDagInsIDs = []string{}
c.rwMutex.Unlock()
ch <- prometheus.MustNewConstMetric(
failedTaskCountDesc,
prometheus.CounterValue,
float64(c.FailedTaskCount),
mod.GetKeeper().WorkerKey(),
dagInsIdStr,
)
ch <- prometheus.MustNewConstMetric(
successTaskCountDesc,
Expand Down

0 comments on commit e80f79e

Please sign in to comment.