From 780bb47d28cbdaa2e881e2265ad6cfc5583fefcc Mon Sep 17 00:00:00 2001 From: krhubert Date: Sun, 13 Jan 2019 00:04:33 +0100 Subject: [PATCH] Add db transaction mechanism to database execution --- api/submit_result.go | 26 ++++++++++---- database/execution_db.go | 75 +++++++++++++++++++++++++++++++++++----- database/leveldb.go | 18 ++++++++++ 3 files changed, 103 insertions(+), 16 deletions(-) create mode 100644 database/leveldb.go diff --git a/api/submit_result.go b/api/submit_result.go index 66f3ec038..b0a3cea54 100644 --- a/api/submit_result.go +++ b/api/submit_result.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/mesg-foundation/core/database" "github.com/mesg-foundation/core/execution" "github.com/mesg-foundation/core/pubsub" "github.com/mesg-foundation/core/service" @@ -40,36 +41,47 @@ func (s *resultSubmitter) Submit(executionID string, outputKey string, outputDat // processExecution processes execution and marks it as complated or failed. func (s *resultSubmitter) processExecution(executionID string, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) { stateChanged = false - exec, err = s.api.execDB.Find(executionID) + tx, err := s.api.execDB.OpenTransaction() if err != nil { return nil, false, err } + exec, err = tx.Find(executionID) + if err != nil { + tx.Discard() + return nil, false, err + } + exec.Service, err = service.FromService(exec.Service, service.ContainerOption(s.api.container)) if err != nil { - return s.saveExecution(exec, err) + return s.saveExecution(tx, exec, err) } var outputDataMap map[string]interface{} if err := json.Unmarshal(outputData, &outputDataMap); err != nil { - return s.saveExecution(exec, fmt.Errorf("invalid output data error: %s", err)) + return s.saveExecution(tx, exec, fmt.Errorf("invalid output data error: %s", err)) } if err := exec.Complete(outputKey, outputDataMap); err != nil { - return s.saveExecution(exec, err) + return s.saveExecution(tx, exec, err) } - return s.saveExecution(exec, nil) + return s.saveExecution(tx, exec, nil) } -func (s *resultSubmitter) saveExecution(exec *execution.Execution, err error) (execOut *execution.Execution, stateChanged bool, errOut error) { +func (s *resultSubmitter) saveExecution(tx database.ExecutionTransaction, exec *execution.Execution, err error) (execOut *execution.Execution, stateChanged bool, errOut error) { if err != nil { if errFailed := exec.Failed(err); errFailed != nil { + tx.Discard() return exec, false, errFailed } } - if errSave := s.api.execDB.Save(exec); errSave != nil { + if errSave := tx.Save(exec); errSave != nil { + tx.Discard() return exec, true, errSave } + if errCommit := tx.Commit(); errCommit != nil { + return exec, true, errCommit + } return exec, true, err } diff --git a/database/execution_db.go b/database/execution_db.go index 0e52d97e0..97680ca07 100644 --- a/database/execution_db.go +++ b/database/execution_db.go @@ -3,6 +3,7 @@ package database import ( "encoding/json" "errors" + "io" "github.com/mesg-foundation/core/execution" "github.com/syndtr/goleveldb/leveldb" @@ -12,7 +13,16 @@ import ( type ExecutionDB interface { Find(executionID string) (*execution.Execution, error) Save(execution *execution.Execution) error - Close() error + OpenTransaction() (ExecutionTransaction, error) + io.Closer +} + +// ExecutionTransaction is the transaction handle. +type ExecutionTransaction interface { + Find(executionID string) (*execution.Execution, error) + Save(execution *execution.Execution) error + Commit() error + Discard() } // LevelDBExecutionDB is a concrete implementation of the DB interface @@ -32,7 +42,59 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) { // Find the execution based on an executionID, returns an error if not found func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, error) { - data, err := db.db.Get([]byte(executionID), nil) + return executionFind(db.db, executionID) +} + +// Save an instance of executable in the database +// Returns an error if anything from marshaling to database saving goes wrong +func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error { + return executionSave(db.db, execution) +} + +// OpenTransaction opens an atomic DB transaction. Only one transaction can be +// opened at a time. +func (db *LevelDBExecutionDB) OpenTransaction() (ExecutionTransaction, error) { + tx, err := db.db.OpenTransaction() + if err != nil { + return nil, err + } + return &LevelDBExecutionTransaction{tx: tx}, nil +} + +// Close closes database. +func (db *LevelDBExecutionDB) Close() error { + return db.db.Close() +} + +// LevelDBExecutionTransaction is the transaction handle. +type LevelDBExecutionTransaction struct { + tx *leveldb.Transaction +} + +// Find the execution based on an executionID, returns an error if not found +func (tx *LevelDBExecutionTransaction) Find(executionID string) (*execution.Execution, error) { + return executionFind(tx.tx, executionID) +} + +// Save an instance of executable in the database +// Returns an error if anything from marshaling to database saving goes wrong +func (tx *LevelDBExecutionTransaction) Save(execution *execution.Execution) error { + return executionSave(tx.tx, execution) +} + +// Commit commits the transaction. +func (tx *LevelDBExecutionTransaction) Commit() error { + return tx.tx.Commit() +} + +// Discard discards the transaction. +func (tx *LevelDBExecutionTransaction) Discard() { + tx.tx.Discard() +} + +// Find the execution based on an executionID, returns an error if not found +func executionFind(db leveldbTxDB, executionID string) (*execution.Execution, error) { + data, err := db.Get([]byte(executionID), nil) if err != nil { return nil, err } @@ -45,7 +107,7 @@ func (db *LevelDBExecutionDB) Find(executionID string) (*execution.Execution, er // Save an instance of executable in the database // Returns an error if anything from marshaling to database saving goes wrong -func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error { +func executionSave(db leveldbTxDB, execution *execution.Execution) error { if execution.ID == "" { return errors.New("database: can't save service without id") } @@ -53,10 +115,5 @@ func (db *LevelDBExecutionDB) Save(execution *execution.Execution) error { if err != nil { return err } - return db.db.Put([]byte(execution.ID), data, nil) -} - -// Close closes database. -func (db *LevelDBExecutionDB) Close() error { - return db.db.Close() + return db.Put([]byte(execution.ID), data, nil) } diff --git a/database/leveldb.go b/database/leveldb.go new file mode 100644 index 000000000..cc26084d0 --- /dev/null +++ b/database/leveldb.go @@ -0,0 +1,18 @@ +package database + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// txdb is common interface for leveldb database and transaction. +type leveldbTxDB interface { + Delete(key []byte, wo *opt.WriteOptions) error + Get(key []byte, ro *opt.ReadOptions) ([]byte, error) + Has(key []byte, ro *opt.ReadOptions) (bool, error) + NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator + Put(key, value []byte, wo *opt.WriteOptions) error + Write(b *leveldb.Batch, wo *opt.WriteOptions) error +}