Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DB transactions #255

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions chain_events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ import (
log "github.com/sirupsen/logrus"
)

type handler interface {
type chainEventHandler interface {
Handle(context.Context, flow.Event)
}

type event struct {
handlers []handler
type chainEvent struct {
handlers []chainEventHandler
}

var Event event // singleton of type event
var ChainEvent chainEvent // singleton of type event

// Register adds an event handler for this event
func (e *event) Register(handler handler) {
func (e *chainEvent) Register(handler chainEventHandler) {
log.Debug("Registering Flow event handler")
e.handlers = append(e.handlers, handler)
}

// Trigger sends out an event with the payload
func (e *event) Trigger(ctx context.Context, payload flow.Event) {
func (e *chainEvent) Trigger(ctx context.Context, payload flow.Event) {
log.
WithFields(log.Fields{"payload": payload}).
Trace("Handling Flow event")
Expand Down
7 changes: 1 addition & 6 deletions chain_events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (l *ListenerImpl) run(ctx context.Context, start, end uint64) error {
}

for _, event := range events {
Event.Trigger(ctx, event)
ChainEvent.Trigger(ctx, event)
}

return nil
Expand Down Expand Up @@ -154,11 +154,6 @@ func (l *ListenerImpl) Start() Listener {
start := status.LatestHeight + 1 // LatestHeight has already been checked, add 1
end := min(latestBlock.Height, start+l.maxBlocks) // Limit maximum end
if err := l.run(ctx, start, end); err != nil {
if strings.Contains(err.Error(), "database is locked") {
// Sqlite throws this error from time to time when accessing it from
// multiple threads; listener is run in a separate thread.
return nil
}
return err
}
status.LatestHeight = end
Expand Down
19 changes: 15 additions & 4 deletions chain_events/store_gorm.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
package chain_events

import (
"sync"

ds_gorm "github.com/flow-hydraulics/flow-wallet-api/datastore/gorm"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type GormStore struct {
db *gorm.DB
statusMutex sync.Mutex
db *gorm.DB
}

func NewGormStore(db *gorm.DB) Store {
return &GormStore{db}
return &GormStore{db: db}
}

// LockedStatus runs a transaction on the database manipulating 'status' of type ListenerStatus.
func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error {
return s.db.Transaction(func(tx *gorm.DB) error {
s.statusMutex.Lock()
defer s.statusMutex.Unlock()

return ds_gorm.Transaction(s.db, func(tx *gorm.DB) error {
status := ListenerStatus{}

if err := tx.FirstOrCreate(&status).Error; err != nil {
if err := tx.
// NOWAIT so this call will fail rather than use a stale value
Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
FirstOrCreate(&status).Error; err != nil {
return err // rollback
}

Expand Down
31 changes: 31 additions & 0 deletions datastore/gorm/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package gorm

import "gorm.io/gorm"

// Transaction performs a function on a gorm database transaction instance
// when using something else than sqlite as the dialector (mysql or psql).
// When using sqlite it will fallback to regular gorm database instance.
func Transaction(db *gorm.DB, fn func(tx *gorm.DB) error) error {
isSqlite := db.Config.Dialector.Name() == "sqlite"

var tx *gorm.DB

if !isSqlite {
tx = db.Begin()
} else {
tx = db
}

if err := fn(tx); err != nil {
if !isSqlite {
tx.Rollback()
}
return err
}

if !isSqlite {
tx.Commit()
}

return nil
}
24 changes: 20 additions & 4 deletions keys/store_gorm.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
package keys

import (
"sync"
"time"

"gorm.io/gorm"
"gorm.io/gorm/clause"

ds_gorm "github.com/flow-hydraulics/flow-wallet-api/datastore/gorm"
)

type GormStore struct {
db *gorm.DB
accountKeyMutex sync.Mutex
proposalKeyMutex sync.Mutex
db *gorm.DB
}

func NewGormStore(db *gorm.DB) Store {
return &GormStore{db}
return &GormStore{db: db}
}

func (s *GormStore) AccountKey(address string) (Storable, error) {
s.accountKeyMutex.Lock()
defer s.accountKeyMutex.Unlock()

k := Storable{}

err := s.db.Transaction(func(tx *gorm.DB) error {
err := ds_gorm.Transaction(s.db, func(tx *gorm.DB) error {
if err := tx.
// NOWAIT so this call will fail rather than use a stale value
Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
Where(&Storable{AccountAddress: address}).
Order("updated_at asc").
Limit(1).Find(&k).Error; err != nil {
Expand All @@ -36,10 +47,15 @@ func (s *GormStore) AccountKey(address string) (Storable, error) {
}

func (s *GormStore) ProposalKeyIndex() (int, error) {
s.proposalKeyMutex.Lock()
defer s.proposalKeyMutex.Unlock()

p := ProposalKey{}

err := s.db.Transaction(func(tx *gorm.DB) error {
err := ds_gorm.Transaction(s.db, func(tx *gorm.DB) error {
if err := tx.
// NOWAIT so this call will fail rather than use a stale value
Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
Order("updated_at asc").
Limit(1).Find(&p).Error; err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func runServer(cfg *configs.Config) {
}()

// Register a handler for chain events
chain_events.Event.Register(&tokens.ChainEventHandler{
chain_events.ChainEvent.Register(&tokens.ChainEventHandler{
AccountService: accountService,
ChainListener: listener,
TemplateService: templateService,
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ func TestTemplateHandlers(t *testing.T) {
body: strings.NewReader(fmt.Sprintf(`{"name":"TestToken","address":"%s"}`, cfg.AdminAddress)),
contentType: "application/json",
url: "/",
expected: `UNIQUE constraint failed: tokens.name`,
expected: `.*`, // Error message differs based on used db dialector
status: http.StatusBadRequest,
},
{
Expand Down
2 changes: 0 additions & 2 deletions tests/accounts_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ func Test_Delete_Non_Custodial_Account_Is_Idempotent(t *testing.T) {

// Test if the service is able to concurrently create multiple accounts
func Test_Add_Multiple_New_Custodial_Accounts(t *testing.T) {
t.Skip("sqlite will cause a database locked error")

cfg := test.LoadConfig(t, testConfigPath)

instanceCount := 1
Expand Down
2 changes: 1 addition & 1 deletion tests/test/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func GetServices(t *testing.T, cfg *configs.Config) Services {
)

// Register a handler for chain events
chain_events.Event.Register(&tokens.ChainEventHandler{
chain_events.ChainEvent.Register(&tokens.ChainEventHandler{
AccountService: accountService,
ChainListener: listener,
TemplateService: templateService,
Expand Down