Skip to content

Commit

Permalink
skip table on error and fix golint (pingcap#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
holys authored and huachaohuang committed Mar 29, 2018
1 parent 4e49b82 commit 05ad9cf
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 49 deletions.
2 changes: 1 addition & 1 deletion ingest/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (m *Metrics) costTimeNS(name string, ns int64) {

func (m *Metrics) DumpTiming() string {
marks := make([]string, 0, len(m.Timing))
for mark, _ := range m.Timing {
for mark := range m.Timing {
marks = append(marks, mark)
}
sort.Strings(marks)
Expand Down
6 changes: 3 additions & 3 deletions ingest/kv/kv-deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ type KVDeliverKeeper struct {
pdAddr string
clientsPool []*KVDeliverClient // aka. connection pool

txnIdCounter int // TODO : need to update to another algorithm
txnIDCounter int // TODO : need to update to another algorithm
txnBoard map[uuid.UUID]*txnInfo
txns map[string][]*deliverTxn // map[tag]{*txn, *txn, *txn ...}

Expand All @@ -311,7 +311,7 @@ func NewKVDeliverKeeper(importServerAddr, pdAddr string) *KVDeliverKeeper {
pdAddr: pdAddr,
clientsPool: make([]*KVDeliverClient, 0, 32),

txnIdCounter: 0, // TODO : need to update to another algorithm
txnIDCounter: 0, // TODO : need to update to another algorithm
txns: make(map[string][]*deliverTxn),
txnBoard: make(map[uuid.UUID]*txnInfo),
txnFlushQueue: make(chan *deliverTxn, 64),
Expand Down Expand Up @@ -349,7 +349,7 @@ func (k *KVDeliverKeeper) validate(txn *deliverTxn) bool {
}

func (k *KVDeliverKeeper) newTxn(db string, table string) *deliverTxn {
k.txnIdCounter++
k.txnIDCounter++
uuid := uuid.Must(uuid.NewV4())

tag := buildTag(db, table)
Expand Down
4 changes: 2 additions & 2 deletions ingest/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func setGlobalVars() {
plan.PreparedPlanCacheCapacity = 10
}

func InitMembufCap(batchSqlLength int64) {
kv.ImportingTxnMembufCap = int(batchSqlLength) * 4
func InitMembufCap(batchSQLLength int64) {
kv.ImportingTxnMembufCap = int(batchSQLLength) * 4
// TODO : calculate predicted ratio, between sql and kvs' size, based on specified DDL
}

Expand Down
4 changes: 4 additions & 0 deletions ingest/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"gopkg.in/natefinch/lumberjack.v2"

"github.com/pingcap/tidb-lightning/ingest/common"
"github.com/pingcap/tidb/util/logutil"
)

const (
Expand Down Expand Up @@ -130,6 +131,9 @@ func InitLogger(cfg *LogConfig) error {
log.AddHook(&contextHook{})
log.SetFormatter(&SimpleTextFormater{})

// increase tidb log level to hide the annoying log.
logutil.InitLogger(&logutil.LogConfig{Level: "warn"})

if len(cfg.File) > 0 {
if common.IsDirExists(cfg.File) {
return errors.Errorf("can't use directory as log file name : %s", cfg.File)
Expand Down
6 changes: 3 additions & 3 deletions ingest/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,15 @@ func countValues(sqlText []byte) int {
ps : Count num of tuples (/values) appears within sql statement like :
"INSERT INTO `table` VALUES (..), (..), (..);"
*/
var textLen int = len(sqlText)
var textLen = len(sqlText)
var slice []byte
var tuplesNum int = 0
var tuplesNum int

for i, chr := range sqlText {
if chr == ')' && i < textLen-1 {
slice = bytes.TrimSpace(sqlText[i+1:])
if len(slice) > 0 && (slice[0] == ',' || slice[0] == ';') {
tuplesNum += 1
tuplesNum++
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions ingest/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMDDataReader(file string, offset int64) (*MDDataReader, error) {
}

if len(mdr.stmtHeader) == 0 {
return nil, errors.New("can not find any insert statment !")
return nil, errors.New("can not find any insert statment")
}

mdr.skipAnnotation(offset)
Expand Down Expand Up @@ -144,12 +144,12 @@ func (r *MDDataReader) Tell() int64 {
}

func (r *MDDataReader) currOffset() int64 {
if off, err := r.fd.Seek(0, io.SeekCurrent); err != nil {
off, err := r.fd.Seek(0, io.SeekCurrent)
if err != nil {
log.Errorf("get file offset failed (%s) : %v", r.file, err)
return -1
} else {
return off
}
return off
}

func getInsertStatmentHeader(file string) []byte {
Expand Down Expand Up @@ -196,7 +196,7 @@ func (r *MDDataReader) Read(minSize int64) ([][]byte, error) {
defer reader.Reset(fd)

// split file's content into multi sql statement
var stmts [][]byte = make([][]byte, 0, 8)
var stmts = make([][]byte, 0, 8)
appendSQL := func(sql []byte) {
sql = bytes.TrimSpace(sql)
sqlLen := len(sql)
Expand Down Expand Up @@ -235,7 +235,7 @@ func (r *MDDataReader) Read(minSize int64) ([][]byte, error) {
(...);
'''
*/
var statment []byte = make([]byte, 0, minSize+4096)
var statment = make([]byte, 0, minSize+4096)
var readSize, lineSize int64
var line []byte
var err error
Expand Down
5 changes: 2 additions & 3 deletions ingest/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func (rs regionSlice) Swap(i, j int) {
func (rs regionSlice) Less(i, j int) bool {
if rs[i].File == rs[j].File {
return rs[i].Offset < rs[j].Offset
} else {
return rs[i].File < rs[j].File
}
return rs[i].File < rs[j].File
}

////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -108,7 +107,7 @@ func (f *RegionFounder) MakeTableRegions(meta *MDTableMeta, allocateRowID bool)
region.BeginRowID = -1
}

var tableRows int64 = 0
var tableRows int64
for _, region := range filesRegions {
if allocateRowID {
region.BeginRowID = tableRows + 1
Expand Down
66 changes: 39 additions & 27 deletions ingest/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error {
}
}()

skipTables := make(map[string]struct{})

var wg sync.WaitGroup
for _, task := range tasks {
select {
Expand All @@ -183,11 +185,21 @@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error {

worker := workers.Apply()
wg.Add(1)
log.Warnf("region allowed to run >>>>>> [%s]", task.region.Name())
log.Infof("restoring region %s", task.region.Name())
go func(w *RestoreWorker, t *regionRestoreTask) {
defer wg.Done()
defer workers.Recycle(w)
t.Run(ctx)
defer wg.Done()
table := fmt.Sprintf("%s.%s", t.region.DB, t.region.Table)
if _, ok := skipTables[table]; ok {
log.Infof("something wrong with table %s before, so skip region %s", table, t.region.Name())
return
}
err := t.Run(ctx)
if err != nil {
log.Errorf("table %s region %s run task error %s", table, t.region.Name(), errors.ErrorStack(err))
skipTables[table] = struct{}{}
}

}(worker, task)
}
wg.Wait() // TODO ... ctx aborted
Expand Down Expand Up @@ -354,7 +366,7 @@ const (
statFailed string = "failed"
)

type restoreCallback func(regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum, err error)
type restoreCallback func(regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error

type regionRestoreTask struct {
status string
Expand Down Expand Up @@ -383,21 +395,24 @@ func newRegionRestoreTask(
}
}

func (t *regionRestoreTask) Run(ctx context.Context) {
func (t *regionRestoreTask) Run(ctx context.Context) error {
region := t.region
log.Infof("Start restore region : [%s] ...", t.region.Name())

t.status = statRunning
maxRowID, rows, checksum, err := t.run(ctx)
if err != nil {
log.Errorf("Table region (%s) restore failed : %s", region.Name(), err.Error())
return errors.Trace(err)
}

log.Infof("Finished restore region : [%s]", region.Name())
t.callback(region.ID, maxRowID, rows, checksum, err)
err = t.callback(region.ID, maxRowID, rows, checksum)
if err != nil {
return errors.Trace(err)
}
t.status = statFinished

return
return nil
}

func (t *regionRestoreTask) run(ctx context.Context) (int64, uint64, *verify.KVChecksum, error) {
Expand Down Expand Up @@ -571,12 +586,9 @@ func (tr *TableRestore) loadRegions() {
regions := founder.MakeTableRegions(tr.tableMeta, preAllocateRowsID)

table := tr.tableMeta.Name
for _, region := range regions {
log.Warnf("[%s] region - %s", table, region.Name())
}

id2regions := make(map[int]*mydump.TableRegion)
for _, region := range regions {
log.Infof("[%s] region - %s", table, region.Name())
id2regions[region.ID] = region
}

Expand All @@ -593,18 +605,11 @@ func (tr *TableRestore) loadRegions() {
return
}

func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum, err error) {
func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error {
table := tr.tableInfo.Name
tr.mux.Lock()
defer tr.mux.Unlock()

region := tr.id2regions[id]
if err != nil {
log.Errorf("[%s] region (%s) restore failed : %s",
table, region.Name(), err.Error())
return
}

tr.handledRegions[id] = &regionStat{
maxRowID: maxRowID,
rows: rows,
Expand All @@ -613,19 +618,22 @@ func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, ch

total := len(tr.regions)
handled := len(tr.handledRegions)
log.Infof("[%s] handled region count = %d (%s)", table, handled, common.Percent(handled, total))
log.Infof("table %s handled region count = %d (%s)", table, handled, common.Percent(handled, total))
if handled == len(tr.tasks) {
tr.onFinished()
err := tr.onFinished()
if err != nil {
return errors.Trace(err)
}
}

return
return nil
}

func (tr *TableRestore) makeKVDeliver() (kv.KVDeliver, error) {
return makeKVDeliver(tr.ctx, tr.cfg, tr.dbInfo, tr.tableInfo)
}

func (tr *TableRestore) onFinished() {
func (tr *TableRestore) onFinished() error {
// generate meta kv
var (
tableMaxRowID int64
Expand All @@ -643,13 +651,17 @@ func (tr *TableRestore) onFinished() {
log.Infof("table %s self-calculated checksum %s", table, checksum)
tr.localChecksums[table] = checksum

tr.restoreTableMeta(tableMaxRowID)
if err := tr.restoreTableMeta(tableMaxRowID); err != nil {
return errors.Trace(err)
}

// flush all kvs into TiKV ~
tr.ingestKV()
if err := tr.ingestKV(); err != nil {
return errors.Trace(err)
}

log.Infof("table %s has imported %d rows", table, tableRows)
return
return nil
}

func (tr *TableRestore) restoreTableMeta(rowID int64) error {
Expand Down
6 changes: 3 additions & 3 deletions ingest/sql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func ParseInsertStmt(sql []byte, values *[]interface{}) error {
stack := 0
for e = s; e < size; e++ {
if sql[e] == '(' {
stack += 1
stack++
} else if sql[e] == ')' {
stack -= 1
stack--
if stack == 0 {
break
}
Expand All @@ -57,7 +57,7 @@ func ParseInsertStmt(sql []byte, values *[]interface{}) error {

// extract columns' values
_ = parseRowValues(sql[s+1:e], values)
e += 1 // skip ')'
e++ // skip ')'

// check ending ")," or ");"
for ; e < size; e++ {
Expand Down
2 changes: 1 addition & 1 deletion ingest/verification/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *KVChecksum) Update(kvs []kvec.KvPair) {
sum = crc64.Update(0, c.ecmaTable, pair.Key)
sum = crc64.Update(sum, c.ecmaTable, pair.Val)
checksum ^= sum
kvNum += 1
kvNum++
bytes += (len(pair.Key) + len(pair.Val))
}

Expand Down

0 comments on commit 05ad9cf

Please sign in to comment.