
lightning: enhance request log in server-mode and restore progress #33718

Merged
merged 10 commits on Apr 18, 2022
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -1243,7 +1243,7 @@ loopWrite:
engine.importedKVSize.Add(rangeStats.totalBytes)
engine.importedKVCount.Add(rangeStats.count)
engine.finishedRanges.add(finishedRange)
metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes))
metric.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
}
return errors.Trace(err)
}
63 changes: 54 additions & 9 deletions br/pkg/lightning/lightning.go
@@ -133,6 +133,50 @@ func (l *Lightning) GoServe() error {
return l.goServe(statusAddr, io.Discard)
}

// TODO: maybe handle http request using gin
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
body string
}

func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
return &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

func (lrw *loggingResponseWriter) Write(d []byte) (int, error) {
// keep first part of the response for logging, max 1K
if lrw.body == "" && len(d) > 0 {
length := len(d)
if length > 1024 {
length = 1024
}
lrw.body = string(d[:length])
}
return lrw.ResponseWriter.Write(d)
}

func httpHandleWrapper(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
logger := log.L().With(zap.String("method", r.Method), zap.Stringer("url", r.URL)).
Begin(zapcore.InfoLevel, "process http request")

newWriter := newLoggingResponseWriter(w)
h.ServeHTTP(newWriter, r)

bodyField := zap.Skip()
if newWriter.Header().Get("Content-Encoding") != "gzip" {
bodyField = zap.String("body", newWriter.body)
}
logger.End(zapcore.InfoLevel, nil, zap.Int("status", newWriter.statusCode), bodyField)
}
}

func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux := http.NewServeMux()
mux.Handle("/", http.RedirectHandler("/web/", http.StatusFound))
@@ -145,13 +189,13 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
mux.Handle("/tasks", handleTasks)
mux.Handle("/tasks/", handleTasks)
mux.HandleFunc("/progress/task", handleProgressTask)
mux.HandleFunc("/progress/table", handleProgressTable)
mux.HandleFunc("/pause", handlePause)
mux.HandleFunc("/resume", handleResume)
mux.HandleFunc("/loglevel", handleLogLevel)
mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
mux.HandleFunc("/progress/task", httpHandleWrapper(handleProgressTask))
mux.HandleFunc("/progress/table", httpHandleWrapper(handleProgressTable))
mux.HandleFunc("/pause", httpHandleWrapper(handlePause))
mux.HandleFunc("/resume", httpHandleWrapper(handleResume))
mux.HandleFunc("/loglevel", httpHandleWrapper(handleLogLevel))

mux.Handle("/web/", http.StripPrefix("/web", httpgzip.FileServer(web.Res, httpgzip.FileServerOptions{
IndexHTML: true,
@@ -215,7 +259,8 @@ func (l *Lightning) RunServer() error {
if err != nil {
return err
}
err = l.run(context.Background(), task, nil)
o := &options{}
err = l.run(context.Background(), task, o)
if err != nil && !common.IsContextCanceledError(err) {
restore.DeliverPauser.Pause() // force pause the progress on error
log.L().Error("tidb lightning encountered error", zap.Error(err))
@@ -559,7 +604,7 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
return
}
log.L().Debug("received task config", zap.ByteString("content", data))
log.L().Info("received task config", zap.ByteString("content", data))

cfg := config.NewConfig()
if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
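For readers unfamiliar with the pattern, the new httpHandleWrapper above follows the usual "wrap the ResponseWriter" approach. Below is a minimal, self-contained sketch of that pattern using only the standard library; the names loggingWriter and withRequestLog are illustrative rather than the PR's, and the real wrapper logs through Lightning's zap-based logger instead of the stdlib log package.

package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
)

// loggingWriter records the status code and the start of the body as it is written.
type loggingWriter struct {
	http.ResponseWriter
	status int
	body   []byte
}

func (w *loggingWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

func (w *loggingWriter) Write(d []byte) (int, error) {
	// Keep at most the first 1 KiB of the response for logging.
	if len(w.body) == 0 && len(d) > 0 {
		n := len(d)
		if n > 1024 {
			n = 1024
		}
		w.body = append(w.body, d[:n]...)
	}
	return w.ResponseWriter.Write(d)
}

// withRequestLog wraps a handler so every request logs its method, URL, status, and body prefix.
func withRequestLog(h http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		lw := &loggingWriter{ResponseWriter: w, status: http.StatusOK}
		h(lw, r)
		log.Printf("%s %s -> %d body=%q", r.Method, r.URL, lw.status, lw.body)
	}
}

func main() {
	h := withRequestLog(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusAccepted)
		fmt.Fprint(w, `{"ok":true}`)
	})
	// Exercise the wrapper in-process, without opening a real port.
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/progress/task", nil))
}

Running it prints one log line per request with the method, URL, final status code, and at most the first 1 KiB of the response body, which is the same information the new wrapper emits via logger.End.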
19 changes: 18 additions & 1 deletion br/pkg/lightning/metric/metric.go
@@ -24,10 +24,18 @@ import (
const (
// states used for the TableCounter labels
TableStatePending = "pending"
TableStateWritten = "written"
TableStateImported = "imported"
TableStateCompleted = "completed"

BytesStateTotalRestore = "total_restore" // total source data bytes needs to restore
BytesStateRestored = "restored" // source data bytes restored during restore engine
BytesStateRestoreWritten = "written" // bytes written during restore engine
BytesStateImported = "imported" // bytes imported during import engine

ProgressPhaseTotal = "total" // total restore progress(not include post-process, like checksum and analyze)
ProgressPhaseRestore = "restore" // restore engine progress
ProgressPhaseImport = "import" // import engine progress

// results used for the TableCounter labels
TableResultSuccess = "success"
TableResultFailure = "failure"
@@ -193,6 +201,14 @@ var (
Help: "disk/memory size currently occupied by intermediate files in local backend",
}, []string{"medium"},
)

ProgressGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "lightning",
Name: "progress",
Help: "progress of lightning phase",
}, []string{"phase"},
)
)

//nolint:gochecknoinits // TODO: refactor
@@ -216,6 +232,7 @@ func init() {
prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
prometheus.MustRegister(ApplyWorkerSecondsHistogram)
prometheus.MustRegister(LocalStorageUsageBytesGauge)
prometheus.MustRegister(ProgressGauge)
}

func RecordTableCount(status string, err error) {
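As a small illustration of how the new ProgressGauge is shaped, here is a self-contained sketch; it assumes the standard github.com/prometheus/client_golang API, and the testutil read-back at the end is only for demonstration, not something the PR does.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// A gauge vector keyed by "phase", mirroring the new lightning_progress metric.
	progress := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "lightning",
		Name:      "progress",
		Help:      "progress of lightning phase",
	}, []string{"phase"})
	prometheus.MustRegister(progress)

	// Each phase is a separate series; a dashboard can chart them independently.
	progress.WithLabelValues("restore").Set(0.80)
	progress.WithLabelValues("import").Set(0.60)
	progress.WithLabelValues("total").Set((0.80 + 0.60) / 2)

	fmt.Println(testutil.ToFloat64(progress.WithLabelValues("total"))) // 0.7
}

Each phase (restore, import, total) is a separate series keyed by the phase label, so a dashboard can chart the three curves independently.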
114 changes: 85 additions & 29 deletions br/pkg/lightning/restore/restore.go
@@ -1162,6 +1162,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
case <-logProgressChan:
// log the current progress periodically, so OPS will know that we're still working
nanoseconds := float64(time.Since(start).Nanoseconds())
totalRestoreBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore))
restoredBytes := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestored))
// the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate
// before the last table start, so use the bigger of the two should be a workaround
estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
@@ -1179,8 +1181,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
engineEstimated = enginePending
}
engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten))
bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported))
bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten))
bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.BytesStateImported))

var state string
var remaining zap.Field
@@ -1197,37 +1199,64 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
state = "preparing"
}

// since we can't accurately estimate the extra time cost by import after all writing are finished,
// so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total
// progress.
// lightning restore is separated into a restore-engine phase and an import-engine phase; the two
// phases are parallelized and pipelined between engines, so we can only weight the progress of
// the two phases to get the total progress.
//
// for the local & importer backends:
// in most cases the import engine is faster since it involves little computation, but inside one
// engine restore and import are serialized, so the progress of the two will not differ too much,
// and the import engine determines the end time of the whole restore, so we average them for now.
// the resulting progress may fall behind the real progress if import is faster.
//
// for the tidb backend, we do nothing during the import-engine phase, so we use the restore-engine
// progress as the total progress.
restoreBytesField := zap.Skip()
importBytesField := zap.Skip()
remaining = zap.Skip()
totalPercent := 0.0
if finished > 0 {
writePercent := math.Min(finished/estimated, 1.0)
importPercent := 1.0
if bytesWritten > 0 {
totalBytes := bytesWritten / writePercent
importPercent = math.Min(bytesImported/totalBytes, 1.0)
if restoredBytes > 0 {
restorePercent := math.Min(restoredBytes/totalRestoreBytes, 1.0)
metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseRestore).Set(restorePercent)
if rc.cfg.TikvImporter.Backend != config.BackendTiDB {
var importPercent float64
if bytesWritten > 0 {
// estimate the total import bytes from the bytes written so far.
// when importPercent = 1, totalImportBytes = bytesWritten, but bytesImported may be bigger or
// smaller than bytesWritten (for example when data is deduplicated), so we also compute the
// progress from engine counts and use the bigger of the two in case bytesImported is smaller.
totalImportBytes := bytesWritten / restorePercent
biggerPercent := math.Max(bytesImported/totalImportBytes, engineFinished/engineEstimated)
importPercent = math.Min(biggerPercent, 1.0)
importBytesField = zap.String("import-bytes", fmt.Sprintf("%s/%s(estimated)",
units.BytesSize(bytesImported), units.BytesSize(totalImportBytes)))
}
metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseImport).Set(importPercent)
totalPercent = (restorePercent + importPercent) / 2
} else {
totalPercent = restorePercent
}
totalPercent = writePercent*0.8 + importPercent*0.2
if totalPercent < 1.0 {
remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
}
restoreBytesField = zap.String("restore-bytes", fmt.Sprintf("%s/%s",
units.BytesSize(restoredBytes), units.BytesSize(totalRestoreBytes)))
}
metric.ProgressGauge.WithLabelValues(metric.ProgressPhaseTotal).Set(totalPercent)

formatPercent := func(finish, estimate float64) string {
speed := ""
if estimated > 0 {
speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100)
formatPercent := func(num, denom float64) string {
if denom > 0 {
return fmt.Sprintf(" (%.1f%%)", num/denom*100)
}
return speed
return ""
}

// avoid output bytes speed if there are no unfinished chunks
chunkSpeed := zap.Skip()
encodeSpeedField := zap.Skip()
if bytesRead > 0 {
chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
encodeSpeedField = zap.Float64("encode speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
}

// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
Expand All @@ -1237,7 +1266,8 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
chunkSpeed,
restoreBytesField, importBytesField,
encodeSpeedField,
zap.String("state", state),
remaining,
)
@@ -1447,6 +1477,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
finalErr = err
return
}
logTask.End(zap.ErrorLevel, nil)
// clean up task metas
if cleanup {
logTask.Info("cleanup task metas")
@@ -1473,11 +1504,13 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
for task := range taskCh {
tableLogTask := task.tr.logger.Begin(zap.InfoLevel, "restore table")
web.BroadcastTableCheckpoint(task.tr.tableName, task.cp)

needPostProcess, err := task.tr.restoreTable(ctx2, rc, task.cp)

err = common.NormalizeOrWrapErr(common.ErrRestoreTable, err, task.tr.tableName)
tableLogTask.End(zap.ErrorLevel, err)
web.BroadcastError(task.tr.tableName, err)
metric.RecordTableCount("completed", err)
metric.RecordTableCount(metric.TableStateCompleted, err)
restoreErr.Set(err)
if needPostProcess {
postProcessTaskChan <- task
@@ -1487,6 +1520,8 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
}()
}

var allTasks []task
var totalDataSizeToRestore int64
for _, dbMeta := range rc.dbMetas {
dbInfo := rc.dbInfos[dbMeta.Name]
for _, tableMeta := range dbMeta.Tables {
@@ -1508,15 +1543,33 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
return errors.Trace(err)
}

wg.Add(1)
select {
case taskCh <- task{tr: tr, cp: cp}:
case <-ctx.Done():
return ctx.Err()
allTasks = append(allTasks, task{tr: tr, cp: cp})

if len(cp.Engines) == 0 {
for _, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
}
} else {
for _, eng := range cp.Engines {
for _, chunk := range eng.Chunks {
totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
Review comment from a Contributor: it seems that when a task is paused and resumed, we use only the unfinished bytes to calculate progress. This will display 0% progress when resuming, which makes it look like the resume failed and we're starting from scratch 🤔

Reply from the Contributor (author): it's consistent with the previous implementation: progress is calculated from the current checkpoint, not from the beginning of the data file.

}
}
}
}
}

metric.BytesCounter.WithLabelValues(metric.BytesStateTotalRestore).Add(float64(totalDataSizeToRestore))

for i := range allTasks {
wg.Add(1)
select {
case taskCh <- allTasks[i]:
case <-ctx.Done():
return ctx.Err()
}
}

wg.Wait()
// if context is done, should return directly
select {
@@ -2154,7 +2207,8 @@ func (cr *chunkRestore) deliverLoop(
var kvPacket []deliveredKVs
// init these two field as checkpoint current value, so even if there are no kv pairs delivered,
// chunk checkpoint should stay the same
offset := cr.chunk.Chunk.Offset
startOffset := cr.chunk.Chunk.Offset
currOffset := startOffset
rowID := cr.chunk.Chunk.PrevRowIDMax

populate:
@@ -2168,7 +2222,7 @@ func (cr *chunkRestore) deliverLoop(
for _, p := range kvPacket {
p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
columns = p.columns
offset = p.offset
currOffset = p.offset
rowID = p.rowID
}
case <-ctx.Done():
@@ -2231,9 +2285,11 @@ func (cr *chunkRestore) deliverLoop(
// In local mode, we should write these checkpoint after engine flushed.
cr.chunk.Checksum.Add(&dataChecksum)
cr.chunk.Checksum.Add(&indexChecksum)
cr.chunk.Chunk.Offset = offset
cr.chunk.Chunk.Offset = currOffset
cr.chunk.Chunk.PrevRowIDMax = rowID

metric.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))

if dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
// No need to save checkpoint if nothing was delivered.
dataSynced = cr.maybeSaveCheckpoint(rc, t, engineID, cr.chunk, dataEngine, indexEngine)
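To make the weighting above concrete, here is a standalone sketch with made-up numbers; it mirrors the shape of the calculation (the restore and import phases are averaged for the local/importer backends, and restore progress alone is used for the TiDB backend) but is a simplification, not the production code.

package main

import (
	"fmt"
	"math"
)

// estimateProgress mirrors the shape of the progress calculation in the diff above;
// it is a simplified standalone version, not the exact production code.
func estimateProgress(restoredBytes, totalRestoreBytes, bytesWritten, bytesImported,
	enginesFinished, enginesEstimated float64, tidbBackend bool) (restore, imp, total float64) {
	restore = math.Min(restoredBytes/totalRestoreBytes, 1.0)
	if tidbBackend {
		// The TiDB backend has no separate import phase.
		return restore, 0, restore
	}
	if bytesWritten > 0 {
		// Extrapolate the total bytes to import from what has been written so far;
		// the engine ratio acts as a fallback in case imported bytes lag behind.
		totalImportBytes := bytesWritten / restore
		imp = math.Min(math.Max(bytesImported/totalImportBytes, enginesFinished/enginesEstimated), 1.0)
	}
	return restore, imp, (restore + imp) / 2
}

func main() {
	r, i, t := estimateProgress(60, 100, 55, 30, 2, 5, false)
	fmt.Printf("restore=%.2f import=%.2f total=%.2f\n", r, i, t) // restore=0.60 import=0.40 total=0.50
}

With 60 of 100 source bytes restored, 55 bytes written, 30 bytes imported, and 2 of 5 engines finished, the engine ratio (0.40) beats the byte ratio (about 0.33), so the sketch reports restore=0.60, import=0.40, total=0.50.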
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/table_restore.go
@@ -538,7 +538,7 @@ func (tr *TableRestore) restoreEngine(
}
if err == nil {
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
metric.BytesCounter.WithLabelValues(metric.BytesStateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize()))
if dataFlushStatus != nil && indexFlushStaus != nil {
if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() {
saveCheckpoint(rc, tr, engineID, cr.chunk)