Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update service db to use store #1244

Merged
merged 4 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mesg-foundation/engine/config"
"github.com/mesg-foundation/engine/container"
"github.com/mesg-foundation/engine/database"
"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/logger"
"github.com/mesg-foundation/engine/scheduler"
Expand All @@ -29,10 +30,11 @@ var network = flag.Bool("experimental-network", false, "start the engine with th

func initSDK(cfg *config.Config) (*sdk.SDK, error) {
// init services db.
serviceDB, err := database.NewServiceDB(filepath.Join(cfg.Path, cfg.Database.ServiceRelativePath))
store, err := store.NewLevelDBStore(filepath.Join(cfg.Path, cfg.Database.ServiceRelativePath))
if err != nil {
return nil, err
}
serviceDB := database.NewServiceDB(store)

// init instance db.
instanceDB, err := database.NewInstanceDB(filepath.Join(cfg.Path, cfg.Database.InstanceRelativePath))
Expand Down
84 changes: 29 additions & 55 deletions database/service_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,35 @@ import (
"errors"
"fmt"

"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/service"
"github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
)

var (
errCannotSaveWithoutHash = errors.New("database: can't save service without hash")
)

// ServiceDB describes the API of database package.
type ServiceDB interface {
// Save saves a service to database.
Save(s *service.Service) error

// Get gets a service from database by its unique hash.
Get(hash hash.Hash) (*service.Service, error)

// Delete deletes a service from database by its unique hash.
Delete(hash hash.Hash) error

// All returns all services from database.
All() ([]*service.Service, error)

// Close closes underlying database connection.
Close() error
}

// LevelDBServiceDB is a database for storing service definition.
type LevelDBServiceDB struct {
db *leveldb.DB
// ServiceDB is a database for storing service definition.
type ServiceDB struct {
s store.Store
}

// NewServiceDB returns the database which is located under given path.
func NewServiceDB(path string) (*LevelDBServiceDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
func NewServiceDB(s store.Store) *ServiceDB {
return &ServiceDB{
s: s,
}
return &LevelDBServiceDB{db: db}, nil
}

// marshal returns the byte slice from service.
func (d *LevelDBServiceDB) marshal(s *service.Service) ([]byte, error) {
func (d *ServiceDB) marshal(s *service.Service) ([]byte, error) {
return json.Marshal(s)
}

// unmarshal returns the service from byte slice.
func (d *LevelDBServiceDB) unmarshal(hash hash.Hash, value []byte) (*service.Service, error) {
func (d *ServiceDB) unmarshal(hash hash.Hash, value []byte) (*service.Service, error) {
var s service.Service
if err := json.Unmarshal(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode service %q: %s", hash, err)
Expand All @@ -62,10 +42,10 @@ func (d *LevelDBServiceDB) unmarshal(hash hash.Hash, value []byte) (*service.Ser
}

// All returns every service in database.
func (d *LevelDBServiceDB) All() ([]*service.Service, error) {
func (d *ServiceDB) All() ([]*service.Service, error) {
var (
services []*service.Service
iter = d.db.NewIterator(nil, nil)
iter = d.s.NewIterator()
)
for iter.Next() {
hash := hash.Hash(iter.Key())
Expand All @@ -83,55 +63,49 @@ func (d *LevelDBServiceDB) All() ([]*service.Service, error) {
}

// Delete deletes service from database.
func (d *LevelDBServiceDB) Delete(hash hash.Hash) error {
tx, err := d.db.OpenTransaction()
func (d *ServiceDB) Delete(hash hash.Hash) error {
has, err := d.s.Has(hash)
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if _, err := tx.Get(hash, nil); err != nil {
tx.Discard()
if err == leveldb.ErrNotFound {
return &ErrNotFound{resource: "service", hash: hash}
}
return err
if !has {
return &ErrNotFound{resource: "service", hash: hash}
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
}
if err := tx.Delete(hash, nil); err != nil {
tx.Discard()
return err
}
return tx.Commit()

return d.s.Delete(hash)
}

// Get retrives service from database.
func (d *LevelDBServiceDB) Get(hash hash.Hash) (*service.Service, error) {
b, err := d.db.Get(hash, nil)
func (d *ServiceDB) Get(hash hash.Hash) (*service.Service, error) {
has, err := d.s.Has(hash)
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if !has {
return nil, &ErrNotFound{resource: "service", hash: hash}
}
b, err := d.s.Get(hash)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, &ErrNotFound{resource: "service", hash: hash}
}
return nil, err
}
return d.unmarshal(hash, b)
}

// Save stores service in database.
// If there is an another service that uses the same sid, it'll be deleted.
func (d *LevelDBServiceDB) Save(s *service.Service) error {
func (d *ServiceDB) Save(s *service.Service) error {
if s.Hash.IsZero() {
return errCannotSaveWithoutHash
}

b, err := d.marshal(s)
if err != nil {
return err
}
return d.db.Put(s.Hash, b, nil)
return d.s.Put(s.Hash, b)
}

// Close closes database.
func (d *LevelDBServiceDB) Close() error {
return d.db.Close()
func (d *ServiceDB) Close() error {
return d.s.Close()
}

// ErrNotFound is an not found error.
Expand Down
11 changes: 8 additions & 3 deletions database/service_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"sync"
"testing"

"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/service"
"github.com/stretchr/testify/require"
)

const testdbname = "db.test"

func openServiceDB(t *testing.T) (*LevelDBServiceDB, func()) {
func openServiceDB(t *testing.T) (*ServiceDB, func()) {
deleteDBs(t)
db, err := NewServiceDB(testdbname)
store, err := store.NewLevelDBStore(testdbname)
require.NoError(t, err)
db := NewServiceDB(store)
return db, func() {
require.NoError(t, db.Close())
deleteDBs(t)
Expand Down Expand Up @@ -82,7 +84,10 @@ func TestServiceDBDelete(t *testing.T) {
require.IsType(t, &ErrNotFound{}, err)
}

// TOFIX: the database is not thread safe anymore...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the database is still thread safe, but the question is why the transaction has gone

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kvstore does not implement transaction as it should modify and read the store one transaction at a time. so they don't need concurrent access..

// Should we lock the db manually? The database could lock the whole db with a mutex.
func TestServiceDBDeleteConcurrency(t *testing.T) {
t.Skip("delete function need to be fixed or test deleted")
db, closer := openServiceDB(t)
defer closer()

Expand Down Expand Up @@ -136,7 +141,7 @@ func TestServiceDBAllWithDecodeError(t *testing.T) {
db, closer := openServiceDB(t)
defer closer()

require.NoError(t, db.db.Put(hash.Int(1), []byte("-"), nil))
require.NoError(t, db.s.Put(hash.Int(1), []byte("-")))

services, err := db.All()
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions sdk/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/cskr/pubsub"
"github.com/mesg-foundation/engine/container/mocks"
"github.com/mesg-foundation/engine/database"
"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/execution"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/instance"
Expand All @@ -26,7 +27,7 @@ const (

type apiTesting struct {
*testing.T
serviceDB *database.LevelDBServiceDB
serviceDB *database.ServiceDB
executionDB *database.LevelDBExecutionDB
instanceDB *database.LevelDBInstanceDB
workflowDB *database.LevelDBWorkflowDB
Expand All @@ -45,8 +46,9 @@ func (t *apiTesting) close() {

func newTesting(t *testing.T) (*Execution, *apiTesting) {
container := &mocks.Container{}
db, err := database.NewServiceDB(servicedbname)
serviceStore, err := store.NewLevelDBStore(servicedbname)
require.NoError(t, err)
db := database.NewServiceDB(serviceStore)
service := servicesdk.New(container, db)

instDB, err := database.NewInstanceDB(instdbname)
Expand Down
2 changes: 1 addition & 1 deletion sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type SDK struct {
}

// New creates a new SDK with given options.
func New(c container.Container, serviceDB database.ServiceDB, instanceDB database.InstanceDB, execDB database.ExecutionDB, workflowDB database.WorkflowDB, engineName, port string) *SDK {
func New(c container.Container, serviceDB *database.ServiceDB, instanceDB database.InstanceDB, execDB database.ExecutionDB, workflowDB database.WorkflowDB, engineName, port string) *SDK {
ps := pubsub.New(0)
serviceSDK := servicesdk.New(c, serviceDB)
instanceSDK := instancesdk.New(c, serviceSDK, instanceDB, engineName, port)
Expand Down
4 changes: 2 additions & 2 deletions sdk/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ type Service struct {
ps *pubsub.PubSub

container container.Container
serviceDB database.ServiceDB
serviceDB *database.ServiceDB
}

// New creates a new Service SDK with given options.
func New(c container.Container, serviceDB database.ServiceDB) *Service {
func New(c container.Container, serviceDB *database.ServiceDB) *Service {
return &Service{
ps: pubsub.New(0),
container: c,
Expand Down
4 changes: 3 additions & 1 deletion server/grpc/core/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/mesg-foundation/engine/container"
"github.com/mesg-foundation/engine/database"
"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/sdk"
"github.com/stretchr/testify/require"
)
Expand All @@ -18,8 +19,9 @@ const (
)

func newServerWithContainer(t *testing.T, c container.Container) (*Server, func()) {
db, err := database.NewServiceDB(servicedbname)
serviceStore, err := store.NewLevelDBStore(servicedbname)
require.NoError(t, err)
db := database.NewServiceDB(serviceStore)

instanceDB, err := database.NewInstanceDB(instancedbname)
require.NoError(t, err)
Expand Down