Skip to content

Commit

Permalink
Add -iterations command-line flag
Browse files Browse the repository at this point in the history
-iterations is usable for workloads that have a well-defined number of
ops and thus are limited by nature. Currently "sequential" and "scan" are
the only such workloads. -iterations allows running an arbitrary number of
sequential reads/writes.
-iterations can be combined with -duration to limit the test-run by
both the number of iterations and time.
  • Loading branch information
denesb authored and mmatczuk committed Aug 24, 2018
1 parent 1d02201 commit f365453
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Essentially the following query is executed:

SELECT * FROM scylla_bench.test WHERE token(pk) >= ? AND token(pk) <= ?

The number of iterations to run can be specified with the `-iterations` flag. The default is 1.

### Workloads

The second very important part of scylla-bench configuration is the workload. While mode chooses what kind of requests are to be sent to the cluster the workload decides which partitions and rows should be the target of these requests.
Expand All @@ -123,6 +125,7 @@ This workload sequentially visits all partitions and rows in them. If the concur
The first loader will write partitions [0, 5), the second [5, 10) and the third [10, 15).

The sequential workload is useful for initial population of the database (in write mode) or warming up the cache for in-memory tests (in read mode).
The number of iterations to run can be specified with the `-iterations` flag. The default is 1.

#### Uniform workload (`-workload uniform`)

Expand Down Expand Up @@ -162,6 +165,8 @@ Note that if the effective write rate is lower than the specified one the reader
* `-validate-data` defines data integrity verification. If set then some none-zero data will be written in such a way that it can be validated during read operation.
Note that this option should be set for both write and read (counter_update and counter_read) modes.

