Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use go-tpc instead of benchmarksql #1

Merged
merged 9 commits into from
Mar 17, 2020
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 52 additions & 67 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
nmRestore = "restore"

nmWarehouse = "warehouse"
nmThreads = "threads"

nmTiDBIP = "tidb-ip"
nmTiDBPort = "tidb-port"
Expand All @@ -31,6 +32,9 @@ const (
nmDataDir = "data-dir"

nmImporterIP = "importer-ip"
nmDB = "db"
binaryURL = "binary-url"
skipDownload = "skip-download"
)

var (
Expand All @@ -42,14 +46,19 @@ var (
tidbPort = flag.String(nmTiDBPort, "4000", "port of tidb-server")
deployDir = flag.String(nmDeployDir, "", "directory path of cluster deployment")

warehouse = flag.Int64(nmWarehouse, 100, "count of warehouse")
warehouse = flag.Int64(nmWarehouse, 100, "number of warehouses")
threads = flag.Int64(nmThreads, 40, "number of threads")

//ansibleDir = flag.String(nmAnsibleDir, "", "ansible directory path")

// TODO: If there is only one lightning, we do not need this var, we can fetch it from ansible.
lightningIP = flag.String(nmLightningIP, "", "ip address of tidb-lightnings")
dataDir = flag.String(nmDataDir, "", "data source directory of lightning")
importerIP = flag.String(nmImporterIP, "", "ip address of tikv-importer")

dbName = flag.String(nmDB, "tpcc", "test database name")
goTPCBinary = flag.String(binaryURL, "https://github.com/yeya24/go-tpc/releases/download/v0.1/go-tpc", "url of the go-tpc binary to download")
skipDownloading = flag.Bool(skipDownload, false, "skip downloading the go-tpc binary")
)

func main() {
Expand All @@ -67,14 +76,14 @@ func main() {

var start2 time.Time
if *all || *csv {
if err = fetchTpccRepoAndEnforceConf(*tidbIP, *tidbPort, *warehouse, dataDirs, lightningIPs); err != nil {
if err = fetchTpcc(dataDirs, lightningIPs, *goTPCBinary, *skipDownloading); err != nil {
os.Exit(1)
}
start2 = time.Now()
if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0]); err != nil {
if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0], *dbName); err != nil {
os.Exit(1)
}
if err = genCSV(lightningIPs, dataDirs); err != nil {
if err = genCSV(lightningIPs, dataDirs, *warehouse, *threads, *dbName); err != nil {
os.Exit(1)
}
}
Expand Down Expand Up @@ -143,62 +152,30 @@ func getLightningIPsAndDataDirs() (lightningIPs, dataDirs []string, err error) {
}

/**
> rm -rf /tmp/benchmarksql
> git clone https://github.com/pingcap/benchmarksql.git /tmp/benchmarksql
> yum install -y java ant
> cd /tmp/benchmarksql
> ant
> sed -i 's/localhost:4000/tidb_ip:tidb_port' /tmp/benchmarksql/run/props.mysql
> sed -i "s/warehouses=[0-9]\+/warehouses=10000/" /tmp/benchmarksql/run/props.mysql
> echo fileLocation=%s >> /tmp/benchmarksql/run/props.mysql
> echo tableName=%s >> /tmp/benchmarksql/run/props.mysql
> rm -f /tmp/go-tpc
> wget -O /tmp/go-tpc binary_url; chmod +x /tmp/go-tpc
*/
func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, lightningDirs []string, lightningIPs []string) (err error) {
var tableName []string
switch len(lightningIPs) {
case 1:
tableName = []string{"all"}
case 2:
tableName = []string{"customer", "stock,order"}
case 3:
tableName = []string{"customer", "stock", "order"}
}

func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string, skipDownloading bool) (err error) {
errCh := make(chan error, 3)
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
tn := tableName[i]
ip := lightningIP
wg.Add(1)
go func() {
defer wg.Done()
if _, _, err = runCmd("ssh", ip, `rm -rf /tmp/benchmarksql`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `cd /tmp; git clone -b specify_table https://github.com/XuHuaiyu/benchmarksql.git`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, `sudo yum install -y java ant`); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`cd /tmp/benchmarksql; ant`)); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf(`sed -i '%s' %s`, fmt.Sprintf(`s/localhost:4000/%s/;s/warehouses=[0-9]\+/%s/;s/loadWorkers=[0-9]\+/loadWorkers=%d/`, tidbIP+":"+tidbPort, fmt.Sprintf("warehouses=%d", warehouse), runtime.NumCPU()), "/tmp/benchmarksql/run/props.mysql")); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo fileLocation=%s/tpcc. >> /tmp/benchmarksql/run/props.mysql", lightningDirs[i])); err != nil {
errCh <- err
return
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("echo tableName=%s >> /tmp/benchmarksql/run/props.mysql", tn)); err != nil {
errCh <- err
return
if !skipDownloading {
if _, _, err = runCmd("ssh", ip, `rm -f /tmp/go-tpc`); err != nil {
errCh <- err
return
}

// TODO(yeya24): This is just a tmp release to download the binary.
// Change it when the binary can be directly download in the main repo.
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc %s; chmod +x /tmp/go-tpc", binaryURL)); err != nil {
errCh <- err
return
}
fmt.Println("Download go-tpc binary successfully!")
}
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", lightningDirs[i])); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loop variable i captured by func literal

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Will update soon

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-       threads   = flag.Int64(nmThreads, 40, "number of threads")
+       threads   = flag.Int64(nmThreads, 40, "number of threads of go-tpc")

        //ansibleDir = flag.String(nmAnsibleDir, "", "ansible directory path")

