Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use go-tpc instead of benchmarksql #1

Merged
merged 9 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
.idea
tpcc_gener
tpccgen
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# tpccgen

A command-line utility to generate, import, and test TPC-C data via [go-tpc](https://github.com/pingcap/go-tpc)

## Install

```bash
go build -o tpccgen main.go
```

## Flags

```bash
Usage of ./tpccgen:
-all
do all the actions, test is not included (default true)
-csv
generate tpcc csv files
-data-dir string
data source directory of lightning
-db string
test database name (default "tpcc")
-deploy-dir string
directory path of cluster deployment
-download-url string
url of the go-tpc binary to download (default "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz")
-importer-ip string
ip address of tikv-importer
-lightning-ip string
ip address of tidb-lightnings
-restore
start lightning, importer and restore files
-skip-download
skip downloading the go-tpc binary
-test
run tpcc test
-threads int
number of threads (default 40)
-tidb-ip string
ip of tidb-server (default "127.0.0.1")
-tidb-port string
port of tidb-server (default "4000")
-warehouse int
number of warehouses (default 100)
```

## Usage

By default, tpccgen will automatically take 3 actions:

1. download [go-tpc](https://github.com/pingcap/go-tpc) binary
2. generate csv data
3. import csv data through [tidb-lightning](https://github.com/pingcap/tidb-lightning)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个建议再详细点,比如会修改 tidb-lightning 的配置文件

另外说明下使用前要先部署 lightning,并且需要有 wget、mysql 等(如果还有其它的)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Add to my todo list now. Will do tomorrow


You can also pass additional flags as needed. For example:

```bash
./tpccgen --lightning-ip 172.16.5.90 --importer-ip 172.16.5.89 --data-dir /data/lightning --tidb-ip 172.16.5.84 --tidb-port 4111 --deploy-dir /data/deploy --db test --threads 40 --warehouse 10
```

To run the TPC-C test, add the `--test` flag, as in the command below:

```bash
./tpccgen --test --lightning-ip 172.16.5.90 --tidb-ip 172.16.5.84 --tidb-port 4111 --db test --threads 40 --warehouse 10
```
182 changes: 101 additions & 81 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
const (
nmAll = "all"
nmCSV = "csv"
nmTest = "test"
nmRestore = "restore"

nmWarehouse = "warehouse"
nmThreads = "threads"

nmTiDBIP = "tidb-ip"
nmTiDBPort = "tidb-port"
Expand All @@ -31,25 +33,34 @@ const (
nmDataDir = "data-dir"

nmImporterIP = "importer-ip"
nmDB = "db"
downloadURL = "download-url"
skipDownload = "skip-download"
)

var (
all = flag.Bool(nmAll, true, "do all the actions")
all = flag.Bool(nmAll, true, "do all the actions, test is not included")
csv = flag.Bool(nmCSV, false, "generate tpcc csv files")
test = flag.Bool(nmTest, false, "run tpcc test")
restore = flag.Bool(nmRestore, false, "start lightning, importer and restore files")

tidbIP = flag.String(nmTiDBIP, "127.0.0.1", "ip of tidb-server")
tidbPort = flag.String(nmTiDBPort, "4000", "port of tidb-server")
deployDir = flag.String(nmDeployDir, "", "directory path of cluster deployment")

warehouse = flag.Int64(nmWarehouse, 100, "count of warehouse")
warehouse = flag.Int64(nmWarehouse, 100, "number of warehouses")
threads = flag.Int64(nmThreads, 40, "number of threads of go-tpc")

//ansibleDir = flag.String(nmAnsibleDir, "", "ansible directory path")

// TODO: If there is only one lightning, we do not need this var, we can fetch it from ansible.
lightningIP = flag.String(nmLightningIP, "", "ip address of tidb-lightnings")
dataDir = flag.String(nmDataDir, "", "data source directory of lightning")
importerIP = flag.String(nmImporterIP, "", "ip address of tikv-importer")

dbName = flag.String(nmDB, "tpcc", "test database name")
goTPCFile = flag.String(downloadURL, "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz", "url of the go-tpc binary to download")
skipDownloading = flag.Bool(skipDownload, false, "skip downloading the go-tpc binary")
)

func main() {
Expand All @@ -66,15 +77,27 @@ func main() {
}

var start2 time.Time
if *all || *csv {
if err = fetchTpccRepoAndEnforceConf(*tidbIP, *tidbPort, *warehouse, dataDirs, lightningIPs); err != nil {
start2 = time.Now()
if err = fetchTpcc(dataDirs, lightningIPs, *goTPCFile, *skipDownloading); err != nil {
os.Exit(1)
}

// If test flag is enabled, just run tpcc test.
if *test {
var testStart time.Time
if err = runTPCCTest(lightningIPs[0], *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
start2 = time.Now()
if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0]); err != nil {
fmt.Println("prepare cost:", time.Since(start).String(), "test cost:", time.Since(testStart).String())
os.Exit(0)
}

if *all || *csv {
if err = dropDB(*tidbIP, *tidbPort, *dbName); err != nil {
os.Exit(1)
}
if err = genCSV(lightningIPs, dataDirs); err != nil {
if err = genCSV(lightningIPs, dataDirs, *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
os.Exit(1)
}
}
Expand Down Expand Up @@ -143,73 +166,40 @@ func getLightningIPsAndDataDirs() (lightningIPs, dataDirs []string, err error) {
}

/**
> rm -rf /tmp/benchmarksql
> git clone https://github.com/pingcap/benchmarksql.git /tmp/benchmarksql
> yum install -y java ant
> cd /tmp/benchmarksql
> ant
> sed -i 's/localhost:4000/tidb_ip:tidb_port' /tmp/benchmarksql/run/props.mysql
> sed -i "s/warehouses=[0-9]\+/warehouses=10000/" /tmp/benchmarksql/run/props.mysql
> echo fileLocation=%s >> /tmp/benchmarksql/run/props.mysql
> echo tableName=%s >> /tmp/benchmarksql/run/props.mysql
> rm -f /tmp/go-tpc
> wget -O /tmp/go-tpc binary_url; chmod +x /tmp/go-tpc
*/
func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, lightningDirs []string, lightningIPs []string) (err error) {
var tableName []string
switch len(lightningIPs) {
case 1:
tableName = []string{"all"}
case 2:
tableName = []string{"customer", "stock,order"}
case 3:
tableName = []string{"customer", "stock", "order"}
}

errCh := make(chan error, 3)
func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string, skipDownloading bool) (err error) {
errCh := make(chan error, len(lightningIPs)*10) // 3 should enough, echo run 3 cmd at most
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
tn := tableName[i]
ip := lightningIP
for i := 0; i < len(lightningIPs); i++ {
wg.Add(1)
go func() {
go func(ip string, dir string) {
defer wg.Done()
if _, _, err = runCmd("ssh", ip, `rm -rf /tmp/benchmarksql`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `cd /tmp; git clone -b specify_table https://github.com/XuHuaiyu/benchmarksql.git`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `sudo yum install -y java ant`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`cd /tmp/benchmarksql; ant`)); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`sed -i '%s' %s`, fmt.Sprintf(`s/localhost:4000/%s/;s/warehouses=[0-9]\+/%s/;s/loadWorkers=[0-9]\+/loadWorkers=%d/`, tidbIP+":"+tidbPort, fmt.Sprintf("warehouses=%d", warehouse), runtime.NumCPU()), "/tmp/benchmarksql/run/props.mysql")); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo fileLocation=%s/tpcc. >> /tmp/benchmarksql/run/props.mysql", lightningDirs[i])); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo tableName=%s >> /tmp/benchmarksql/run/props.mysql", tn)); err != nil {
errCh <- err
return
if !skipDownloading {
if _, _, err = runCmd("ssh", ip, `rm -f /tmp/go-tpc`); err != nil {
errCh <- err
return
}

if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc.tar.gz %s; tar -xvf /tmp/go-tpc.tar.gz -C /tmp/; rm -f /tmp/go-tpc.tar.gz; chmod +x /tmp/go-tpc", binaryURL)); err != nil {
errCh <- err
return
}
fmt.Println("Download go-tpc binary successfully!")
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", lightningDirs[i])); err != nil {
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", dir)); err != nil {
errCh <- err
return
}
}()
}(lightningIPs[i], lightningDirs[i])
}

go func() {
wg.Wait()
close(errCh)
}()

for err = range errCh {
if err != nil {
return
Expand All @@ -220,38 +210,36 @@ func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, light
}

/**
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists tpcc"
> mysql -h tidbIP -u root -P tidbPort -e "create database tpcc"`
> cd /tmp/benchmarksql/run
> ./runSQL.sh props.mysql sql.mysql/tableCreates.sql
> ./runSQL.sh props.mysql sql.mysql/indexCreates.sql
> cd -
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists dbName"
*/
func genSchema(tidbIP, tidbPort string, lightningIP string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists tpcc"`, tidbIP, tidbPort)); err != nil {
return
}
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "create database tpcc"`, tidbIP, tidbPort)); err != nil {
func dropDB(tidbIP, tidbPort, dbName string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists %s"`, tidbIP, tidbPort, dbName)); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建库和建表,都放在 /tmp/go-tpc tpcc schema -U root -H %s -P %s -D %s 中吧。用户的服务器上可能没有安装 mysql client

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在已经删掉schema 这个命令了。然后建表和建库都是自动的。但是drop掉已经存在的库不支持,这个怎么办?还是用mysql drop一下吧?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥不代码里用 driver drop?

return
}
var stdOutMsg []byte
if stdOutMsg, _, err = runCmd("ssh", lightningIP, "cd /tmp/benchmarksql/run; ./runSQL.sh props.mysql sql.mysql/tableCreates.sql; ./runSQL.sh props.mysql sql.mysql/indexCreates.sql"); err != nil {
return
}
fmt.Printf("%s", stdOutMsg)
return
}