* `-iterations` sets the number of iterations to run the given workloads. This is only relevant for workloads that have a finite number of steps. Currently the only such workloads are [sequential](#sequential-workload--workload-sequential) and [scan](#scan-mode--mode-scan). Can be combined with `-duration` to limit a run by both number of iterations and time. Set to 0 for infinite iterations. Defaults to 1.

## Examples

1. Sequential write to populate the database: `scylla-bench -workload sequential -mode write -nodes 127.0.0.1`
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
rangeCount int

timeout time.Duration
iterations uint

startTime time.Time

Expand Down Expand Up @@ -202,6 +203,7 @@ func main() {
flag.IntVar(&rangeCount, "range-count", 1, "number of ranges to split the token space into (relevant only for scan mode)")

flag.DurationVar(&testDuration, "duration", 0, "duration of the test in seconds (0 for unlimited)")
flag.UintVar(&iterations, "iterations", 1, "number of iterations to run (0 for unlimited, relevant only for workloads that have a defined number of ops to execute)")

flag.Int64Var(&partitionOffset, "partition-offset", 0, "start of the partition range (only for sequential workload)")

Expand Down Expand Up @@ -248,6 +250,10 @@ func main() {
log.Fatal("uniform workload requires limited test duration")
}

if iterations > 1 && workload != "sequential" && workload != "scan" {
log.Fatal("iterations only supported for the sequential and scan workload")
}

if partitionOffset != 0 && workload != "sequential" {
log.Fatal("partition-offset has a meaning only in sequential workloads")
}
Expand Down
30 changes: 29 additions & 1 deletion modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,40 @@ func (rb *ResultBuilder) RecordLatency(latency time.Duration, rateLimiter RateLi

var errorRecordingLatency bool

// TestIterator wraps a WorkloadGenerator and counts how many complete
// passes (iterations) over the workload have been performed, so a finite
// workload can be replayed a configurable number of times.
type TestIterator struct {
	iteration uint              // number of completed passes over the workload
	workload  WorkloadGenerator // the workload being iterated
}

// NewTestIterator returns an iterator positioned at the start of the
// first pass over the given workload.
func NewTestIterator(workload WorkloadGenerator) *TestIterator {
	return &TestIterator{
		iteration: 0,
		workload:  workload,
	}
}

// IsDone reports whether the test run is finished. The run ends when a
// global stop was requested (stopAll) or when the underlying workload has
// been exhausted `iterations` times. With iterations == 0 the workload is
// restarted indefinitely.
//
// Fixes vs. original: removes a stray semicolon after `return true`,
// flattens the else-after-return nesting, and guards the iteration-count
// comparison so that an unlimited run (iterations == 0) can never
// terminate when the uint counter wraps around to satisfy
// `iteration+1 == 0`.
func (ti *TestIterator) IsDone() bool {
	if atomic.LoadUint32(&stopAll) != 0 {
		return true
	}

	if !ti.workload.IsDone() {
		return false
	}

	// The current pass over the workload is complete.
	if iterations != 0 && ti.iteration+1 >= iterations {
		return true
	}

	ti.workload.Restart()
	ti.iteration++
	return false
}

func RunTest(resultChannel chan Result, workload WorkloadGenerator, rateLimiter RateLimiter, test func(rb *ResultBuilder) (error, time.Duration)) {
rb := NewResultBuilder()

start := time.Now()
partialStart := start
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
iter := NewTestIterator(workload)
for !iter.IsDone() {
rateLimiter.Wait()

err, latency := test(rb)
Expand Down
22 changes: 21 additions & 1 deletion workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@ type WorkloadGenerator interface {
NextClusteringKey() int64
IsPartitionDone() bool
IsDone() bool
Restart()
}

// SequentialVisitAll visits every clustering row of every partition in
// [PartitionOffset, PartitionCount) exactly once, in order.
type SequentialVisitAll struct {
	PartitionOffset    int64 // first partition to visit
	PartitionCount     int64 // despite the name: exclusive END bound (offset + count), per IsDone's `NextPartition >= PartitionCount` check
	ClusteringRowCount int64 // rows per partition
	NextPartition      int64 // partition the next operation will target
	NextClusteringRow  int64 // row within NextPartition the next operation will target
}

// NewSequentialVisitAll creates a sequential workload covering
// partitionCount partitions starting at partitionOffset, each holding
// clusteringRowCount rows.
//
// Fix vs. original text: a leftover pre-fix return line (with only four
// values for the five-field struct, and missing PartitionOffset) is
// removed; named fields make the offset/end-bound assignment explicit.
func NewSequentialVisitAll(partitionOffset int64, partitionCount int64, clusteringRowCount int64) *SequentialVisitAll {
	return &SequentialVisitAll{
		PartitionOffset:    partitionOffset,
		PartitionCount:     partitionOffset + partitionCount,
		ClusteringRowCount: clusteringRowCount,
		NextPartition:      partitionOffset,
		NextClusteringRow:  0,
	}
}

func (sva *SequentialVisitAll) NextTokenRange() TokenRange {
Expand All @@ -69,6 +71,11 @@ func (sva *SequentialVisitAll) IsDone() bool {
return sva.NextPartition >= sva.PartitionCount || (sva.NextPartition+1 == sva.PartitionCount && sva.NextClusteringRow >= sva.ClusteringRowCount)
}

// Restart rewinds the workload cursor to its starting position so the
// whole partition range can be visited again.
func (sva *SequentialVisitAll) Restart() {
	sva.NextPartition = sva.PartitionOffset
	sva.NextClusteringRow = 0
}

// IsPartitionDone reports whether every clustering row of the partition
// currently being visited has already been handed out.
func (sva *SequentialVisitAll) IsPartitionDone() bool {
	return sva.ClusteringRowCount == sva.NextClusteringRow
}
Expand Down Expand Up @@ -104,6 +111,9 @@ func (ru *RandomUniform) IsPartitionDone() bool {
return false
}

// Restart implements WorkloadGenerator as a no-op: the uniform random
// workload keeps no visit-progress state to rewind.
// NOTE(review): presumably its IsDone never reports true, so Restart is
// never reached for this workload — confirm in the full file.
func (ru *RandomUniform) Restart() {
}

type TimeSeriesWrite struct {
PkStride int64
PkOffset int64
Expand Down Expand Up @@ -157,6 +167,9 @@ func (tsw *TimeSeriesWrite) IsPartitionDone() bool {
return tsw.MoveToNextPartition
}

// Restart implements WorkloadGenerator as a no-op: the time-series write
// workload appears open-ended, with no finite pass to rewind.
// NOTE(review): confirm its IsDone never fires, making Restart unreachable.
func (*TimeSeriesWrite) Restart() {
}

type TimeSeriesRead struct {
Generator *rand.Rand
HalfNormalDist bool
Expand Down Expand Up @@ -226,6 +239,9 @@ func (tsw *TimeSeriesRead) IsPartitionDone() bool {
return false
}

// Restart implements WorkloadGenerator as a no-op: the time-series read
// workload draws its positions from a random generator and keeps no
// finite progress to rewind.
// The unused receiver name `tsw` (copy-pasted from TimeSeriesWrite onto a
// TimeSeriesRead method) is dropped for consistency with the other no-op
// Restart implementations.
func (*TimeSeriesRead) Restart() {
}

type RangeScan struct {
TotalRangeCount int
RangeOffset int
Expand Down Expand Up @@ -284,3 +300,7 @@ func (*RangeScan) IsPartitionDone() bool {
// IsDone reports whether this generator has handed out all of its token
// ranges, i.e. the next-range index has reached the exclusive end bound.
func (rs *RangeScan) IsDone() bool {
	return rs.RangeCount <= rs.NextRange
}

// Restart rewinds the scan so its ranges can be handed out again.
// NOTE(review): resets NextRange to RangeOffset; confirm against the
// constructor (not fully visible here) that RangeOffset is indeed the
// initial value of NextRange.
func (rs *RangeScan) Restart() {
	rs.NextRange = rs.RangeOffset
}

0 comments on commit f365453

Please sign in to comment.