Skip to content

Commit

Permalink
time series simulator
Browse files Browse the repository at this point in the history
  • Loading branch information
pdziepak committed Jun 12, 2017
1 parent aa4baa6 commit 8dae319
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 3 deletions.
38 changes: 35 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var tableName string
var counterTableName string

var concurrency int
var maximumRate int

var testDuration time.Duration

Expand All @@ -30,6 +31,8 @@ var inRestriction bool

var timeout time.Duration

var startTime time.Time

var stopAll uint32

func PrepareDatabase(session *gocql.Session, replicationFactor int) {
Expand All @@ -50,7 +53,7 @@ func PrepareDatabase(session *gocql.Session, replicationFactor int) {
}
}

func GetWorkload(name string, threadId int, partitionOffset int64) WorkloadGenerator {
func GetWorkload(name string, threadId int, partitionOffset int64, mode string, writeRate int64, distribution string) WorkloadGenerator {
switch name {
case "sequential":
pksPerThread := partitionCount / int64(concurrency)
Expand All @@ -64,6 +67,14 @@ func GetWorkload(name string, threadId int, partitionOffset int64) WorkloadGener
return NewSequentialVisitAll(thisOffset+partitionOffset, thisSize, clusteringRowCount)
case "uniform":
return NewRandomUniform(threadId, partitionCount, clusteringRowCount)
case "timeseries":
if mode == "read" {
return NewTimeSeriesReader(threadId, concurrency, partitionCount, clusteringRowCount, writeRate, distribution, startTime)
} else if mode == "write" {
return NewTimeSeriesWriter(threadId, concurrency, partitionCount, clusteringRowCount, startTime, int64(maximumRate/concurrency))
} else {
log.Fatal("time series workload supports only write and read modes")
}
default:
log.Fatal("unknown workload: ", name)
}
Expand Down Expand Up @@ -102,11 +113,13 @@ func main() {
var nodes string
var clientCompression bool
var connectionCount int
var maximumRate int
var pageSize int

var partitionOffset int64

var writeRate int64
var distribution string

flag.StringVar(&mode, "mode", "", "operating mode: write, read")
flag.StringVar(&workload, "workload", "", "workload: sequential, uniform")
flag.StringVar(&consistencyLevel, "consistency-level", "quorum", "consistency level")
Expand All @@ -131,6 +144,11 @@ func main() {

flag.Int64Var(&partitionOffset, "partition-offset", 0, "start of the partition range (only for sequential workload)")

var startTimestamp int64
flag.Int64Var(&writeRate, "write-rate", 0, "rate of writes (relevant only for time series reads)")
flag.Int64Var(&startTimestamp, "start-timestamp", 0, "start timestamp of the write load (relevant only for time series reads)")
flag.StringVar(&distribution, "distribution", "uniform", "distribution of keys (relevant only for time series reads)")

flag.StringVar(&keyspaceName, "keyspace", "scylla_bench", "keyspace to use")
flag.StringVar(&tableName, "table", "test", "table to use")
flag.Parse()
Expand Down Expand Up @@ -163,6 +181,10 @@ func main() {
}
}

if workload == "timeseries" && mode == "read" && writeRate == 0 {
log.Fatal("write rate must be provided for time series reads loads")
}

cluster := gocql.NewCluster(nodes)
cluster.NumConns = connectionCount
cluster.PageSize = pageSize
Expand Down Expand Up @@ -241,10 +263,20 @@ func main() {
fmt.Println("Maximum rate:\t\t unlimited")
}
fmt.Println("Client compression:\t", clientCompression)
if workload == "timeseries" {
fmt.Println("Start timestamp:\t", startTime.UnixNano())
fmt.Println("Write rate:\t\t", int64(maximumRate)/partitionCount)
}

if startTimestamp != 0 {
startTime = time.Unix(0, startTimestamp)
} else {
startTime = time.Now()
}

fmt.Println("\ntime\t\toperations/s\trows/s\t\tmax\t\t99.9th\t\t99th\t\t95th\t\t90th\t\tmean")
result := RunConcurrently(maximumRate, func(i int, resultChannel chan Result, rateLimiter RateLimiter) {
GetMode(mode)(session, resultChannel, GetWorkload(workload, i, partitionOffset), rateLimiter)
GetMode(mode)(session, resultChannel, GetWorkload(workload, i, partitionOffset, mode, writeRate, distribution), rateLimiter)
})

fmt.Println("\nResults")
Expand Down
124 changes: 124 additions & 0 deletions workloads.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package main

import (
"log"
"math"
"math/rand"
"time"
)

// MinInt64 returns the smaller of its two int64 arguments.
func MinInt64(a int64, b int64) int64 {
	if b < a {
		return b
	}
	return a
}

type WorkloadGenerator interface {
NextPartitionKey() int64
NextClusteringKey() int64
Expand Down Expand Up @@ -73,3 +83,117 @@ func (ru *RandomUniform) IsDone() bool {
func (ru *RandomUniform) IsPartitionDone() bool {
return false
}

// TimeSeriesWrite generates a time series write workload. Each thread owns an
// interleaved stripe of the partition space and appends one row per partition
// per pass; once every clustering position of the stripe has been written the
// generation counter advances and a fresh set of partition keys begins.
type TimeSeriesWrite struct {
	PkStride            int64         // distance between consecutive partitions of this thread (== thread count)
	PkOffset            int64         // first partition index owned by this thread (== thread id)
	PkCount             int64         // total number of partitions
	PkPosition          int64         // current partition index; starts one stride before PkOffset
	PkGeneration        int64         // bumped each time all clustering positions have been visited
	CkCount             int64         // clustering rows per partition per generation
	CkPosition          int64         // current clustering position within the generation
	StartTime           time.Time     // timestamp assigned to the first row
	Period              time.Duration // time distance between consecutive rows of one partition
	MoveToNextPartition bool          // set once a clustering key was produced for the current partition
}

// NewTimeSeriesWriter creates a time series write workload generator for a
// single worker thread. The thread covers every threadCount-th partition
// starting at threadId; rate is this thread's write rate in rows per second
// and determines the Period between consecutive timestamps of one partition.
func NewTimeSeriesWriter(threadId int, threadCount int, pkCount int64, ckCount int64, startTime time.Time, rate int64) *TimeSeriesWrite {
	// A zero rate (e.g. maximum rate not set, or lower than the concurrency)
	// would divide by zero below — fail with a clear message instead.
	if rate <= 0 {
		log.Fatal("time series writes require a positive per-thread rate; set a maximum rate of at least the concurrency level")
	}
	period := time.Duration(time.Second.Nanoseconds() * (pkCount / int64(threadCount)) / rate)
	return &TimeSeriesWrite{
		PkStride:   int64(threadCount),
		PkOffset:   int64(threadId),
		PkCount:    pkCount,
		PkPosition: int64(threadId) - int64(threadCount), // one stride before the first partition
		CkCount:    ckCount,
		StartTime:  startTime,
		Period:     period,
	}
}

// NextPartitionKey advances to the next partition of this thread's stripe.
// When the stripe is exhausted it wraps back to the thread's first partition
// and moves to the next clustering position; when all clustering positions
// have been used the generation counter is bumped, which yields a fresh set
// of partition keys. The returned key packs the partition index in the upper
// 32 bits and the generation in the low bits, and clears the partition-done
// flag set by NextClusteringKey.
func (tsw *TimeSeriesWrite) NextPartitionKey() int64 {
	tsw.PkPosition += tsw.PkStride
	if tsw.PkPosition >= tsw.PkCount {
		tsw.PkPosition = tsw.PkOffset
		tsw.CkPosition++
		if tsw.CkPosition >= tsw.CkCount {
			tsw.PkGeneration++
			tsw.CkPosition = 0
		}
	}
	tsw.MoveToNextPartition = false
	return tsw.PkPosition<<32 | tsw.PkGeneration
}

// NextClusteringKey returns the negated timestamp for the current row of the
// current partition and marks the partition as done for this pass. The
// timestamp advances by one Period for every row ever written to a partition
// (across all generations).
func (tsw *TimeSeriesWrite) NextClusteringKey() int64 {
	tsw.MoveToNextPartition = true
	rowIndex := tsw.PkGeneration*tsw.CkCount + tsw.CkPosition
	timestamp := tsw.StartTime.UnixNano() + rowIndex*tsw.Period.Nanoseconds()
	return -timestamp
}

// IsDone always reports false: the write workload never finishes on its own
// and must be stopped externally (e.g. by the test duration).
func (*TimeSeriesWrite) IsDone() bool {
	return false
}

// IsPartitionDone reports whether the current partition has received its row
// for this pass; the flag is set by NextClusteringKey and cleared by
// NextPartitionKey.
func (tsw *TimeSeriesWrite) IsPartitionDone() bool {
	return tsw.MoveToNextPartition
}

// TimeSeriesRead generates a read workload matching TimeSeriesWrite's key
// layout: each request picks a random generation and a random row within it,
// restricted to data the matching write load could already have produced.
type TimeSeriesRead struct {
	Generator         *rand.Rand // per-thread PRNG
	HalfNormalDist    bool       // true: half-normal sampling biased towards recent data; false: uniform
	PkStride          int64      // distance between consecutive partitions of this thread (== thread count)
	PkOffset          int64      // first partition index owned by this thread
	PkCount           int64      // total number of partitions
	PkPosition        int64      // current partition index; starts one stride before PkOffset
	StartTimestamp    int64      // UnixNano start time of the matching write load
	CkCount           int64      // clustering rows per partition per generation
	CurrentGeneration int64      // generation picked by the last NextPartitionKey call
	Period            int64      // nanoseconds between consecutive rows of one partition
}

// NewTimeSeriesReader creates a time series read workload generator for a
// single worker thread. distribution selects how generations and rows are
// sampled: "uniform", or "hnormal" for a half-normal distribution biased
// towards the most recently written data. writeRate is the per-partition
// write rate of the matching write load; it must be positive (the caller
// validates this) since it determines the period between timestamps.
func NewTimeSeriesReader(threadId int, threadCount int, pkCount int64, ckCount int64, writeRate int64, distribution string, startTime time.Time) *TimeSeriesRead {
	var halfNormalDist bool
	switch distribution {
	case "uniform":
		halfNormalDist = false
	case "hnormal":
		halfNormalDist = true
	default:
		// Separator added for readability, matching the file's
		// "unknown workload: " message style.
		log.Fatal("unknown distribution: ", distribution)
	}
	// Seed per thread so concurrent readers do not follow identical sequences.
	generator := rand.New(rand.NewSource(int64(time.Now().Nanosecond() * (threadId + 1))))
	pkStride := int64(threadCount)
	pkOffset := int64(threadId) % pkCount
	return &TimeSeriesRead{
		Generator:      generator,
		HalfNormalDist: halfNormalDist,
		PkStride:       pkStride,
		PkOffset:       pkOffset,
		PkCount:        pkCount,
		PkPosition:     pkOffset - pkStride, // one stride before the first partition
		StartTimestamp: startTime.UnixNano(),
		CkCount:        ckCount,
		Period:         time.Second.Nanoseconds() / writeRate,
	}
}

// RandomInt64 returns a pseudo-random value in [0, maxValue). With
// halfNormalDist set, samples follow a half-normal distribution truncated at
// four standard deviations and mirrored so that values close to maxValue are
// the most likely; otherwise samples are uniform. maxValue must be positive.
func RandomInt64(generator *rand.Rand, halfNormalDist bool, maxValue int64) int64 {
	if !halfNormalDist {
		return generator.Int63n(maxValue)
	}
	// Fold the normal distribution at zero, truncate at 4 sigma and flip it
	// so the bulk of the probability mass sits near maxValue.
	value := 1. - math.Min(math.Abs(generator.NormFloat64()), 4.)/4.
	result := int64(float64(maxValue) * value)
	// value can be exactly 1.0 (sampled normal of 0), which would yield
	// maxValue and violate the half-open [0, maxValue) contract that the
	// uniform branch guarantees — clamp to the last valid value.
	if result >= maxValue {
		result = maxValue - 1
	}
	return result
}

// NextPartitionKey advances to the next partition of this thread's stripe
// (wrapping back to the thread's first partition when the stripe ends) and
// picks a random generation among those the matching write load could have
// produced so far, weighted by the configured distribution. The returned key
// packs the partition index in the upper 32 bits and the generation in the
// low bits, matching the writer's key layout.
func (tsw *TimeSeriesRead) NextPartitionKey() int64 {
	position := tsw.PkPosition + tsw.PkStride
	if position >= tsw.PkCount {
		position = tsw.PkOffset
	}
	tsw.PkPosition = position

	elapsed := time.Now().UnixNano() - tsw.StartTimestamp
	maxGeneration := elapsed/(tsw.Period*tsw.CkCount) + 1
	tsw.CurrentGeneration = RandomInt64(tsw.Generator, tsw.HalfNormalDist, maxGeneration)
	return position<<32 | tsw.CurrentGeneration
}

// NextClusteringKey draws a random row within the generation chosen by the
// preceding NextPartitionKey call, capped at what the matching write load
// could have produced by now, and returns its timestamp negated to match the
// writer's clustering-key convention.
func (tsw *TimeSeriesRead) NextClusteringKey() int64 {
	elapsedRows := (time.Now().UnixNano()-tsw.StartTimestamp)/tsw.Period - tsw.CurrentGeneration*tsw.CkCount + 1
	maxRange := MinInt64(tsw.CkCount, elapsedRows)
	row := tsw.CurrentGeneration*tsw.CkCount + RandomInt64(tsw.Generator, tsw.HalfNormalDist, maxRange)
	return -(tsw.StartTimestamp + row*tsw.Period)
}

// IsDone always reports false: the read workload never finishes on its own
// and must be stopped externally (e.g. by the test duration).
func (*TimeSeriesRead) IsDone() bool {
	return false
}

// IsPartitionDone always reports false: every read picks a fresh random
// partition key, so a partition is never considered exhausted.
func (tsw *TimeSeriesRead) IsPartitionDone() bool {
	return false
}

0 comments on commit 8dae319

Please sign in to comment.