/**
> cd /tmp/benchmarksql/run
> ./runLoader.sh props.mysql props.mysql
> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --output outputDir --tables [tables]
*/
func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
func genCSV(lightningIPs []string, lightningDirs []string, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
var specifiedTables []string
switch len(lightningIPs) {
case 1:
// empty means generating all tables
specifiedTables = []string{""}
case 2:
specifiedTables = []string{"--tables stock", "--tables order_line,customer,config,district,history,item,new_order,orders,warehouse"}
case 3:
specifiedTables = []string{"--tables stock", "--tables orders,order_line", "--tables customer,config,district,history,item,new_order,warehouse"}
}

errCh := make(chan error, 3)
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
ip := lightningIP
dir := lightningDirs[i]
tables := specifiedTables[i]
wg.Add(1)
stdOutMsg := make(chan string, 40)
go func() {
Expand All @@ -261,7 +249,8 @@ func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
}()
go func() {
defer wg.Done()
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; cd /tmp/benchmarksql/run; ./runLoader.sh props.mysql props.mysql", dir)); err != nil {
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; "+
"/tmp/go-tpc tpcc prepare -U root -H %s -P %s -D %s -T %d --warehouses %d --output %s %s", dir, tidbIP, tidbPort, dbName, threads, warehouse, dir, tables)); err != nil {
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -341,6 +330,37 @@ func restoreData(importerIPs []string, lightningIPs []string, deployDir string)
return
}

