Skip to content

Commit

Permalink
Merge pull request #691 from mesg-foundation/fix/exeuction-db-race
Browse files Browse the repository at this point in the history
Add db transaction mechanism to database execution
  • Loading branch information
NicolasMahe authored Jan 15, 2019
2 parents 5fc2cf0 + 41a8017 commit bd86dbf
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 16 deletions.
26 changes: 19 additions & 7 deletions api/submit_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"

"github.com/mesg-foundation/core/database"
"github.com/mesg-foundation/core/execution"
"github.com/mesg-foundation/core/pubsub"
"github.com/mesg-foundation/core/service"
Expand Down Expand Up @@ -40,36 +41,47 @@ func (s *resultSubmitter) Submit(executionID string, outputKey string, outputDat
// processExecution processes execution and marks it as complated or failed.
func (s *resultSubmitter) processExecution(executionID string, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) {
stateChanged = false
exec, err = s.api.execDB.Find(executionID)
tx, err := s.api.execDB.OpenTransaction()
if err != nil {
return nil, false, err
}

exec, err = tx.Find(executionID)
if err != nil {
tx.Discard()
return nil, false, err
}

exec.Service, err = service.FromService(exec.Service, service.ContainerOption(s.api.container))
if err != nil {
return s.saveExecution(exec, err)
return s.saveExecution(tx, exec, err)
}

var outputDataMap map[string]interface{}
if err := json.Unmarshal(outputData, &outputDataMap); err != nil {
return s.saveExecution(exec, fmt.Errorf("invalid output data error: %s", err))
return s.saveExecution(tx, exec, fmt.Errorf("invalid output data error: %s", err))
}

if err := exec.Complete(outputKey, outputDataMap); err != nil {
return s.saveExecution(exec, err)
return s.saveExecution(tx, exec, err)
}

return s.saveExecution(exec, nil)
return s.saveExecution(tx, exec, nil)
}

func (s *resultSubmitter) saveExecution(exec *execution.Execution, err error) (execOut *execution.Execution, stateChanged bool, errOut error) {
func (s *resultSubmitter) saveExecution(tx database.ExecutionTransaction, exec *execution.Execution, err error) (execOut *execution.Execution, stateChanged bool, errOut error) {
if err != nil {
if errFailed := exec.Failed(err); errFailed != nil {
tx.Discard()
return exec, false, errFailed
}
}
if errSave := s.api.execDB.Save(exec); errSave != nil {
if errSave := tx.Save(exec); errSave != nil {
tx.Discard()
return exec, true, errSave
}
if errCommit := tx.Commit(); errCommit != nil {
return exec, true, errCommit
}
return exec, true, err
}
75 changes: 66 additions & 9 deletions database/execution_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"encoding/json"
"errors"
"io"

"github.com/mesg-foundation/core/execution"
"github.com/syndtr/goleveldb/leveldb"
Expand All @@ -12,7 +13,16 @@ import (
type ExecutionDB interface {
Find(executionID string) (*execution.Execution, error)
Save(execution *execution.Execution) error
Close() error
OpenTransaction() (ExecutionTransaction, error)
io.Closer
}

// ExecutionTransaction is the transaction handle.
type ExecutionTransaction interface {
Find(executionID string) (*execution.Execution, error)
Save(execution *execution.Execution) error
Commit() error
Discard()
}

// LevelDBExecutionDB is a concrete implementation of the DB interface
Expand All @@ -32,7 +42,59 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {

// Find the execution based on an executionID, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) {
data, err := db.db.Get([]byte(executionID), nil)
return executionFind(db.db, executionID)
}

// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error {
return executionSave(db.db, execution)
}

// OpenTransaction opens an atomic DB transaction. Only one transaction can be
// opened at a time.
func (db *LevelDBExecutionDB) OpenTransaction() (ExecutionTransaction, error) {
tx, err := db.db.OpenTransaction()
if err != nil {
return nil, err
}
return &LevelDBExecutionTransaction{tx: tx}, nil
}

// Close closes database.
func (db *LevelDBExecutionDB) Close() error {
return db.db.Close()
}

// LevelDBExecutionTransaction is the transaction handle.
type LevelDBExecutionTransaction struct {
tx *leveldb.Transaction
}

// Find the execution based on an executionID, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionID string) (*execution.Execution, error) {
return executionFind(tx.tx, executionID)
}

// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func (tx *LevelDBExecutionTransaction) Save(execution *execution.Execution) error {
return executionSave(tx.tx, execution)
}

// Commit commits the transaction.
func (tx *LevelDBExecutionTransaction) Commit() error {
return tx.tx.Commit()
}

// Discard discards the transaction.
func (tx *LevelDBExecutionTransaction) Discard() {
tx.tx.Discard()
}

// Find the execution based on an executionID, returns an error if not found
func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, error) {
data, err := db.Get([]byte(executionID), nil)
if err != nil {
return nil, err
}
Expand All @@ -45,18 +107,13 @@ func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, er

// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error {
func executionSave(db leveldbTxDB, execution *execution.Execution) error {
if execution.ID == "" {
return errors.New("database: can't save service without id")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.db.Put([]byte(execution.ID), data, nil)
}

// Close closes database.
func (db *LevelDBExecutionDB) Close() error {
return db.db.Close()
return db.Put([]byte(execution.ID), data, nil)
}
18 changes: 18 additions & 0 deletions database/leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package database

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

// txdb is common interface for leveldb database and transaction.
type leveldbTxDB interface {
Delete(key []byte, wo *opt.WriteOptions) error
Get(key []byte, ro *opt.ReadOptions) ([]byte, error)
Has(key []byte, ro *opt.ReadOptions) (bool, error)
NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator
Put(key, value []byte, wo *opt.WriteOptions) error
Write(b *leveldb.Batch, wo *opt.WriteOptions) error
}

0 comments on commit bd86dbf

Please sign in to comment.