Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Feature: Pre aggregate metrics to reduce cost #33

Merged
merged 15 commits into from
Feb 21, 2019
83 changes: 67 additions & 16 deletions pkg/appinsights.go
Original file line number Diff line number Diff line change
@@ -1,66 +1,104 @@
package batchinsights

import (
"bytes"
"fmt"
"strconv"
"time"

"github.com/Microsoft/ApplicationInsights-Go/appinsights"
)

type AppInsightsService struct {
client appinsights.TelemetryClient
client appinsights.TelemetryClient
aggregateCollectionStart *time.Time
aggregates map[string]*appinsights.AggregateMetricTelemetry
}

const AGGREGATE_TIME = time.Duration(1) * time.Minute

func NewAppInsightsService(instrumentationKey string, poolId string, nodeId string) AppInsightsService {
client := appinsights.NewTelemetryClient(instrumentationKey)
client.Context().Tags.Cloud().SetRole(poolId)
client.Context().Tags.Cloud().SetRoleInstance(nodeId)

return AppInsightsService{
client: client,
client: client,
aggregates: make(map[string]*appinsights.AggregateMetricTelemetry),
}
}

func (service *AppInsightsService) track(metric *appinsights.MetricTelemetry) {
t := time.Now()

if service.aggregateCollectionStart != nil {
elapsed := t.Sub(*service.aggregateCollectionStart)

if elapsed > AGGREGATE_TIME {
for k, aggregate := range service.aggregates {
fmt.Printf(" - %s: %f\n", k, aggregate.Value)
service.client.Track(aggregate)
}
service.aggregates = make(map[string]*appinsights.AggregateMetricTelemetry)
service.aggregateCollectionStart = &t
}
} else {
service.aggregateCollectionStart = &t
}

id := getMetricId(metric)

aggregate, ok := service.aggregates[id]
if !ok {
aggregate = appinsights.NewAggregateMetricTelemetry(metric.Name)
aggregate.Properties = metric.Properties
service.aggregates[id] = aggregate
}
aggregate.AddData([]float64{metric.Value})
}

func (service AppInsightsService) UploadStats(stats NodeStats) {
func (service *AppInsightsService) UploadStats(stats NodeStats) {
client := service.client

for cpuN, percent := range stats.cpuPercents {
metric := appinsights.NewMetricTelemetry("Cpu usage", percent)
metric.Properties["CPU #"] = strconv.Itoa(cpuN)
client.Track(metric)
metric.Properties["Core count"] = strconv.Itoa(len(stats.cpuPercents))
service.track(metric)
}

for _, usage := range stats.diskUsage {
usedMetric := appinsights.NewMetricTelemetry("Disk usage", float64(usage.Used))
usedMetric.Properties["Disk"] = usage.Path
client.Track(usedMetric)
service.track(usedMetric)
freeMetric := appinsights.NewMetricTelemetry("Disk free", float64(usage.Free))
freeMetric.Properties["Disk"] = usage.Path
client.Track(freeMetric)
service.track(freeMetric)
}

if stats.memory != nil {
client.TrackMetric("Memory used", float64(stats.memory.Used))
client.TrackMetric("Memory available", float64(stats.memory.Total-stats.memory.Used))
service.track(appinsights.NewMetricTelemetry("Memory used", float64(stats.memory.Used)))
service.track(appinsights.NewMetricTelemetry("Memory available", float64(stats.memory.Total-stats.memory.Used)))
}
if stats.diskIO != nil {
client.TrackMetric("Disk read", float64(stats.diskIO.ReadBps))
client.TrackMetric("Disk write", float64(stats.diskIO.WriteBps))
service.track(appinsights.NewMetricTelemetry("Disk read", float64(stats.diskIO.ReadBps)))
service.track(appinsights.NewMetricTelemetry("Disk write", float64(stats.diskIO.WriteBps)))
}

if stats.netIO != nil {
client.TrackMetric("Network read", float64(stats.netIO.ReadBps))
client.TrackMetric("Network write", float64(stats.netIO.WriteBps))
service.track(appinsights.NewMetricTelemetry("Network read", float64(stats.netIO.ReadBps)))
service.track(appinsights.NewMetricTelemetry("Network write", float64(stats.netIO.WriteBps)))
}

if len(stats.gpus) > 0 {
for cpuN, usage := range stats.gpus {
gpuMetric := appinsights.NewMetricTelemetry("Gpu usage", usage.GPU)
gpuMetric.Properties["GPU #"] = strconv.Itoa(cpuN)
client.Track(gpuMetric)
service.track(gpuMetric)

gpuMemoryMetric := appinsights.NewMetricTelemetry("Gpu memory usage", usage.Memory)
gpuMemoryMetric.Properties["GPU #"] = strconv.Itoa(cpuN)
client.Track(gpuMemoryMetric)
service.track(gpuMemoryMetric)
}
}

Expand All @@ -73,18 +111,31 @@ func (service AppInsightsService) UploadStats(stats NodeStats) {
cpuMetric := appinsights.NewMetricTelemetry("Process CPU", processStats.cpu)
cpuMetric.Properties["Process Name"] = processStats.name
cpuMetric.Properties["PID"] = pidStr
client.Track(cpuMetric)
service.track(cpuMetric)
}

{
memMetric := appinsights.NewMetricTelemetry("Process Memory", float64(processStats.memory))
memMetric.Properties["Process Name"] = processStats.name
memMetric.Properties["PID"] = pidStr
client.Track(memMetric)
service.track(memMetric)
}

}
}