@@ -170,12 +170,11 @@ func getLightningIPsAndDataDirs() (lightningIPs, dataDirs []string, err error) {
 > wget -O /tmp/go-tpc binary_url; chmod +x /tmp/go-tpc
 */
 func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string, skipDownloading bool) (err error) {
-       errCh := make(chan error, 3)
+       errCh := make(chan error, len(lightningIPs)*10) // 3 should enough, echo run 3 cmd at most
        wg := &sync.WaitGroup{}
-       for i, lightningIP := range lightningIPs {
-               ip := lightningIP
+       for i := 0; i < len(lightningIPs); i++ {
                wg.Add(1)
-               go func() {
+               go func(ip string, dir string) {
                        defer wg.Done()
                        if !skipDownloading {
                                if _, _, err = runCmd("ssh", ip, `rm -f /tmp/go-tpc`); err != nil {
@@ -189,16 +188,18 @@ func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string,
                                }
                                fmt.Println("Download go-tpc binary successfully!")
                        }
-                       if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", lightningDirs[i])); err != nil {
+                       if _, _, err = runCmd("ssh", ip, fmt.Sprintf("mkdir -p %s", dir)); err != nil {
                                errCh <- err
                                return
                        }
-               }()
+               }(lightningIPs[i], lightningDirs[i])
        }
+
        go func() {
                wg.Wait()
                close(errCh)
        }()
+

Also, it looks like pushing into errCh may block when there are multiple lightning instances. Have we tested it with multiple lightnings?
I made a change but do not have permission to push.

errCh <- err
Expand All @@ -220,22 +197,16 @@ func fetchTpccRepoAndEnforceConf(tidbIP, tidbPort string, warehouse int64, light
}

/**
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists tpcc"
> mysql -h tidbIP -u root -P tidbPort -e "create database tpcc"`
> cd /tmp/benchmarksql/run
> ./runSQL.sh props.mysql sql.mysql/tableCreates.sql
> ./runSQL.sh props.mysql sql.mysql/indexCreates.sql
> cd -
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists dbName"
> /tmp/go-tpc tpcc schema -U root -H tidbIP -P tidbPort -D dbName
*/
func genSchema(tidbIP, tidbPort string, lightningIP string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists tpcc"`, tidbIP, tidbPort)); err != nil {
return
}
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "create database tpcc"`, tidbIP, tidbPort)); err != nil {
func genSchema(tidbIP, tidbPort, lightningIP, dbName string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists %s"`, tidbIP, tidbPort, dbName)); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建库和建表,都放在 /tmp/go-tpc tpcc schema -U root -H %s -P %s -D %s 中吧。用户的服务器上可能没有安装 mysql client

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在已经删掉schema 这个命令了。然后建表和建库都是自动的。但是drop掉已经存在的库不支持,这个怎么办?还是用mysql drop一下吧?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥不代码里用 driver drop?

return
}
var stdOutMsg []byte
if stdOutMsg, _, err = runCmd("ssh", lightningIP, "cd /tmp/benchmarksql/run; ./runSQL.sh props.mysql sql.mysql/tableCreates.sql; ./runSQL.sh props.mysql sql.mysql/indexCreates.sql"); err != nil {
// Create schema, including database and table.
if stdOutMsg, _, err = runCmd("ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc schema -U root -H %s -P %s -D %s", tidbIP, tidbPort, dbName)); err != nil {
return
}
fmt.Printf("%s", stdOutMsg)
Expand All @@ -244,14 +215,27 @@ func genSchema(tidbIP, tidbPort string, lightningIP string) (err error) {

/**
> cd /tmp/benchmarksql/run
> ./runLoader.sh props.mysql props.mysql
> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --csv.output outputDir --csv.tables [tables]
*/
func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
func genCSV(lightningIPs []string, lightningDirs []string, warehouse, threads int64, dbName string) (err error) {
var specifiedTables []string
switch len(lightningIPs) {
case 1:
// empty means generating all tables
specifiedTables = []string{""}
case 2:
specifiedTables = []string{"--csv.tables customer", "--csv.tables stock,orders"}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 在存在 3 个 lightning 时,分别为:
  • stock
  • order_line
  • customer, config, district, history, item, new_order, oorder, warehouse
  2. 当存在 2 个 lightning 时,分别为:
  • stock
  • order_line, customer, config, district, history, item, new_order, oorder, warehouse
  3. 因为 lightning 目前导入的瓶颈在 stock 表上(约需要 2 小时),其次是 order_line 表(约需要 1h30m),剩余表一起的导入时间约 1h。所以当 > 3 台 lightning 时,只需要用到 3 台即可,用更多的 lightning 也无法加速整体的导入时间。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XuHuaiyu 在 go-tpc 中,order_line 的生成依赖于 orders 表。所以当存在 3 个 lightning 的时候,需要把 order_line 和 orders 放在同一台机器生成,那这个怎么调整一下呢?就在第二台机器上生成 order_line 和 orders 吧?

case 3:
specifiedTables = []string{"--csv.tables customer", "--csv.tables stock", "--csv.tables orders"}
// TODO(yeya24): What about len > 3?
}

errCh := make(chan error, 3)
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
ip := lightningIP
dir := lightningDirs[i]
table := specifiedTables[i]
wg.Add(1)
stdOutMsg := make(chan string, 40)
go func() {
Expand All @@ -261,7 +245,8 @@ func genCSV(lightningIPs []string, lightningDirs []string) (err error) {
}()
go func() {
defer wg.Done()
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; cd /tmp/benchmarksql/run; ./runLoader.sh props.mysql props.mysql", dir)); err != nil {
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; "+
"/tmp/go-tpc tpcc prepare -D %s -T %d --warehouses %d --csv.output %s %s", dir, dbName, threads, warehouse, dir, table)); err != nil {
if err != nil {
errCh <- err
return
Expand Down