Skip to content

Commit

Permalink
*: LOAD DATA support load one file from s3 and other OSS (#40489)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Jan 18, 2023
1 parent 89de573 commit dc30a5b
Show file tree
Hide file tree
Showing 16 changed files with 9,602 additions and 9,269 deletions.
14 changes: 14 additions & 0 deletions br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func ParseRawURL(rawURL string) (*url.URL, error) {
return u, nil
}

// ParseBackendFromURL constructs a structured backend description from the
// *url.URL.
func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error) {
return parseBackend(u, "", options)
}

// ParseBackend constructs a structured backend description from the
// storage URL.
func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
Expand All @@ -45,6 +51,14 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack
if err != nil {
return nil, errors.Trace(err)
}
return parseBackend(u, rawURL, options)
}

func parseBackend(u *url.URL, rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
if rawURL == "" {
// try to handle hdfs for ParseBackendFromURL caller
rawURL = u.String()
}
switch u.Scheme {
case "":
absPath, err := filepath.Abs(rawURL)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds boo

// New creates an ExternalStorage with options.
func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions) (ExternalStorage, error) {
if opts == nil {
opts = &ExternalStorageOptions{}
}
switch backend := backend.Backend.(type) {
case *backuppb.StorageBackend_Local:
if backend.Local == nil {
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
OnDuplicate: v.OnDuplicate,
Ctx: b.ctx,
}
columnNames := loadDataInfo.initFieldMappings()
Expand All @@ -947,7 +948,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
IsLocal: v.IsLocal,
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataInfo: loadDataInfo,
}
Expand Down
181 changes: 170 additions & 11 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@
package executor

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -40,13 +47,15 @@ import (
var (
null = []byte("NULL")
taskQueueSize = 16 // the maximum number of pending tasks to commit in queue
// InTest is a flag that bypass gcs authentication in unit tests.
InTest bool
)

// LoadDataExec represents a load data executor.
type LoadDataExec struct {
baseExecutor

IsLocal bool
FileLocRef ast.FileLocRefTp
OnDuplicate ast.OnDuplicateKeyHandlingType
loadDataInfo *LoadDataInfo
}
Expand All @@ -55,29 +64,73 @@ type LoadDataExec struct {
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if !e.IsLocal {
if e.FileLocRef == ast.FileLocServer {
return errors.New("Load Data: don't support load data without local field")
}
e.loadDataInfo.OnDuplicate = e.OnDuplicate
// TODO: support lines terminated is "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return errors.New("Load Data: don't support load data terminated is nil")
}

sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option isn't closed normal")
}
if e.loadDataInfo.Path == "" {
return errors.New("Load Data: infile path is empty")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}

switch e.FileLocRef {
case ast.FileLocServer:
panic("FileLocServer should be handled earlier")
case ast.FileLocClient:
// let caller use handleQuerySpecial to read data in this connection
sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option wasn't closed normally")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
case ast.FileLocRemote:
return e.loadFromRemote(ctx)
}
return nil
}

func (e *LoadDataExec) loadFromRemote(ctx context.Context) error {
u, err := storage.ParseRawURL(e.loadDataInfo.Path)
if err != nil {
return err
}
var filename string
u.Path, filename = filepath.Split(u.Path)
b, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return err
}
if b.GetLocal() != nil {
return errors.Errorf("Load Data: don't support load data from tidb-server when set REMOTE, path %s", e.loadDataInfo.Path)
}

opt := &storage.ExternalStorageOptions{}
if InTest {
opt.NoCredentials = true
}
s, err := storage.New(ctx, b, opt)
if err != nil {
return err
}
fileReader, err := s.Open(ctx, filename)
if err != nil {
return err
}
defer fileReader.Close()
reader := bufio.NewReader(fileReader)

return e.loadDataInfo.Load(ctx, func() ([]byte, error) {
return reader.ReadBytes('\n')
})
}

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
Expand All @@ -103,6 +156,7 @@ type CommitTask struct {
}

// LoadDataInfo saves the information of loading data operation.
// TODO: rename it and remove unnecessary public methods.
type LoadDataInfo struct {
*InsertValues

Expand Down Expand Up @@ -132,6 +186,111 @@ type FieldMapping struct {
UserVar *ast.VariableExpr
}

// Load reads from readerFn and do load data job.
func (e *LoadDataInfo) Load(ctx context.Context, readerFn func() ([]byte, error)) error {
e.InitQueues()
e.SetMaxRowsInBatch(uint64(e.Ctx.GetSessionVars().DMLBatchSize))
e.StartStopWatcher()
// let stop watcher goroutine quit
defer e.ForceQuit()
err := sessiontxn.NewTxn(ctx, e.Ctx)
if err != nil {
return err
}
// processStream process input data, enqueue commit task
wg := new(sync.WaitGroup)
wg.Add(1)
go processStream(ctx, readerFn, e, wg)
err = e.CommitWork(ctx)
wg.Wait()
return err
}

// processStream process input stream from network
func processStream(ctx context.Context, readerFn func() ([]byte, error), loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) {
var err error
var shouldBreak bool
var prevData, curData []byte
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
wg.Done()
}()
for {
curData, err = readerFn()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
logutil.Logger(ctx).Error("read data for LOAD DATA failed", zap.Error(err))
break
}
err = nil
}
if len(curData) == 0 {
loadDataInfo.Drained = true
shouldBreak = true
if len(prevData) == 0 {
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
}
if shouldBreak {
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
if err = loadDataInfo.EnqOneTask(ctx); err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
}

func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
prevData, reachLimit, err = loadDataInfo.InsertData(ctx, prevData, curData)
if err != nil {
return nil, err
}
if !reachLimit {
break
}
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return prevData, err
}
curData = prevData
prevData = nil
}
return prevData, nil
}

// reorderColumns reorder the e.insertColumns according to the order of columnNames
// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name.
func (e *LoadDataInfo) reorderColumns(columnNames []string) error {
Expand Down
18 changes: 18 additions & 0 deletions executor/loadremotetest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "loadremotetest_test",
srcs = [
"main_test.go",
"one_csv_test.go",
"util_test.go",
],
deps = [
"//executor",
"//kv",
"//testkit",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_stretchr_testify//suite",
"@org_uber_go_goleak//:goleak",
],
)
33 changes: 33 additions & 0 deletions executor/loadremotetest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadremotetest

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("net.(*netFD).connect.func2"),
}
goleak.VerifyTestMain(m, opts...)
}
Loading

0 comments on commit dc30a5b

Please sign in to comment.