Skip to content

Commit

Permalink
Remove error logs in processing loop
Browse files Browse the repository at this point in the history
Errors are provided as metrics only to minimize the impact on resources
consumption when an error is thrown repeatedly.
  • Loading branch information
jotak committed May 4, 2023
1 parent 95395fc commit 3c965eb
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 6 deletions.
1 change: 0 additions & 1 deletion pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func instrumentGRPC(m *metrics) grpc2.UnaryServerInterceptor {

resp, err = handler(ctx, req)
if err != nil {
glog.Errorf("Reporting metric error: %v", err)
m.error(fmt.Sprint(status.Code(err)))
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,17 @@ func (k *ingestKafka) isStopped() bool {
}

func (k *ingestKafka) processRecordDelay(record config.GenericMap) {
TimeFlowEndInterface, ok := record["TimeFlowEndMs"]
timeFlowEndInterface, ok := record["TimeFlowEndMs"]
if !ok {
klog.Errorf("TimeFlowEndMs missing in record %v", record)
k.metrics.error("TimeFlowEndMs missing")
return
}
TimeFlowEnd, ok := TimeFlowEndInterface.(int64)
timeFlowEnd, ok := timeFlowEndInterface.(int64)
if !ok {
klog.Errorf("Cannot parse TimeFlowEndMs of record %v", record)
k.metrics.error("Cannot parse TimeFlowEndMs")
return
}
delay := time.Since(time.UnixMilli(TimeFlowEnd)).Seconds()
delay := time.Since(time.UnixMilli(timeFlowEnd)).Seconds()
k.metrics.latency.Observe(delay)
}

Expand Down

0 comments on commit 3c965eb

Please sign in to comment.