Skip to content

Commit

Permalink
add support for counter updates
Browse files Browse the repository at this point in the history
  • Loading branch information
pdziepak committed Mar 8, 2017
1 parent d061dad commit 59c9510
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var keyspaceName string
var tableName string
var counterTableName string

var concurrency int

Expand All @@ -40,6 +41,11 @@ func PrepareDatabase(session *gocql.Session, replicationFactor int) {
if err != nil {
log.Fatal(err)
}

err = session.Query("CREATE TABLE IF NOT EXISTS " + keyspaceName + "." + counterTableName + " (pk bigint, ck bigint, c1 counter, c2 counter, c3 counter, c4 counter, c5 counter, PRIMARY KEY(pk, ck)) WITH compression = { }").Exec()
if err != nil {
log.Fatal(err)
}
}

func GetWorkload(name string, threadId int, partitionOffset int) WorkloadGenerator {
Expand Down Expand Up @@ -69,6 +75,8 @@ func GetMode(name string) func(session *gocql.Session, workload WorkloadGenerato
return DoWrites
}
return DoBatchedWrites
case "counter_update":
return DoCounterUpdates
case "read":
return DoReads
default:
Expand Down Expand Up @@ -114,6 +122,7 @@ func main() {
flag.StringVar(&keyspaceName, "keyspace", "scylla_bench", "keyspace to use")
flag.StringVar(&tableName, "table", "test", "table to use")
flag.Parse()
counterTableName = "test_counters"

flag.Usage = func() {
fmt.Fprintf(os.Stdout, "Usage:\n%s [options]\n\n", os.Args[0])
Expand Down
31 changes: 31 additions & 0 deletions modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,37 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result
return result
}

func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result {
query := session.Query("UPDATE " + keyspaceName + "." + counterTableName + " SET c1 = c1 + 1, c2 = c2 + 1, c3 = c3 + 1, c4 = c4 + 1, c5 = c5 + 1 WHERE pk = ? AND ck = ?")

var operations int
latencyHistogram := NewHistogram()

start := time.Now()
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
operations++
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
bound := query.Bind(pk, ck)

requestStart := time.Now()
err := bound.Exec()
requestEnd := time.Now()
if err != nil {
log.Fatal(err)
}

latency := requestEnd.Sub(requestStart)
err = latencyHistogram.RecordValue(latency.Nanoseconds())
if err != nil {
log.Fatal(err)
}
}
end := time.Now()

return Result{end.Sub(start), operations, operations, latencyHistogram}
}

func DoReads(session *gocql.Session, workload WorkloadGenerator) Result {
request := fmt.Sprintf("SELECT * FROM %s.%s WHERE pk = ? AND ck >= ? LIMIT %d", keyspaceName, tableName, rowsPerRequest)
query := session.Query(request)
Expand Down

0 comments on commit 59c9510

Please sign in to comment.