Skip to content

Commit

Permalink
Merge pull request #46 from sasile/master
Browse files Browse the repository at this point in the history
fix ingest worker + latency hist
  • Loading branch information
sasile authored May 17, 2018
2 parents 5eab75e + dbb48c3 commit debee3c
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 19 deletions.
34 changes: 20 additions & 14 deletions http_blaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"sync"
"sync/atomic"
"time"
"strconv"
"github.com/v3io/http_blaster/httpblaster/histogram"
)

var (
Expand All @@ -55,8 +55,10 @@ var (
worker_qd int = 10000
verbose bool = false
enable_ui bool
LatencyCollectorGet tui.LatencyCollector
LatencyCollectorPut tui.LatencyCollector
ch_put_latency chan time.Duration
ch_get_latency chan time.Duration
LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector
LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector
StatusesCollector tui.StatusesCollector
term_ui *tui.Term_ui
dump_failures bool = true
Expand Down Expand Up @@ -163,8 +165,8 @@ func load_test_Config() {
}

func generate_executors(term_ui *tui.Term_ui) {
ch_put_latency := LatencyCollectorPut.New(160, 1)
ch_get_latency := LatencyCollectorGet.New(160, 1)
ch_put_latency = LatencyCollectorPut.New()
ch_get_latency = LatencyCollectorGet.New()
ch_statuses := StatusesCollector.New(160, 1)

for Name, workload := range cfg.Workloads {
Expand Down Expand Up @@ -200,6 +202,8 @@ func wait_for_completion() {
log.Println("Wait for executors to finish")
ex_group.Wait()
end_time = time.Now()
close(ch_get_latency)
close(ch_put_latency)
}

func wait_for_ui_completion(ch_done chan struct{}) {
Expand Down Expand Up @@ -396,7 +400,7 @@ func exit(err_code int) {
func handle_exit() {
if err := recover(); err != nil {
log.Println(err)
os.Exit(1)
log.Exit(1)
}
}

Expand All @@ -412,8 +416,8 @@ func enable_tui() chan struct{} {
case <-ch_done:
return
case <-tick:
term_ui.Update_put_latency_chart(LatencyCollectorPut.Get())
term_ui.Update_get_latency_chart(LatencyCollectorGet.Get())
//term_ui.Update_put_latency_chart(LatencyCollectorPut.Get())
//term_ui.Update_get_latency_chart(LatencyCollectorGet.Get())
term_ui.Update_status_codes(StatusesCollector.Get())
term_ui.Refresh_log()
term_ui.Render()
Expand All @@ -428,17 +432,20 @@ func enable_tui() chan struct{} {
func dump_latencies_histograms() {
prefix_get := "GetHist"
prefix_put := "PutHist"
title := "type \t usec \t\t\t percentage\n"
title := "type \t usec \t\t percentage\n"
strout := "Latency Histograms:\n"
log.Println("LatencyCollectorGet")
vs_get, ls_get := LatencyCollectorGet.GetResults()
if len(vs_get) >0 {
strout += "Get latency histogram:\n"
strout += title
//total := float64(0)
for i, v := range vs_get {
value, _ := strconv.ParseFloat(v, 64)
strout += fmt.Sprintf("%s: %.3f \t\t %.4f%%\n", prefix_get, value, ls_get[i])
//value, _ := strconv.ParseFloat(v, 64)
//total += ls_get[i]
strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_get, v,ls_get[i])
}
//strout += fmt.Sprintf("total: %v", total)
}
vs_put, ls_put := LatencyCollectorPut.GetResults()
if len(vs_put) >0 {
Expand All @@ -447,8 +454,7 @@ func dump_latencies_histograms() {

for i, v := range vs_put {
if ls_put[i] != 0 {
value, _ := strconv.ParseFloat(v, 64)
strout += fmt.Sprintf("%s:%.3f \t\t %.4f%%\n", prefix_put, value, ls_put[i])
strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_put, v,ls_put[i])
}
}
}
Expand All @@ -473,7 +479,7 @@ func main() {
log.Println("Starting http_blaster")

defer handle_exit()
defer close_log_file()
//defer close_log_file()
defer stop_cpu_profile()
defer write_mem_profile()

Expand Down
63 changes: 63 additions & 0 deletions httpblaster/histogram/latency_hist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package histogram

import (
"time"
"sync"
log "github.com/sirupsen/logrus"
"fmt"
"sort"
)

type LatencyHist struct {
ch_values chan time.Duration
hist map[int]int
count int64
wg sync.WaitGroup
}


func (self *LatencyHist) Add(v time.Duration) {
log.Debugln("values added")
self.ch_values <- v
}

func (self *LatencyHist) place(v float64) {
self.hist[int(v/100)]++
}

func (self *LatencyHist)New()chan time.Duration {
log.Debugln("new latency hist")
self.hist = make(map[int]int)
self.wg.Add(1)

self.ch_values = make(chan time.Duration, 10000)
go func() {
defer self.wg.Done()
for v := range self.ch_values {
self.count++
self.place(float64(v.Nanoseconds() / 1000))
}
}()
return self.ch_values
}

func (self *LatencyHist) GetResults() ([]string, []float64) {
log.Debugln("get latency hist")
self.wg.Wait()
var keys []int
for k := range self.hist {
keys = append(keys, k)
}
sort.Ints(keys)
log.Debugln("latency hist wait released")
res_strings := [] string{}
res_values := [] float64{}
for _,k := range keys{
v := self.hist[k]
res_strings = append(res_strings, fmt.Sprintf("%5d - %5d",
k*100, (k+1)*100) )
value := float64(v * 100) / float64(self.count)
res_values = append(res_values,value)
}
return res_strings, res_values
}
29 changes: 29 additions & 0 deletions httpblaster/histogram/latency_hist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package histogram

import (
"testing"
"time"
"math/rand"
)

func TestLatencyHist_Get(t *testing.T) {
l := LatencyHist{}
c:=l.New()
req:= 1000000

go func() {
for i := 0; i < req; i++ {
l.Add(time.Microsecond * time.Duration(rand.Intn(2000)))

}
close(c)
}()

s,v:= l.GetResults()
total:= float64(0)
for i,_ := range s{
total+=v[i]
t.Logf("%6v(us)\t\t%3.2f%%", s[i], v[i])
}
t.Logf("Total: %3.3f", total)
}
2 changes: 1 addition & 1 deletion httpblaster/tui/latency_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type LatencyCollector struct {
}

func (self *LatencyCollector) New(n int, alpha float64) chan time.Duration {
self.WeighHist = gohistogram.NewHistogram(20)
self.WeighHist = gohistogram.NewHistogram(50)
self.ch_values = make(chan time.Duration, 400000)
go func() {
for v := range self.ch_values {
Expand Down
7 changes: 3 additions & 4 deletions httpblaster/worker/ingest_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,15 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r
oncePrepare.Do(prepareRequest)
}

var response *request_generators.Response
var err error
var d time.Duration
response := request_generators.AcquireResponse()
LOOP:
for i := 0; i < w.retry_count; i++ {
response := request_generators.AcquireResponse()
err, d = w.send_request(submit_request, response)
if err != nil {
//retry on error
request_generators.ReleaseResponse(response)
response.Response.Reset()
continue
} else{
ch_statuses <- response.Response.StatusCode()
Expand All @@ -154,7 +153,7 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r
break LOOP
} else if i+1 < w.retry_count {
//not the last loop
request_generators.ReleaseResponse(response)
response.Response.Reset()
}
} else {
break LOOP
Expand Down

0 comments on commit debee3c

Please sign in to comment.