Skip to content

Commit

Permalink
using primary and slave DBs to solve the panic problem caused by DB c…
Browse files Browse the repository at this point in the history
…onflicts

Signed-off-by: yingchunliu-zte <[email protected]>
  • Loading branch information
yingchunliu-zte committed Sep 10, 2024
1 parent 0cecda6 commit e94ed92
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 31 deletions.
118 changes: 105 additions & 13 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Bucket struct {
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64

slave *Bucket
}

// newBucket returns a new bucket associated with a transaction.
Expand Down Expand Up @@ -82,10 +84,20 @@ func (b *Bucket) Cursor() *Cursor {
}
}

// Bucket retrieves a nested bucket by name.
// Bucket retrieves a nested bucket by name with slave.
func (b *Bucket) Bucket(name []byte) *Bucket {
rb := b.bucket(name)
if rb != nil && b.slave != nil {
rb.slave = b.slave.bucket(name)
}

return rb
}

// bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
func (b *Bucket) bucket(name []byte) *Bucket {
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
Expand Down Expand Up @@ -142,10 +154,20 @@ func (b *Bucket) openBucket(value []byte) *Bucket {
return &child
}

// CreateBucket creates a new bucket at the given key and returns the new bucket.
// CreateBucket creates a new bucket at the given key and returns the new bucket with slave.
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
rb, err = b.createBucket(key)
if err == nil && b.slave != nil {
rb.slave, err = b.slave.createBucket(key)
}

return
}

// createBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
func (b *Bucket) createBucket(key []byte) (rb *Bucket, err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Creating bucket %q", key)
defer func() {
Expand Down Expand Up @@ -199,10 +221,20 @@ func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
return b.Bucket(newKey), nil
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// CreateBucketIfNotExists creates a new bucket with slave if it doesn't already exist and returns a reference to it.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
rb, err = b.createBucketIfNotExists(key)
if err == nil && b.slave != nil {
rb.slave, err = b.slave.createBucketIfNotExists(key)
}

return
}

// createBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
func (b *Bucket) createBucketIfNotExists(key []byte) (rb *Bucket, err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Creating bucket if not exist %q", key)
defer func() {
Expand Down Expand Up @@ -269,8 +301,18 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
}

// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
func (b *Bucket) DeleteBucket(key []byte) (err error) {
err = b.deleteBucket(key)
if err == nil && b.slave != nil {
err = b.slave.deleteBucket(key)
}

return
}

// deleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
func (b *Bucket) deleteBucket(key []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Deleting bucket %q", key)
defer func() {
Expand Down Expand Up @@ -327,13 +369,23 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
return nil
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket with slave.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
err = b.moveBucket(key, dstBucket)
if err == nil && b.slave != nil && dstBucket.slave != nil {
err = b.slave.moveBucket(key, dstBucket.slave)
}

return
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
// 1. the sub-bucket cannot be found in the source bucket;
// 2. or the key already exists in the destination bucket;
// 3. or the key represents a non-bucket value;
// 4. the source and destination buckets are the same.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
func (b *Bucket) moveBucket(key []byte, dstBucket *Bucket) (err error) {
lg := b.tx.db.Logger()
if lg != discardLogger {
lg.Debugf("Moving bucket %q", key)
Expand Down Expand Up @@ -445,11 +497,21 @@ func (b *Bucket) Get(key []byte) []byte {
return v
}

// Put sets the value for a key in the bucket.
// Put sets the value for a key in the bucket with slave.
func (b *Bucket) Put(key []byte, value []byte) (err error) {
err = b.put(key, value)
if err == nil && b.slave != nil {
err = b.slave.put(key, value)
}

return
}

// put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) (err error) {
func (b *Bucket) put(key []byte, value []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Putting key %q", key)
defer func() {
Expand Down Expand Up @@ -493,10 +555,20 @@ func (b *Bucket) Put(key []byte, value []byte) (err error) {
return nil
}

// Delete removes a key from the bucket with slave.
func (b *Bucket) Delete(key []byte) (err error) {
err = b.delete(key)
if err == nil && b.slave != nil {
err = b.slave.delete(key)
}

return
}

// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) (err error) {
func (b *Bucket) delete(key []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Deleting key %q", key)
defer func() {
Expand Down Expand Up @@ -539,8 +611,18 @@ func (b *Bucket) Sequence() uint64 {
return b.InSequence()
}

// SetSequence updates the sequence number for the bucket.
// SetSequence updates the sequence number for the bucket with slave.
func (b *Bucket) SetSequence(v uint64) error {
err := b.setSequence(v)
if err == nil && b.slave != nil {
err = b.slave.setSequence(v)
}

return err
}

// SetSequence updates the sequence number for the bucket.
func (b *Bucket) setSequence(v uint64) error {
if b.tx.db == nil {
return errors.ErrTxClosed
} else if !b.Writable() {
Expand All @@ -558,8 +640,18 @@ func (b *Bucket) SetSequence(v uint64) error {
return nil
}

// NextSequence returns an autoincrementing integer for the bucket.
// NextSequence returns an autoincrementing integer for the bucket with slave.
func (b *Bucket) NextSequence() (uint64, error) {
r, err := b.nextSequence()
if err == nil && b.slave != nil {
_, err = b.slave.nextSequence()
}

return r, err
}

// nextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) nextSequence() (uint64, error) {
if b.tx.db == nil {
return 0, errors.ErrTxClosed
} else if !b.Writable() {
Expand Down
120 changes: 117 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type DB struct {
// Read only mode.
// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
readOnly bool

slave *DB
}

// Path returns the path to currently open database file.
Expand All @@ -171,11 +173,102 @@ func (db *DB) String() string {
return fmt.Sprintf("DB<%q>", db.path)
}

// Open creates and opens a database at the given path with a given file mode.
// Open creates and opens master database and slave database
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
var master *DB

master, err = tryOpenMasterDB(path, 0644, options)
if err != nil {
return nil, err
}

openSlave := true
if options != nil && options.OpenSlave != nil {
openSlave = *options.OpenSlave
}

if !openSlave {
return master, nil
}

slaveDbPath := path + ".slave"
err = copyFile(path, slaveDbPath)
if err != nil {
return nil, err
}

var slave *DB
slave, err = open(slaveDbPath, 0644, options)
if err != nil {
return nil, err
}

master.slave = slave

return master, nil
}

func tryOpenMasterDB(path string, mode os.FileMode, options *Options) (db *DB, err error) {
slaveDbPath := path + ".slave"
pathBackup := path + ".backup"

defer func() {
if e := recover(); e != nil {
if _, err := os.Stat(slaveDbPath); err == nil {
if err := os.Rename(slaveDbPath, pathBackup); err != nil {
panic(fmt.Sprintf("rename %s-%s err %v: failed (%v)", slaveDbPath, pathBackup, err, e))
}
} else {
panic(fmt.Sprintf("slave db path %s err %v, by open db %s failed (%v)", slaveDbPath, err, path, e))
}

panic(fmt.Sprintf("open db %s failed (%v),rename %s-%s success", path, e, slaveDbPath, pathBackup))
}
}()

if _, err := os.Stat(pathBackup); err == nil {
if err := os.Rename(pathBackup, path); err != nil {
return nil, err
}
}

db, err = open(path, 0644, options)

return
}

func copyFile(src, dst string) error {
os.RemoveAll(dst)

srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()

dstFile, err := os.Create(dst)
if err != nil {
return err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
if err != nil {
return err
}

if err := dstFile.Sync(); err != nil {
return err
}

return nil
}

// open creates and opens a database at the given path with a given file mode.
// If the file does not exist then it will be created automatically with a given file mode.
// Passing in nil options will cause Bolt to open the database with the default options.
// Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
func open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
db = &DB{
opened: true,
}
Expand Down Expand Up @@ -664,10 +757,20 @@ func (db *DB) init() error {
return nil
}

// Close releases all database resources with slave.
func (db *DB) Close() error {
err := db._close()
if err == nil && db.slave != nil {
err = db.slave._close()
}

return err
}

// Close releases all database resources.
// It will block waiting for any open transactions to finish
// before closing the database and returning.
func (db *DB) Close() error {
func (db *DB) _close() error {
db.rwlock.Lock()
defer db.rwlock.Unlock()

Expand Down Expand Up @@ -814,6 +917,15 @@ func (db *DB) beginTx() (*Tx, error) {
}

func (db *DB) beginRWTx() (*Tx, error) {
tx, err := db._beginRWTx()
if err == nil && db.slave != nil {
tx.slave, err = db.slave._beginRWTx()
}

return tx, err
}

func (db *DB) _beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, berrors.ErrDatabaseReadOnly
Expand Down Expand Up @@ -1330,6 +1442,8 @@ type Options struct {

// Logger is the logger used for bbolt.
Logger Logger

OpenSlave *bool
}

func (o *Options) String() string {
Expand Down
Loading

0 comments on commit e94ed92

Please sign in to comment.