Skip to content

Commit

Permalink
feat(fence): implement fence use datasource proxy (apache#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
106umao authored and georgehao committed May 7, 2023
1 parent 81dfe65 commit e99c9c1
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 45 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ require (
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
)

require github.com/agiledragon/gomonkey/v2 v2.9.0
require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
)

require (
github.com/RoaringBitmap/roaring v1.2.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc=
github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
Expand Down
48 changes: 27 additions & 21 deletions pkg/rm/tcc/fence/fence_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,42 @@ import (
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/handler"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

// WithFence This method is a suspended API interface that asserts the phase timing of a transaction
// WithFence Execute the fence database operation first and then call back the business method
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
if err = DoFence(ctx, tx); err != nil {
return err
}

if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}

return
}

// DeFence This method is a suspended API interface that asserts the phase timing of a transaction
// and performs corresponding database operations to ensure transaction consistency
// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error.
// case 2: if fencePhase is FencePhasePrepare, will do prepare fence operation.
// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation.
// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation.
// case 5: if fencePhase not in above case, will return a fence phase illegal error.
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
fp := tm.GetFencePhase(ctx)
h := handler.GetFenceHandler()

switch {
case fp == enum.FencePhaseNotExist:
err = fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case fp == enum.FencePhasePrepare:
err = h.PrepareFence(ctx, tx, callback)
case fp == enum.FencePhaseCommit:
err = h.CommitFence(ctx, tx, callback)
case fp == enum.FencePhaseRollback:
err = h.RollbackFence(ctx, tx, callback)
default:
err = fmt.Errorf("fence phase: %v illegal", fp)
}
func DoFence(ctx context.Context, tx *sql.Tx) error {
hd := handler.GetFenceHandler()
phase := tm.GetFencePhase(ctx)

if err != nil {
log.Error(err)
switch phase {
case enum.FencePhaseNotExist:
return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case enum.FencePhasePrepare:
return hd.PrepareFence(ctx, tx)
case enum.FencePhaseCommit:
return hd.CommitFence(ctx, tx)
case enum.FencePhaseRollback:
return hd.RollbackFence(ctx, tx)
}

return
return fmt.Errorf("fence phase: %v illegal", phase)
}
100 changes: 100 additions & 0 deletions pkg/rm/tcc/fence/fence_driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/util/log"
)

const (
// SeataFenceMySQLDriver MySQL driver for fence
SeataFenceMySQLDriver = "seata-fence-mysql"
)

func init() {
sql.Register(SeataFenceMySQLDriver, &FenceDriver{
TargetDriver: &mysql.MySQLDriver{},
})
}

type FenceDriver struct {
TargetDriver driver.Driver
TargetDB *sql.DB
}

func (fd *FenceDriver) Open(name string) (driver.Conn, error) {
return nil, errors.New("operation unsupported")
}

func (fd *FenceDriver) OpenConnector(name string) (connector driver.Connector, re error) {
connector = &dsnConnector{dsn: name, driver: fd.TargetDriver}
if driverCtx, ok := fd.TargetDriver.(driver.DriverContext); ok {
connector, re = driverCtx.OpenConnector(name)
if re != nil {
log.Errorf("open connector: %w", re)
return nil, re
}
}

fd.TargetDB = sql.OpenDB(connector)

return &SeataFenceConnector{
TargetConnector: connector,
TargetDB: fd.TargetDB,
}, nil
}

type dsnConnector struct {
dsn string
driver driver.Driver
}

func (connector *dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
return connector.driver.Open(connector.dsn)
}

func (connector *dsnConnector) Driver() driver.Driver {
return connector.driver
}

type SeataFenceConnector struct {
TargetConnector driver.Connector
TargetDB *sql.DB
}

func (connector *SeataFenceConnector) Connect(ctx context.Context) (driver.Conn, error) {
targetConn, err := connector.TargetConnector.Connect(ctx)
if err != nil {
return nil, err
}

return &FenceConn{
TargetConn: targetConn,
TargetDB: connector.TargetDB,
}, nil
}

func (connector *SeataFenceConnector) Driver() driver.Driver {
return connector.TargetConnector.Driver()
}
159 changes: 159 additions & 0 deletions pkg/rm/tcc/fence/fence_driver_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

type FenceConn struct {
TargetConn driver.Conn
TargetDB *sql.DB
}

func (c *FenceConn) ResetSession(ctx context.Context) error {
resetter, ok := c.TargetConn.(driver.SessionResetter)
if !ok {
return driver.ErrSkip
}

return resetter.ResetSession(ctx)
}

func (c *FenceConn) Prepare(query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}

func (c *FenceConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}

func (c *FenceConn) Exec(query string, args []driver.Value) (driver.Result, error) {
execer, ok := c.TargetConn.(driver.Execer)
if !ok {
return nil, driver.ErrSkip
}

return execer.Exec(query, args)
}

func (c *FenceConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
execerContext, ok := c.TargetConn.(driver.ExecerContext)
if !ok {
values := make([]driver.Value, 0, len(args))
for i := range args {
values = append(values, args[i].Value)
}
return c.Exec(query, values)
}

return execerContext.ExecContext(ctx, query, args)
}

func (c *FenceConn) Query(query string, args []driver.Value) (driver.Rows, error) {
queryer, ok := c.TargetConn.(driver.Queryer)
if !ok {
return nil, driver.ErrSkip
}

return queryer.Query(query, args)
}

func (c *FenceConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
QueryerContext, ok := c.TargetConn.(driver.QueryerContext)
if !ok {
values := make([]driver.Value, 0, len(args))

for i := range args {
values = append(values, args[i].Value)
}

return c.Query(query, values)
}

return QueryerContext.QueryContext(ctx, query, args)
}

func (c *FenceConn) Begin() (driver.Tx, error) {
return nil, errors.New("operation unsupport")
}

func (c *FenceConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
beginer, ok := c.TargetConn.(driver.ConnBeginTx)
if !ok {
return nil, errors.New("operation unsupported")
}

tx, err := beginer.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

if !tm.IsSeataContext(ctx) {
return nil, errors.New("there is not seata context")
}

// check if have been begin fence tx
if tm.IsFenceTxBegin(ctx) {
return tx, nil
}

tm.SetFenceTxBeginedFlag(ctx, true)

fenceTx, err := c.TargetDB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if err := fenceTx.Rollback(); err != nil {
log.Error(err)
}

// although it have not any db operations yet, is still rollback to avoid leak tx.
if err := tx.Rollback(); err != nil {
log.Error(err)
}
}
}()

// do fence operations
emptyCallback := func() error {
return nil
}

if err := WithFence(ctx, fenceTx, emptyCallback); err != nil {
return nil, err
}

return &FenceTx{
Ctx: ctx,
TargetTx: tx,
TargetFenceTx: fenceTx,
}, nil
}

func (c *FenceConn) Close() error {
return c.TargetConn.Close()
}
30 changes: 30 additions & 0 deletions pkg/rm/tcc/fence/fence_driver_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBegin(t *testing.T) {
tx, err := (&FenceConn{}).Begin()
assert.NotNil(t, err)
assert.Nil(t, tx)
}
Loading

0 comments on commit e99c9c1

Please sign in to comment.