diff --git a/.gitignore b/.gitignore index fabfaa2..c612456 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ .idea -tpcc_gener +tpccgen diff --git a/README.md b/README.md new file mode 100644 index 0000000..e129618 --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# tpccgen + +A command line utility to generate/import/test via [go-tpc](https://github.com/pingcap/go-tpc) + +## Install + +```bash +go build -o tpccgen main.go +``` + +## Flags + +```bash +Usage of ./tpccgen: + -all + do all the actions (default true) + -csv + generate tpcc csv files + -data-dir string + data source directory of lightning + -db string + test database name (default "tpcc") + -deploy-dir string + directory path of cluster deployment + -download-url string + url of the go-tpc binary to download (default "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz") + -importer-ip string + ip address of tikv-importer + -lightning-ip string + ip address of tidb-lightnings + -restore + start lightning, importer and restore files + -skip-download + skip downloading the go-tpc binary + -test + run tpcc test + -threads int + number of threads (default 40) + -tidb-ip string + ip of tidb-server (default "127.0.0.1") + -tidb-port string + port of tidb-server (default "4000") + -warehouse int + number of warehouses (default 100) +``` + +## Usage + +By default, tpccgen will automatically take 3 actions: + +1. download [go-tpc](https://github.com/pingcap/go-tpc) binary +2. generate csv data +3. import csv data through [tidb-lightning](https://github.com/pingcap/tidb-lightning) + +You can also add some flags you want. For example: + +```bash +./tpccgen --lightning-ip 172.16.5.90 --importer-ip 172.16.5.89 --data-dir /data/lightning --tidb-ip 172.16.5.84 --tidb-port 4111 --deploy-dir /data/deploy --db test --threads 40 --warehouse 10 +``` + +If you want to perform tpcc test, you must add `--test` flag just like the command below: + +```bash +./tpccgen --test --lightning-ip 172.16.5.90 --tidb-ip 172.16.5.84 --tidb-port 4111 --db test --threads 40 --warehouse 10 +``` diff --git a/main.go b/main.go index 019ad28..027a3c4 100644 --- a/main.go +++ b/main.go @@ -18,9 +18,11 @@ import ( const ( nmAll = "all" nmCSV = "csv" + nmTest = "test" nmRestore = "restore" nmWarehouse = "warehouse" + nmThreads = "threads" nmTiDBIP = "tidb-ip" nmTiDBPort = "tidb-port" @@ -31,18 +33,23 @@ const ( nmDataDir = "data-dir" nmImporterIP = "importer-ip" + nmDB = "db" + downloadURL = "download-url" + skipDownload = "skip-download" ) var ( - all = flag.Bool(nmAll, true, "do all the actions") + all = flag.Bool(nmAll, true, "do all the actions, test is not included") csv = flag.Bool(nmCSV, false, "generate tpcc csv files") + test = flag.Bool(nmTest, false, "run tpcc test") restore = flag.Bool(nmRestore, false, "start lightning, importer and restore files") tidbIP = flag.String(nmTiDBIP, "127.0.0.1", "ip of tidb-server") tidbPort = flag.String(nmTiDBPort, "4000", "port of tidb-server") deployDir = flag.String(nmDeployDir, "", "directory path of cluster deployment") - warehouse = flag.Int64(nmWarehouse, 100, "count of warehouse") + warehouse = flag.Int64(nmWarehouse, 100, "number of warehouses") + threads = flag.Int64(nmThreads, 40, "number of threads of go-tpc") //ansibleDir = flag.String(nmAnsibleDir, "", "ansible directory path") @@ -50,6 +57,10 @@ var ( lightningIP = flag.String(nmLightningIP, "", "ip address of tidb-lightnings") dataDir = flag.String(nmDataDir, "", "data source directory of lightning") importerIP = flag.String(nmImporterIP, "", "ip address of tikv-importer") + + dbName = flag.String(nmDB, "tpcc", "test database name") + goTPCFile = flag.String(downloadURL, "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz", "url of the go-tpc binary to download") + skipDownloading = flag.Bool(skipDownload, false, "skip downloading the go-tpc binary") ) func main() { @@ -66,15 +77,27 @@ func main() { } var start2 time.Time - if *all || *csv { - if err = fetchTpccRepoAndEnforceConf(*tidbIP, *tidbPort, *warehouse, dataDirs, lightningIPs); err != nil { + start2 = time.Now() + if err = fetchTpcc(dataDirs, lightningIPs, *goTPCFile, *skipDownloading); err != nil { + os.Exit(1) + } + + // If test flag is enabled, just run tpcc test. + if *test { + var testStart time.Time + if err = runTPCCTest(lightningIPs[0], *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil { + fmt.Println(err.Error()) os.Exit(1) } - start2 = time.Now() - if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0]); err != nil { + fmt.Println("prepare cost:", time.Since(start).String(), "test cost:", time.Since(testStart).String()) + os.Exit(0) + } + + if *all || *csv { + if err = dropDB(*tidbIP, *tidbPort, *dbName); err != nil { os.Exit(1) } - if err = genCSV(lightningIPs, dataDirs); err != nil { + if err = genCSV(lightningIPs, dataDirs, *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil { os.Exit(1) } } @@ -143,73 +166,40 @@ func getLightningIPsAndDataDirs() (lightningIPs, dataDirs []string, err error) { } /** -> rm -rf /tmp/benchmarksql -> git clone https://github.com/pingcap/benchmarksql.git /tmp/benchmarksql -> yum install -y java ant -> cd /tmp/benchmarksql -> ant -> sed -i 's/localhost:4000/tidb_ip:tidb_port' /tmp/benchmarksql/run/props.mysql -> sed -i "s/warehouses=[0-9]\+/warehouses=10000/" /tmp/benchmarksql/run/props.mysql -> echo fileLocation=%s >> /tmp/benchmarksql/run/props.mysql -> echo tableName=%s >> /tmp/benchmarksql/run/props.mysql +> rm -f /tmp/go-tpc +> wget -O /tmp/go-tpc binary_url; chmod +x /tmp/go-tpc */ -func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, lightningDirs []string, lightningIPs []string) (err error) { - var tableName []string - switch len(lightningIPs) { - case 1: - tableName = []string{"all"} - case 2: - tableName = []string{"customer", "stock,order"} - case 3: - tableName = []string{"customer", "stock", "order"} - } - - errCh := make(chan error, 3) +func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string, skipDownloading bool) (err error) { + errCh := make(chan error, len(lightningIPs)*10) // 3 should enough, echo run 3 cmd at most wg := &sync.WaitGroup{} - for i, lightningIP := range lightningIPs { - tn := tableName[i] - ip := lightningIP + for i := 0; i < len(lightningIPs); i++ { wg.Add(1) - go func() { + go func(ip string, dir string) { defer wg.Done() - if _, _, err = runCmd("ssh", ip, `rm -rf /tmp/benchmarksql`); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, `cd /tmp; git clone -b specify_table https://github.com/XuHuaiyu/benchmarksql.git`); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, `sudo yum install -y java ant`); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`cd /tmp/benchmarksql; ant`)); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`sed -i '%s' %s`, fmt.Sprintf(`s/localhost:4000/%s/;s/warehouses=[0-9]\+/%s/;s/loadWorkers=[0-9]\+/loadWorkers=%d/`, tidbIP+":"+tidbPort, fmt.Sprintf("warehouses=%d", warehouse), runtime.NumCPU()), "/tmp/benchmarksql/run/props.mysql")); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo fileLocation=%s/tpcc. >> /tmp/benchmarksql/run/props.mysql", lightningDirs[i])); err != nil { - errCh <- err - return - } - if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo tableName=%s >> /tmp/benchmarksql/run/props.mysql", tn)); err != nil { - errCh <- err - return + if !skipDownloading { + if _, _, err = runCmd("ssh", ip, `rm -f /tmp/go-tpc`); err != nil { + errCh <- err + return + } + + if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc.tar.gz %s; tar -xvf /tmp/go-tpc.tar.gz -C /tmp/; rm -f /tmp/go-tpc.tar.gz; chmod +x /tmp/go-tpc", binaryURL)); err != nil { + errCh <- err + return + } + fmt.Println("Download go-tpc binary successfully!") } - if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", lightningDirs[i])); err != nil { + if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", dir)); err != nil { errCh <- err return } - }() + }(lightningIPs[i], lightningDirs[i]) } + go func() { wg.Wait() close(errCh) }() + for err = range errCh { if err != nil { return @@ -220,38 +210,36 @@ func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, light } /** -> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists tpcc" -> mysql -h tidbIP -u root -P tidbPort -e "create database tpcc"` -> cd /tmp/benchmarksql/run -> ./runSQL.sh props.mysql sql.mysql/tableCreates.sql -> ./runSQL.sh props.mysql sql.mysql/indexCreates.sql -> cd - +> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists dbName" */ -func genSchema(tidbIP, tidbPort string, lightningIP string) (err error) { - if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists tpcc"`, tidbIP, tidbPort)); err != nil { - return - } - if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "create database tpcc"`, tidbIP, tidbPort)); err != nil { +func dropDB(tidbIP, tidbPort, dbName string) (err error) { + if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists %s"`, tidbIP, tidbPort, dbName)); err != nil { return } - var stdOutMsg []byte - if stdOutMsg, _, err = runCmd("ssh", lightningIP, "cd /tmp/benchmarksql/run; ./runSQL.sh props.mysql sql.mysql/tableCreates.sql; ./runSQL.sh props.mysql sql.mysql/indexCreates.sql"); err != nil { - return - } - fmt.Printf("%s", stdOutMsg) return } /** -> cd /tmp/benchmarksql/run -> ./runLoader.sh props.mysql props.mysql +> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --output outputDir --tables [tables] */ -func genCSV(lightningIPs []string, lightningDirs []string) (err error) { +func genCSV(lightningIPs []string, lightningDirs []string, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) { + var specifiedTables []string + switch len(lightningIPs) { + case 1: + // empty means generating all tables + specifiedTables = []string{""} + case 2: + specifiedTables = []string{"--tables stock", "--tables order_line,customer,config,district,history,item,new_order,orders,warehouse"} + case 3: + specifiedTables = []string{"--tables stock", "--tables orders,order_line", "--tables customer,config,district,history,item,new_order,warehouse"} + } + errCh := make(chan error, 3) wg := &sync.WaitGroup{} for i, lightningIP := range lightningIPs { ip := lightningIP dir := lightningDirs[i] + tables := specifiedTables[i] wg.Add(1) stdOutMsg := make(chan string, 40) go func() { @@ -261,7 +249,8 @@ func genCSV(lightningIPs []string, lightningDirs []string) (err error) { }() go func() { defer wg.Done() - if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; cd /tmp/benchmarksql/run; ./runLoader.sh props.mysql props.mysql", dir)); err != nil { + if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; "+ + "/tmp/go-tpc tpcc prepare -U root -H %s -P %s -D %s -T %d --warehouses %d --output %s %s", dir, tidbIP, tidbPort, dbName, threads, warehouse, dir, tables)); err != nil { if err != nil { errCh <- err return @@ -341,6 +330,37 @@ func restoreData(importerIPs []string, lightningIPs []string, deployDir string) return } +func runTPCCTest(lightningIP, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) { + errCh := make(chan error, 1) + wg := &sync.WaitGroup{} + wg.Add(1) + stdOutMsg := make(chan string, 40) + go func() { + for line := range stdOutMsg { + fmt.Println(line) + } + }() + go func() { + defer wg.Done() + if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc check -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil { + return + } + if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc run -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil { + return + } + }() + go func() { + wg.Wait() + close(errCh) + }() + for err = range errCh { + return + } + + fmt.Println("#============\ntpcc test finished\n#============") + return +} + func runCmd(name string, arg ...string) (stdOutBytes []byte, stdErrBytes []byte, err error) { if _, err = exec.LookPath(name); err != nil { fmt.Printf("%s %s\n%s", name, strings.Join(arg, " "), err.Error())