Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Mar 10, 2020
1 parent 3fc9ea9 commit 45e6ede
Showing 1 changed file with 35 additions and 24 deletions.
59 changes: 35 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
const (
nmAll = "all"
nmCSV = "csv"
nmTest = "test"
nmRestore = "restore"

nmWarehouse = "warehouse"
Expand All @@ -33,13 +34,14 @@ const (

nmImporterIP = "importer-ip"
nmDB = "db"
binaryURL = "binary-url"
downloadURL = "download-url"
skipDownload = "skip-download"
)

var (
all = flag.Bool(nmAll, true, "do all the actions")
csv = flag.Bool(nmCSV, false, "generate tpcc csv files")
test = flag.Bool(nmTest, false, "run tpcc test")
restore = flag.Bool(nmRestore, false, "start lightning, importer and restore files")

tidbIP = flag.String(nmTiDBIP, "127.0.0.1", "ip of tidb-server")
Expand All @@ -57,7 +59,7 @@ var (
importerIP = flag.String(nmImporterIP, "", "ip address of tikv-importer")

dbName = flag.String(nmDB, "tpcc", "test database name")
goTPCBinary = flag.String(binaryURL, "https://github.com/yeya24/go-tpc/releases/download/v0.1/go-tpc", "url of the go-tpc binary to download")
goTPCFile = flag.String(downloadURL, "https://github.com/pingcap/go-tpc/releases/download/v1.0.0/go-tpc_1.0.0_linux_amd64.tar.gz", "url of the go-tpc binary to download")
skipDownloading = flag.Bool(skipDownload, false, "skip downloading the go-tpc binary")
)

Expand All @@ -74,16 +76,25 @@ func main() {
os.Exit(1)
}

if *test {
if err = runTPCCTest(lightningIPs[0], *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
fmt.Println("tpcc test finished successfully!")
os.Exit(0)
}

var start2 time.Time
if *all || *csv {
if err = fetchTpcc(dataDirs, lightningIPs, *goTPCBinary, *skipDownloading); err != nil {
if err = fetchTpcc(dataDirs, lightningIPs, *goTPCFile, *skipDownloading); err != nil {
os.Exit(1)
}
start2 = time.Now()
if err = genSchema(*tidbIP, *tidbPort, lightningIPs[0], *dbName); err != nil {
if err = dropDB(*tidbIP, *tidbPort, *dbName); err != nil {
os.Exit(1)
}
if err = genCSV(lightningIPs, dataDirs, *warehouse, *threads, *dbName); err != nil {
if err = genCSV(lightningIPs, dataDirs, *tidbIP, *tidbPort, *dbName, *warehouse, *threads); err != nil {
os.Exit(1)
}
}
Expand Down Expand Up @@ -169,9 +180,7 @@ func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string,
return
}

// TODO(yeya24): This is just a tmp release to download the binary.
// Change it when the binary can be directly download in the main repo.
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc %s; chmod +x /tmp/go-tpc", binaryURL)); err != nil {
if _, _, err = runCmd("ssh", ip, fmt.Sprintf("wget -O /tmp/go-tpc.tar.gz %s; tar -xvf /tmp/go-tpc.tar.gz -C /tmp/; rm -f /tmp/go-tpc.tar.gz; chmod +x /tmp/go-tpc", binaryURL)); err != nil {
errCh <- err
return
}
Expand All @@ -198,44 +207,35 @@ func fetchTpcc(lightningDirs []string, lightningIPs []string, binaryURL string,

/**
> mysql -h tidbIP -u root -P tidbPort -e "drop database if exists dbName"
> /tmp/go-tpc tpcc schema -U root -H tidbIP -P tidbPort -D dbName
*/
func genSchema(tidbIP, tidbPort, lightningIP, dbName string) (err error) {
func dropDB(tidbIP, tidbPort, dbName string) (err error) {
if _, _, err = runCmd("bash", "-c", fmt.Sprintf(`mysql -h %s -u root -P %s -e "drop database if exists %s"`, tidbIP, tidbPort, dbName)); err != nil {
return
}
var stdOutMsg []byte
// Create schema, including database and table.
if stdOutMsg, _, err = runCmd("ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc schema -U root -H %s -P %s -D %s", tidbIP, tidbPort, dbName)); err != nil {
return
}
fmt.Printf("%s", stdOutMsg)
return
}

/**
> cd /tmp/benchmarksql/run
> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --csv.output outputDir --csv.tables [tables]
> /tmp/go-tpc tpcc prepare -D dbName -T threadsNum --warehouses warehouseNum --output outputDir --tables [tables]
*/
func genCSV(lightningIPs []string, lightningDirs []string, warehouse, threads int64, dbName string) (err error) {
func genCSV(lightningIPs []string, lightningDirs []string, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
var specifiedTables []string
switch len(lightningIPs) {
case 1:
// empty means generating all tables
specifiedTables = []string{""}
case 2:
specifiedTables = []string{"--csv.tables customer", "--csv.tables stock,orders"}
specifiedTables = []string{"--tables stock", "--tables order_line,customer,config,district,history,item,new_order,orders,warehouse"}
case 3:
specifiedTables = []string{"--csv.tables customer", "--csv.tables stock", "--csv.tables orders"}
// TODO(yeya24): What about len > 3?
specifiedTables = []string{"--tables stock", "--tables orders,order_line", "--tables customer,config,district,history,item,new_order,warehouse"}
}

errCh := make(chan error, 3)
wg := &sync.WaitGroup{}
for i, lightningIP := range lightningIPs {
ip := lightningIP
dir := lightningDirs[i]
table := specifiedTables[i]
tables := specifiedTables[i]
wg.Add(1)
stdOutMsg := make(chan string, 40)
go func() {
Expand All @@ -246,7 +246,7 @@ func genCSV(lightningIPs []string, lightningDirs []string, warehouse, threads in
go func() {
defer wg.Done()
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", ip, fmt.Sprintf("cd %s; rm -rf *; "+
"/tmp/go-tpc tpcc prepare -D %s -T %d --warehouses %d --csv.output %s %s", dir, dbName, threads, warehouse, dir, table)); err != nil {
"/tmp/go-tpc tpcc prepare -U root -H %s -P %s -D %s -T %d --warehouses %d --output %s %s", dir, tidbIP, tidbPort, dbName, threads, warehouse, dir, tables)); err != nil {
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -326,6 +326,17 @@ func restoreData(importerIPs []string, lightningIPs []string, deployDir string)
return
}

func runTPCCTest(lightningIP, tidbIP, tidbPort, dbName string, warehouse, threads int64) (err error) {
stdOutMsg := make(chan string, 40)
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc check -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil {
return
}
if _, err = runCmdAndGetStdOutInTime(stdOutMsg, "ssh", lightningIP, fmt.Sprintf("/tmp/go-tpc tpcc run -U root -H %s -P %s -D %s -T %d --warehouses %d", tidbIP, tidbPort, dbName, threads, warehouse)); err != nil {
return
}
return
}

func runCmd(name string, arg ...string) (stdOutBytes []byte, stdErrBytes []byte, err error) {
if _, err = exec.LookPath(name); err != nil {
fmt.Printf("%s %s\n%s", name, strings.Join(arg, " "), err.Error())
Expand Down

0 comments on commit 45e6ede

Please sign in to comment.