// runTPCCTest runs `go-tpc tpcc check` followed by `go-tpc tpcc run` against
// the TiDB server at tidbIP:tidbPort, executing /tmp/go-tpc on lightningIP
// over ssh. Lines arriving on the stdout channel are printed as they come in.
//
// It returns the first error reported by either remote command. The
// "tpcc test finished" banner is printed only when both commands succeed.
func runTPCCTest(lightningIP, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
	errCh := make(chan error, 1)
	wg := &sync.WaitGroup{}
	wg.Add(1)
	stdOutMsg := make(chan string, 40)
	go func() {
		// NOTE(review): stdOutMsg is not closed in this function, so this
		// printer goroutine ends only if runCmdAndGetStdOutInTime closes the
		// channel or the process exits — confirm against that helper.
		for line := range stdOutMsg {
			fmt.Println(line)
		}
	}()
	go func() {
		defer wg.Done()
		// Run the consistency check first, then the workload. Failures are
		// handed over through errCh with a goroutine-local variable instead
		// of writing to the caller's named return value from this goroutine.
		for _, action := range []string{"check", "run"} {
			remoteCmd := fmt.Sprintf("/tmp/go-tpc tpcc %s -U root -H %s -P %s -D %s -T %d --warehouses %d", action, tidbIP, tidbPort, dbName, threads, warehouse)
			if _, cmdErr := runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, remoteCmd); cmdErr != nil {
				// errCh has capacity 1 and at most one error is sent, so this
				// send never blocks.
				errCh <- cmdErr
				return
			}
		}
	}()
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Receiving yields the failure if one was sent, or nil once errCh has
	// been closed after the worker finished successfully.
	if err = <-errCh; err != nil {
		return err
	}

	fmt.Println("#============\ntpcc test finished\n#============")
	return nil
}

func runCmd(name string, arg ...string) (stdOutBytes []byte, stdErrBytes []byte, err error) {
if _, err = exec.LookPath(name); err != nil {
fmt.Printf("%s %s\n%s", name, strings.Join(arg, " "), err.Error())
Expand Down