Skip to content

Commit

Permalink
Merge pull request #1 from XuHuaiyu/support-go-tpc
Browse files Browse the repository at this point in the history
Use go-tpc instead of benchmarksql
  • Loading branch information
july2993 committed Mar 17, 2020
2 parents ef8846e + 6cff89e commit 533c457
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
.idea
tpcc_gener
tpccgen
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# tpccgen

A command line utility to generate/import/test via [go-tpc](https://github.com/pingcap/go-tpc)

## Install

```bash
go build -o tpccgen main.go
```

## Flags

```bash
Usage of ./tpccgen:
-all
do all the actions (default true)
-csv
generate tpcc csv files
-data-dir string
data source directory of lightning
-db string
test database name (default "tpcc")
-deploy-dir string
directory path of cluster deployment
-download-url string
url of the go-tpc binary to download (default "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz")
-importer-ip string
ip address of tikv-importer
-lightning-ip string
ip address of tidb-lightnings
-restore
start lightning, importer and restore files
-skip-download
skip downloading the go-tpc binary
-test
run tpcc test
-threads int
number of threads (default 40)
-tidb-ip string
ip of tidb-server (default "127.0.0.1")
-tidb-port string
port of tidb-server (default "4000")
-warehouse int
number of warehouses (default 100)
```

## Usage

By default, tpccgen will automatically take 3 actions:

1. download [go-tpc](https://github.com/pingcap/go-tpc) binary
2. generate csv data
3. import csv data through [tidb-lightning](https://github.com/pingcap/tidb-lightning)

You can also add some flags you want. For example:

```bash
./tpccgen --lightning-ip 172.16.5.90 --importer-ip 172.16.5.89 --data-dir /data/lightning --tidb-ip 172.16.5.84 --tidb-port 4111 --deploy-dir /data/deploy --db test --threads 40 --warehouse 10
```

If you want to perform tpcc test, you must add `--test` flag just like the command below:

```bash
./tpccgen --test --lightning-ip 172.16.5.90 --tidb-ip 172.16.5.84 --tidb-port 4111 --db test --threads 40 --warehouse 10
```
182 changes: 101 additions & 81 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
const (
nmAll = "all"
nmCSV = "csv"
nmTest = "test"
nmRestore = "restore"

nmWarehouse = "warehouse"
nmThreads = "threads"

nmTiDBIP = "tidb-ip"
nmTiDBPort = "tidb-port"
Expand All @@ -31,25 +33,34 @@ const (
nmDataDir = "data-dir"

nmImporterIP = "importer-ip"
nmDB = "db"
downloadURL = "download-url"
skipDownload = "skip-download"
)

var (
all = flag.Bool(nmAll, true, "do all the actions")
all = flag.Bool(nmAll, true, "do all the actions, test is not included")
csv = flag.Bool(nmCSV, false, "generate tpcc csv files")
test = flag.Bool(nmTest, false, "run tpcc test")
restore = flag.Bool(nmRestore, false, "start lightning, importer and restore files")

tidbIP = flag.String(nmTiDBIP, "127.0.0.1", "ip of tidb-server")
tidbPort = flag.String(nmTiDBPort, "4000", "port of tidb-server")
deployDir = flag.String(nmDeployDir, "", "directory path of cluster deployment")

warehouse = flag.Int64(nmWarehouse, 100, "count of warehouse")
warehouse = flag.Int64(nmWarehouse, 100, "number of warehouses")
threads = flag.Int64(nmThreads, 40, "number of threads of go-tpc")

//ansibleDir = flag.String(nmAnsibleDir, "", "ansible directory path")

// TODO: If there is only one lightning, we do not need this var, we can fetch it from ansible.
lightningIP = flag.String(nmLightningIP, "", "ip address of tidb-lightnings")
dataDir = flag.String(nmDataDir, "", "data source directory of lightning")
importerIP = flag.String(nmImporterIP, "", "ip address of tikv-importer")

dbName = flag.String(nmDB, "tpcc", "test database name")
goTPCFile = flag.String(downloadURL, "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz", "url of the go-tpc binary to download")
skipDownloading = flag.Bool(skipDownload, false, "skip downloading the go-tpc binary")
)

func main() {
Expand All @@ -66,15 +77,27 @@ func main() {
}

var start2 time.Time
if *all || *csv {
if err = fetchTpccRepoAndEnforceConf(*tidbIP, *tidbPort, *warehouse, dataDirs, lightningIPs); err != nil {
start2 = time.Now()
if err = fetchTpcc(dataDirs, lightningIPs, *goTPCFile, *skipDownloading); err != nil {
os.Exit(1)
}

// If test flag is enabled, just run tpcc test.
if *test {
var testStart time.Time
if err = runTPCCTest(lightningIPs[0], *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
start2 = time.Now()
if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0]); err != nil {
fmt.Println("prepare cost:", time.Since(start).String(), "test cost:", time.Since(testStart).String())
os.Exit(0)
}

if *all || *csv {
if err = dropDB(*tidbIP, *tidbPort, *dbName); err != nil {
os.Exit(1)
}
if err = genCSV(lightningIPs, dataDirs); err != nil {
if err = genCSV(lightningIPs, dataDirs, *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
os.Exit(1)
}
}
Expand Down Expand Up @@ -143,73 +166,40 @@ func getLightningIPsAndDataDirs() (lightningIPs, dataDirs []string, err error) {
}

/**
> rm -rf /tmp/benchmarksql
> git clone https://github.com/pingcap/benchmarksql.git /tmp/benchmarksql
> yum install -y java ant
> cd /tmp/benchmarksql
> ant
> sed -i 's/localhost:4000/tidb_ip:tidb_port' /tmp/benchmarksql/run/props.mysql
> sed -i "s/warehouses=[0-9]\+/warehouses=10000/" /tmp/benchmarksql/run/props.mysql
> echo fileLocation=%s >> /tmp/benchmarksql/run/props.mysql
> echo tableName=%s >> /tmp/benchmarksql/run/props.mysql
> rm -f /tmp/go-tpc
> wget -O /tmp/go-tpc binary_url; chmod +x /tmp/go-tpc
*/
func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, lightningDirs []string, lightningIPs []string) (err error) {
var tableName []string
switch len(lightningIPs) {
case 1:
tableName = []string{"all"}
case 2:
tableName = []string{"customer", "stock,order"}
case 3:
tableName = []string{"customer", "stock", "order"}
}

errCh := make(chan error, 3)
func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string, skipDownloading bool) (err error) {
errCh := make(chan error, len(lightningIPs)*10) // 3 should enough, echo run 3 cmd at most
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
tn := tableName[i]
ip := lightningIP
for i := 0; i < len(lightningIPs); i++ {
wg.Add(1)
go func() {
go func(ip string, dir string) {
defer wg.Done()
if _, _, err = runCmd("ssh", ip, `rm -rf /tmp/benchmarksql`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `cd /tmp; git clone -b specify_table https://github.com/XuHuaiyu/benchmarksql.git`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `sudo yum install -y java ant`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`cd /tmp/benchmarksql; ant`)); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`sed -i '%s' %s`, fmt.Sprintf(`s/localhost:4000/%s/;s/warehouses=[0-9]\+/%s/;s/loadWorkers=[0-9]\+/loadWorkers=%d/`, tidbIP+":"+tidbPort, fmt.Sprintf("warehouses=%d", warehouse), runtime.NumCPU()), "/tmp/benchmarksql/run/props.mysql")); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo fileLocation=%s/tpcc. >> /tmp/benchmarksql/run/props.mysql", lightningDirs[i])); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo tableName=%s >> /tmp/benchmarksql/run/props.mysql", tn)); err != nil {
errCh <- err
return
if !skipDownloading {
if _, _, err = runCmd("ssh", ip, `rm -f /tmp/go-tpc`); err != nil {
errCh <- err
return
}

if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc.tar.gz %s; tar -xvf /tmp/go-tpc.tar.gz -C /tmp/; rm -f /tmp/go-tpc.tar.gz; chmod +x /tmp/go-tpc", binaryURL)); err != nil {
errCh <- err
return
}
fmt.Println("Download go-tpc binary successfully!")
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", lightningDirs[i])); err != nil {
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", dir)); err != nil {
errCh <- err
return
}
}()
}(lightningIPs[i], lightningDirs[i])
}

go func() {
wg.Wait()
close(errCh)
}()

for err = range errCh {
if err != nil {
return
Expand All @@ -220,38 +210,36 @@ func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, light
}

/**
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists tpcc"
> mysql -h tidbIP -u root -P tidbPort -e "create database tpcc"`
> cd /tmp/benchmarksql/run
> ./runSQL.sh props.mysql sql.mysql/tableCreates.sql
> ./runSQL.sh props.mysql sql.mysql/indexCreates.sql
> cd -
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists dbName"
*/
func genSchema(tidbIP, tidbPort string, lightningIP string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists tpcc"`, tidbIP, tidbPort)); err != nil {
return
}
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "create database tpcc"`, tidbIP, tidbPort)); err != nil {
func dropDB(tidbIP, tidbPort, dbName string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists %s"`, tidbIP, tidbPort, dbName)); err != nil {
return
}
var stdOutMsg []byte
if stdOutMsg, _, err = runCmd("ssh", lightningIP, "cd /tmp/benchmarksql/run; ./runSQL.sh props.mysql sql.mysql/tableCreates.sql; ./runSQL.sh props.mysql sql.mysql/indexCreates.sql"); err != nil {
return
}
fmt.Printf("%s", stdOutMsg)
return
}

/**
> cd /tmp/benchmarksql/run
> ./runLoader.sh props.mysql props.mysql
> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --output outputDir --tables [tables]
*/
func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
func genCSV(lightningIPs []string, lightningDirs []string, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
var specifiedTables []string
switch len(lightningIPs) {
case 1:
// empty means generating all tables
specifiedTables = []string{""}
case 2:
specifiedTables = []string{"--tables stock", "--tables order_line,customer,config,district,history,item,new_order,orders,warehouse"}
case 3:
specifiedTables = []string{"--tables stock", "--tables orders,order_line", "--tables customer,config,district,history,item,new_order,warehouse"}
}

errCh := make(chan error, 3)
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
ip := lightningIP
dir := lightningDirs[i]
tables := specifiedTables[i]
wg.Add(1)
stdOutMsg := make(chan string, 40)
go func() {
Expand All @@ -261,7 +249,8 @@ func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
}()
go func() {
defer wg.Done()
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; cd /tmp/benchmarksql/run; ./runLoader.sh props.mysql props.mysql", dir)); err != nil {
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; "+
"/tmp/go-tpc tpcc prepare -U root -H %s -P %s -D %s -T %d --warehouses %d --output %s %s", dir, tidbIP, tidbPort, dbName, threads, warehouse, dir, tables)); err != nil {
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -341,6 +330,37 @@ func restoreData(importerIPs []string, lightningIPs []string, deployDir string)
return
}

func runTPCCTest(lightningIP, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
errCh := make(chan error, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
stdOutMsg := make(chan string, 40)
go func() {
for line := range stdOutMsg {
fmt.Println(line)
}
}()
go func() {
defer wg.Done()
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc check -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil {
return
}
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc run -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil {
return
}
}()
go func() {
wg.Wait()
close(errCh)
}()
for err = range errCh {
return
}

fmt.Println("#============\ntpcc test finished\n#============")
return
}

func runCmd(name string, arg ...string) (stdOutBytes []byte, stdErrBytes []byte, err error) {
if _, err = exec.LookPath(name); err != nil {
fmt.Printf("%s %s\n%s", name, strings.Join(arg, " "), err.Error())
Expand Down

0 comments on commit 533c457

Please sign in to comment.