Skip to content

Commit

Permalink
feat:add mysql table meta func && sql addEscape, delEscape func issue… (
Browse files Browse the repository at this point in the history
apache#294)

* feat:add mysql table meta func && sql addEscape, delEscape func issue#290

* fix:TestMetaCache func rename

* fix:ci lint fail fix

* fix: solve GetTableMeta return type && constant define

* fix:solve code format and meta cache func fault

* fix:solev git ci fail

* fix:solev git ci fail

* format:format code

* fix:merge master && format code

* fix:solve name conflict

Co-authored-by: 王瑞 <[email protected]>
Co-authored-by: wangrui130 <[email protected]>
  • Loading branch information
3 people authored Oct 13, 2022
1 parent 15d6a1a commit eadf8bf
Show file tree
Hide file tree
Showing 17 changed files with 1,021 additions and 29 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -140,6 +141,7 @@ require (
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools v2.2.0+incompatible
moul.io/http2curl v1.0.0 // indirect
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
17 changes: 17 additions & 0 deletions pkg/datasource/sql/async_worker.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
Expand Down
30 changes: 15 additions & 15 deletions pkg/datasource/sql/datasource/base/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package base

import (
"context"
"database/sql"
"errors"
"sync"
"time"
Expand All @@ -29,7 +30,7 @@ import (
type (
// trigger
trigger interface {
LoadOne(table string) (types.TableMeta, error)
LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
}
Expand All @@ -46,13 +47,14 @@ type BaseTableMetaCache struct {
expireDuration time.Duration
capity int32
size int32
dbName string
cache map[string]*entry
cancel context.CancelFunc
trigger trigger
}

// NewBaseCache
func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) (*BaseTableMetaCache, error) {
func NewBaseCache(capity int32, dbName string, expireDuration time.Duration, trigger trigger) *BaseTableMetaCache {
ctx, cancel := context.WithCancel(context.Background())

c := &BaseTableMetaCache{
Expand All @@ -61,15 +63,14 @@ func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) (
size: 0,
expireDuration: expireDuration,
cache: map[string]*entry{},
dbName: dbName,
cancel: cancel,
trigger: trigger,
}

if err := c.Init(ctx); err != nil {
return nil, err
}
c.Init(ctx)

return c, nil
return c
}

// init
Expand Down Expand Up @@ -135,32 +136,31 @@ func (c *BaseTableMetaCache) scanExpire(ctx context.Context) {
}

// GetTableMeta
func (c *BaseTableMetaCache) GetTableMeta(table string) (types.TableMeta, error) {
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, tableName string, conn *sql.Conn) (types.TableMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

v, ok := c.cache[table]

v, ok := c.cache[tableName]
if !ok {
meta, err := c.trigger.LoadOne(table)
meta, err := c.trigger.LoadOne(ctx, c.dbName, tableName, conn)
if err != nil {
return types.TableMeta{}, err
}

if !meta.IsEmpty() {
c.cache[table] = &entry{
value: meta,
if meta != nil && !meta.IsEmpty() {
c.cache[tableName] = &entry{
value: *meta,
lastAccess: time.Now(),
}

return meta, nil
return *meta, nil
}

return types.TableMeta{}, errors.New("not found table metadata")
}

v.lastAccess = time.Now()
c.cache[table] = v
c.cache[tableName] = v

return v.value, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/datasource/datasource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type TableMetaCache interface {
// Init
Init(ctx context.Context, conn *sql.DB) error
// GetTableMeta
GetTableMeta(table string) (types.TableMeta, error)
GetTableMeta(ctx context.Context, table string, conn *sql.Conn) (*types.TableMeta, error)
// Destroy
Destroy() error
}
Expand Down
40 changes: 36 additions & 4 deletions pkg/datasource/sql/datasource/mysql/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,55 @@ package mysql
import (
"context"
"database/sql"
"sync"
"time"

"github.com/pkg/errors"

"github.com/seata/seata-go/pkg/datasource/sql/datasource/base"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

var (
capacity int32 = 1024
EexpireTime = 15 * time.Minute
tableMetaInstance *tableMetaCache
tableMetaOnce sync.Once
DBName = "seata"
)

type tableMetaCache struct {
cache *base.BaseTableMetaCache
tableMetaCache *base.BaseTableMetaCache
}

func GetTableMetaInstance() *tableMetaCache {
// Todo constant.DBName get from config
tableMetaOnce.Do(func() {
tableMetaInstance = &tableMetaCache{
tableMetaCache: base.NewBaseCache(capacity, DBName, EexpireTime, NewMysqlTrigger()),
}
})

return tableMetaInstance
}

// Init
func (c *tableMetaCache) Init(ctx context.Context, conn *sql.DB) error {
return nil
}

// GetTableMeta
func (c *tableMetaCache) GetTableMeta(table string) (types.TableMeta, error) {
return types.TableMeta{}, nil
// GetTableMeta get table info from cache or information schema
func (c *tableMetaCache) GetTableMeta(ctx context.Context, tableName string, conn *sql.Conn) (*types.TableMeta, error) {
if tableName == "" {
return nil, errors.New("TableMeta cannot be fetched without tableName")
}

tableMeta, err := c.tableMetaCache.GetTableMeta(ctx, tableName, conn)
if err != nil {
return nil, err
}

return &tableMeta, nil
}

// Destroy
Expand Down
55 changes: 55 additions & 0 deletions pkg/datasource/sql/datasource/mysql/meta_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mysql

import (
"context"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"gotest.tools/assert"
"testing"
)

// TestGetTableMeta
func TestGetTableMeta(t *testing.T) {
// local test can annotation t.SkipNow()
t.SkipNow()

testTableMeta := func() {
metaInstance := GetTableMetaInstance()

db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/seata?multiStatements=true")
if err != nil {
t.Fatal(err)
}

defer db.Close()

ctx := context.Background()
conn, _ := db.Conn(ctx)

tableMeta, err := metaInstance.GetTableMeta(ctx, "undo_log", conn)
assert.NilError(t, err)

t.Logf("%+v", tableMeta)
}

t.Run("testTableMeta", func(t *testing.T) {
testTableMeta()
})
}
Loading

0 comments on commit eadf8bf

Please sign in to comment.