client.Channel().Flush()
}

func getMetricId(metric *appinsights.MetricTelemetry) string {
groupBy := createKeyValuePairs(metric.Properties)
return fmt.Sprintf("%s/%s", metric.Name, groupBy)
}

func createKeyValuePairs(m map[string]string) string {
b := new(bytes.Buffer)
for key, value := range m {
fmt.Fprintf(b, "%s=%s", key, value)
}
return b.String()
}
1 change: 0 additions & 1 deletion pkg/disk/disk_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func DiskIO() *utils.IOStats {
var writeBytes uint64 = 0

for _, v := range counters {
fmt.Println("stats", v.WriteBytes, v.WriteTime, v.WriteCount)
readBytes += v.ReadBytes
writeBytes += v.WriteBytes
}
Expand Down
24 changes: 20 additions & 4 deletions scripts/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
## Script to be used as one liner
# Script to be used as one liner

# For linux
### For linux

* `run-linux.sh`: Run a published version for linux
* `dev.sh`: Will install go, git then build and run on the fly


# For windows
### For windows
* `run-windows.ps1`: Run a publush version for windows
* `dev-windows.ps1`: Will install go, git then build and run on the fly
* `dev-windows.ps1`: Will install go, git then build and run on the fly


## Development
There is some dev script that will install go and other needed dependencies to build and run this project on the fly.
Set `BATCH_INSIGHTS_BRANCH` environment variable to the branch you are testing

On linux
```bash
/bin/bash -c 'wget -O - https://raw.githubusercontent.com/Azure/batch-insights/$BATCH_INSIGHTS_BRANCH/scripts/dev.sh | bash'
```

On windows

```powershell
cmd /c @"%SystemRoot%\System32\WindowsPowerShell\v1.0\powershell.exe" -NoProfile -InputFormat None -ExecutionPolicy Bypass -Command "iex ((New-Object System.Net.WebClient).DownloadString('https://raw.githubusercontent.com/Azure/batch-insights/$env:BATCH_INSIGHTS_BRANCH/dev-windows.ps1'))"
```
5 changes: 4 additions & 1 deletion scripts/dev.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
set -e

branch=$BATCH_INSIGHTS_BRANCH
echo "Running Batch insights dev script for linux from branch $branch"

apt-get update
apt-get install -y git binutils bison build-essential

Expand All @@ -13,7 +16,7 @@ export PATH=$GOPATH/bin:$GOROOT/bin:$PATH

echo GO version $(go version)

git clone https://github.com/Azure/batch-insights
git clone https://github.com/Azure/batch-insights -b $branch

cd batch-insights
go build
Expand Down