Merge pull request #47 from sasile/master
histogram
sasile authored May 22, 2018
2 parents debee3c + 2f28cf0 commit 7b66f00
Showing 10 changed files with 230 additions and 150 deletions.
183 changes: 114 additions & 69 deletions http_blaster.go
@@ -22,8 +22,8 @@ package main
import (
"flag"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/Gurpartap/logrus-stack"
log "github.com/sirupsen/logrus"
"github.com/v3io/http_blaster/httpblaster"
"github.com/v3io/http_blaster/httpblaster/config"
"github.com/v3io/http_blaster/httpblaster/tui"
@@ -34,38 +34,39 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/v3io/http_blaster/httpblaster/histogram"
//"github.com/v3io/http_blaster/httpblaster/histogram"
"sort"
)

var (
start_time time.Time
end_time time.Time
wl_id int32 = -1
conf_file string
results_file string
showVersion bool
dataBfr []byte
cpu_profile = false
mem_profile = false
cfg config.TomlConfig
executors []*httpblaster.Executor
ex_group sync.WaitGroup
enable_log bool
log_file *os.File
worker_qd int = 10000
verbose bool = false
enable_ui bool
ch_put_latency chan time.Duration
ch_get_latency chan time.Duration
LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector
LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector
StatusesCollector tui.StatusesCollector
term_ui *tui.Term_ui
dump_failures bool = true
dump_location string = "."
start_time time.Time
end_time time.Time
wl_id int32 = -1
conf_file string
results_file string
showVersion bool
dataBfr []byte
cpu_profile = false
mem_profile = false
cfg config.TomlConfig
executors []*httpblaster.Executor
ex_group sync.WaitGroup
enable_log bool
log_file *os.File
worker_qd int = 10000
verbose bool = false
enable_ui bool
ch_put_latency chan time.Duration
ch_get_latency chan time.Duration
//LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector
//LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector
//StatusesCollector tui.StatusesCollector
term_ui *tui.Term_ui
dump_failures bool = true
dump_location string = "."
)

const AppVersion = "3.0.3"
const AppVersion = "3.0.5"

func init() {
const (
@@ -165,9 +166,9 @@ func load_test_Config() {
}

func generate_executors(term_ui *tui.Term_ui) {
ch_put_latency = LatencyCollectorPut.New()
ch_get_latency = LatencyCollectorGet.New()
ch_statuses := StatusesCollector.New(160, 1)
//ch_put_latency = LatencyCollectorPut.New()
//ch_get_latency = LatencyCollectorGet.New()
//ch_statuses := StatusesCollector.New(160, 1)

for Name, workload := range cfg.Workloads {
log.Println("Adding executor for ", Name)
@@ -183,9 +184,9 @@ func generate_executors(term_ui *tui.Term_ui) {
TermUi: term_ui,
Ch_get_latency: ch_get_latency,
Ch_put_latency: ch_put_latency,
Ch_statuses: ch_statuses,
DumpFailures: dump_failures,
DumpLocation: dump_location}
//Ch_statuses: ch_statuses,
DumpFailures: dump_failures,
DumpLocation: dump_location}
executors = append(executors, e)
}
}
@@ -202,8 +203,8 @@ func wait_for_completion() {
log.Println("Wait for executors to finish")
ex_group.Wait()
end_time = time.Now()
close(ch_get_latency)
close(ch_put_latency)
//close(ch_get_latency)
//close(ch_put_latency)
}

func wait_for_ui_completion(ch_done chan struct{}) {
@@ -418,7 +419,7 @@ func enable_tui() chan struct{} {
case <-tick:
//term_ui.Update_put_latency_chart(LatencyCollectorPut.Get())
//term_ui.Update_get_latency_chart(LatencyCollectorGet.Get())
term_ui.Update_status_codes(StatusesCollector.Get())
//term_ui.Update_status_codes(StatusesCollector.Get())
term_ui.Refresh_log()
term_ui.Render()
}
@@ -430,55 +431,99 @@
}

func dump_latencies_histograms() {
prefix_get := "GetHist"
prefix_put := "PutHist"
title := "type \t usec \t\t percentage\n"
strout := "Latency Histograms:\n"
log.Println("LatencyCollectorGet")
vs_get, ls_get := LatencyCollectorGet.GetResults()
if len(vs_get) >0 {
strout += "Get latency histogram:\n"
strout += title
//total := float64(0)
for i, v := range vs_get {
//value, _ := strconv.ParseFloat(v, 64)
//total += ls_get[i]
strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_get, v,ls_get[i])
latency_get := make(map[int64]int)
latency_put := make(map[int64]int)
total_get := 0
total_put := 0

for _, e := range executors {
hist := e.LatencyHist()
if e.GetType() == "GET" {
for k, v := range hist {
latency_get[k] += v
total_get += v
}
} else {
for k, v := range hist {
latency_put[k] += v
total_put += v
}
}
//strout += fmt.Sprintf("total: %v", total)
}
vs_put, ls_put := LatencyCollectorPut.GetResults()
if len(vs_put) >0 {
strout += "Put latency histogram:\n"
strout += title
dump_latency_histogram(latency_get, total_get, "GET")
dump_latency_histogram(latency_put, total_put, "PUT")

for i, v := range vs_put {
if ls_put[i] != 0 {
strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_put, v,ls_put[i])
}
}

func remap_latency_histogram(hist map[int64]int) map[int64]int {
res := make(map[int64]int)
for k, v := range hist {
if k > 10000 { //1 sec
res[10000] += v
} else if k > 5000 { //500 mili
res[5000] += v
} else if k > 1000 { // 100mili
res[1000] += v
} else if k > 100 { //10 mili
res[100] += v
} else if k > 50 { //5 mili
res[50] += v
} else if k > 20 { //2 mili
res[20] += v
} else if k > 10 { //1 mili
res[10] += v
} else { //below 1 mili
res[k] += v
}
}
log.Println(strout)
return res
}

func dump_status_code_histogram() {
log.Println("Status codes:")
labels, values := StatusesCollector.Get()
for i, v := range labels {
if values[i] != 0 {
log.Println(fmt.Sprintf("%v %v%%", v, values[i]))
func dump_latency_histogram(histogram map[int64]int, total int, req_type string) ([]string, []float64) {
var keys []int
var prefix string
title := "type \t usec \t\t percentage\n"
if req_type == "GET" {
prefix = "GetHist"
} else {
prefix = "PutHist"
}
strout := fmt.Sprintf("%s Latency Histograms:\n", prefix)
hist := remap_latency_histogram(histogram)
for k := range hist {
keys = append(keys, int(k))
}
sort.Ints(keys)
log.Debugln("latency hist wait released")
res_strings := []string{}
res_values := []float64{}

for _, k := range keys {
v := hist[int64(k)]
res_strings = append(res_strings, fmt.Sprintf("%5d", k*100))
value := float64(v*100) / float64(total)
res_values = append(res_values, value)
}

if len(res_strings) > 0 {
strout += title
for i, v := range res_strings {
strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix, v, res_values[i])
}
}
log.Println(strout)
return res_strings, res_values
}


func main() {
parse_cmd_line_args()
load_test_Config()
ch_done := enable_tui()
configure_log()
log.Println("Starting http_blaster")

defer handle_exit()
//defer handle_exit()
//defer close_log_file()
defer stop_cpu_profile()
defer write_mem_profile()
@@ -489,7 +534,7 @@ func main() {
wait_for_completion()
log.Println("Executors done!")
dump_latencies_histograms()
dump_status_code_histogram()
//dump_status_code_histogram()
err_code := report()
log.Println("Done with error code ", err_code)
wait_for_ui_completion(ch_done)
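The rewritten dump path above no longer reads from the LatencyCollector channels: dump_latencies_histograms merges the per-executor histograms returned by LatencyHist(), splits them by request type, collapses the raw keys into fixed buckets with remap_latency_histogram, and prints each bucket as a percentage of the total. The following stand-alone sketch is not part of the commit; the sample counts and the 100-usec key unit implied by the k*100 scaling are assumptions. It only illustrates the same bucketing and percentage math:

package main

import (
	"fmt"
	"sort"
)

// remap collapses raw latency keys (assumed to be in units of 100 usec)
// into the fixed buckets used by remap_latency_histogram in this commit.
func remap(hist map[int64]int) map[int64]int {
	res := make(map[int64]int)
	for k, v := range hist {
		switch {
		case k > 10000: // above 1 sec
			res[10000] += v
		case k > 5000: // above 500 milli
			res[5000] += v
		case k > 1000: // above 100 milli
			res[1000] += v
		case k > 100: // above 10 milli
			res[100] += v
		case k > 50: // above 5 milli
			res[50] += v
		case k > 20: // above 2 milli
			res[20] += v
		case k > 10: // above 1 milli
			res[10] += v
		default: // below 1 milli
			res[k] += v
		}
	}
	return res
}

func main() {
	// Hypothetical merged histogram, shaped like what Executor.LatencyHist() returns.
	merged := map[int64]int{5: 40, 15: 30, 120: 20, 6000: 10}
	total := 100

	buckets := remap(merged)
	keys := make([]int, 0, len(buckets))
	for k := range buckets {
		keys = append(keys, int(k))
	}
	sort.Ints(keys)
	for _, k := range keys {
		pct := float64(buckets[int64(k)]*100) / float64(total)
		fmt.Printf("%5d usec \t %3.4f%%\n", k*100, pct)
	}
}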
51 changes: 32 additions & 19 deletions httpblaster/executor.go
@@ -66,9 +66,9 @@ type Executor struct {
TermUi *tui.Term_ui
Ch_get_latency chan time.Duration
Ch_put_latency chan time.Duration
Ch_statuses chan int
DumpFailures bool
DumpLocation string
//Ch_statuses chan int
DumpFailures bool
DumpLocation string
}

func (self *Executor) load_request_generator() (chan *request_generators.Request,
@@ -129,14 +129,18 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request,
return ch_req, release_req, ch_response
}

func (self *Executor)GetWorkerType() worker.WorkerType {
func (self *Executor) GetWorkerType() worker.WorkerType {
gen_type := strings.ToLower(self.Workload.Generator)
if gen_type == request_generators.PERFORMANCE{
if gen_type == request_generators.PERFORMANCE {
return worker.PERFORMANCE_WORKER
}
return worker.INGESTION_WORKER
}

func (self *Executor) GetType() string {
return self.Workload.Type
}

func (self *Executor) run(wg *sync.WaitGroup) error {
defer wg.Done()
self.Start_time = time.Now()
@@ -157,18 +161,20 @@ func (self *Executor) run(wg *sync.WaitGroup) error {
server := fmt.Sprintf("%s:%s", host_address, self.Globals.Port)
w := worker.NewWorker(self.GetWorkerType(),
server, self.Globals.TLSMode, self.Workload.Lazy,
self.Globals.RetryOnStatusCodes,
self.Globals.RetryCount, self.Globals.PemFile, i)
self.Globals.RetryOnStatusCodes,
self.Globals.RetryCount, self.Globals.PemFile, i)
self.workers = append(self.workers, w)
var ch_latency chan time.Duration
if self.Workload.Type == "GET" {
ch_latency = self.Ch_get_latency
} else {
ch_latency = self.Ch_put_latency
}
//var ch_latency chan time.Duration
//if self.Workload.Type == "GET" {
// ch_latency = self.Ch_get_latency
//} else {
// ch_latency = self.Ch_put_latency
//}

go w.RunWorker(ch_response, ch_req, &workers_wg, release_req_flag, ch_latency,
self.Ch_statuses, self.DumpFailures, self.DumpLocation)
go w.RunWorker(ch_response, ch_req,
&workers_wg, release_req_flag, // ch_latency,
//self.Ch_statuses,
self.DumpFailures, self.DumpLocation)
}
ended := make(chan bool)
go func() {
@@ -239,10 +245,6 @@ LOOP:
func (self *Executor) Start(wg *sync.WaitGroup) error {
self.results.Statuses = make(map[int]uint64)
log.Info("at executor start ", self.Workload)
//self.host = self.Globals.Server
//self.port = self.Globals.Port
//self.tls_mode = self.Globals.TLSMode
//self.Globals.StatusCodesAcceptance = self.Globals.StatusCodesAcceptance
go func() {
self.run(wg)
}()
@@ -289,3 +291,14 @@ func (self *Executor) Report() (executor_result, error) {
}
return self.results, nil
}

func (self *Executor) LatencyHist() map[int64]int {
res := make(map[int64]int)
for _, w := range self.workers {
hist := w.GetHist()
for k, v := range hist {
res[k] += v
}
}
return res
}
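The new LatencyHist() accessor above sums per-worker maps via w.GetHist(), which implies each worker now accumulates latencies into a local map[int64]int; that worker-side change lives in one of the changed files not expanded in this view. A hypothetical sketch of such a recorder follows. The names workerHist, RecordLatency, and the 100-usec bucket unit are assumptions, not taken from the commit:

package main

import (
	"fmt"
	"time"
)

// workerHist is an assumed shape for the per-worker latency recorder.
type workerHist struct {
	hist map[int64]int // key: latency in units of 100 usec
}

func newWorkerHist() *workerHist {
	return &workerHist{hist: make(map[int64]int)}
}

// RecordLatency buckets one request duration; the key unit matches the
// k*100 usec scaling used when http_blaster.go prints the histogram.
func (w *workerHist) RecordLatency(d time.Duration) {
	w.hist[int64(d/(100*time.Microsecond))]++
}

// GetHist returns the accumulated histogram for Executor.LatencyHist to merge.
func (w *workerHist) GetHist() map[int64]int {
	return w.hist
}

func main() {
	w := newWorkerHist()
	w.RecordLatency(250 * time.Microsecond) // key 2
	w.RecordLatency(3 * time.Millisecond)   // key 30
	fmt.Println(w.GetHist())                // map[2:1 30:1]
}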
