From e94ed92a17da3643e6b6c3eafd0116b913001c07 Mon Sep 17 00:00:00 2001 From: yingchunliu-zte Date: Tue, 10 Sep 2024 15:59:49 +0800 Subject: [PATCH] using primary and slave DBs to solve the panic problem caused by DB conflicts Signed-off-by: yingchunliu-zte --- bucket.go | 118 +++++++++++++++++++++++++++++++++++++++++++++++------ db.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- tx.go | 104 +++++++++++++++++++++++++++++++++++++++------- 3 files changed, 311 insertions(+), 31 deletions(-) diff --git a/bucket.go b/bucket.go index 6371ace97..a121a6cd6 100644 --- a/bucket.go +++ b/bucket.go @@ -41,6 +41,8 @@ type Bucket struct { // // This is non-persisted across transactions so it must be set in every Tx. FillPercent float64 + + slave *Bucket } // newBucket returns a new bucket associated with a transaction. @@ -82,10 +84,20 @@ func (b *Bucket) Cursor() *Cursor { } } -// Bucket retrieves a nested bucket by name. +// Bucket retrieves a nested bucket by name with slave. +func (b *Bucket) Bucket(name []byte) *Bucket { + rb := b.bucket(name) + if rb != nil && b.slave != nil { + rb.slave = b.slave.bucket(name) + } + + return rb +} + +// bucket retrieves a nested bucket by name. // Returns nil if the bucket does not exist. // The bucket instance is only valid for the lifetime of the transaction. -func (b *Bucket) Bucket(name []byte) *Bucket { +func (b *Bucket) bucket(name []byte) *Bucket { if b.buckets != nil { if child := b.buckets[string(name)]; child != nil { return child @@ -142,10 +154,20 @@ func (b *Bucket) openBucket(value []byte) *Bucket { return &child } -// CreateBucket creates a new bucket at the given key and returns the new bucket. +// CreateBucket creates a new bucket at the given key and returns the new bucket with slave. +func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) { + rb, err = b.createBucket(key) + if err == nil && b.slave != nil { + rb.slave, err = b.slave.createBucket(key) + } + + return +} + +// createBucket creates a new bucket at the given key and returns the new bucket. // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) { +func (b *Bucket) createBucket(key []byte) (rb *Bucket, err error) { if lg := b.tx.db.Logger(); lg != discardLogger { lg.Debugf("Creating bucket %q", key) defer func() { @@ -199,10 +221,20 @@ func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) { return b.Bucket(newKey), nil } -// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it. +// CreateBucketIfNotExists creates a new bucket with slave if it doesn't already exist and returns a reference to it. +func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) { + rb, err = b.createBucketIfNotExists(key) + if err == nil && b.slave != nil { + rb.slave, err = b.slave.createBucketIfNotExists(key) + } + + return +} + +// createBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it. // Returns an error if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) { +func (b *Bucket) createBucketIfNotExists(key []byte) (rb *Bucket, err error) { if lg := b.tx.db.Logger(); lg != discardLogger { lg.Debugf("Creating bucket if not exist %q", key) defer func() { @@ -269,8 +301,18 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) { } // DeleteBucket deletes a bucket at the given key. -// Returns an error if the bucket does not exist, or if the key represents a non-bucket value. func (b *Bucket) DeleteBucket(key []byte) (err error) { + err = b.deleteBucket(key) + if err == nil && b.slave != nil { + err = b.slave.deleteBucket(key) + } + + return +} + +// deleteBucket deletes a bucket at the given key. +// Returns an error if the bucket does not exist, or if the key represents a non-bucket value. +func (b *Bucket) deleteBucket(key []byte) (err error) { if lg := b.tx.db.Logger(); lg != discardLogger { lg.Debugf("Deleting bucket %q", key) defer func() { @@ -327,13 +369,23 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) { return nil } +// MoveBucket moves a sub-bucket from the source bucket to the destination bucket with slave. +func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) { + err = b.moveBucket(key, dstBucket) + if err == nil && b.slave != nil && dstBucket.slave != nil { + err = b.slave.moveBucket(key, dstBucket.slave) + } + + return +} + // MoveBucket moves a sub-bucket from the source bucket to the destination bucket. // Returns an error if // 1. the sub-bucket cannot be found in the source bucket; // 2. or the key already exists in the destination bucket; // 3. or the key represents a non-bucket value; // 4. the source and destination buckets are the same. -func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) { +func (b *Bucket) moveBucket(key []byte, dstBucket *Bucket) (err error) { lg := b.tx.db.Logger() if lg != discardLogger { lg.Debugf("Moving bucket %q", key) @@ -445,11 +497,21 @@ func (b *Bucket) Get(key []byte) []byte { return v } -// Put sets the value for a key in the bucket. +// Put sets the value for a key in the bucket with slave. +func (b *Bucket) Put(key []byte, value []byte) (err error) { + err = b.put(key, value) + if err == nil && b.slave != nil { + err = b.slave.put(key, value) + } + + return +} + +// put sets the value for a key in the bucket. // If the key exist then its previous value will be overwritten. // Supplied value must remain valid for the life of the transaction. // Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large. -func (b *Bucket) Put(key []byte, value []byte) (err error) { +func (b *Bucket) put(key []byte, value []byte) (err error) { if lg := b.tx.db.Logger(); lg != discardLogger { lg.Debugf("Putting key %q", key) defer func() { @@ -493,10 +555,20 @@ func (b *Bucket) Put(key []byte, value []byte) (err error) { return nil } +// Delete removes a key from the bucket with slave. +func (b *Bucket) Delete(key []byte) (err error) { + err = b.delete(key) + if err == nil && b.slave != nil { + err = b.slave.delete(key) + } + + return +} + // Delete removes a key from the bucket. // If the key does not exist then nothing is done and a nil error is returned. // Returns an error if the bucket was created from a read-only transaction. -func (b *Bucket) Delete(key []byte) (err error) { +func (b *Bucket) delete(key []byte) (err error) { if lg := b.tx.db.Logger(); lg != discardLogger { lg.Debugf("Deleting key %q", key) defer func() { @@ -539,8 +611,18 @@ func (b *Bucket) Sequence() uint64 { return b.InSequence() } -// SetSequence updates the sequence number for the bucket. +// SetSequence updates the sequence number for the bucket with slave. func (b *Bucket) SetSequence(v uint64) error { + err := b.setSequence(v) + if err == nil && b.slave != nil { + err = b.slave.setSequence(v) + } + + return err +} + +// SetSequence updates the sequence number for the bucket. +func (b *Bucket) setSequence(v uint64) error { if b.tx.db == nil { return errors.ErrTxClosed } else if !b.Writable() { @@ -558,8 +640,18 @@ func (b *Bucket) SetSequence(v uint64) error { return nil } -// NextSequence returns an autoincrementing integer for the bucket. +// NextSequence returns an autoincrementing integer for the bucket with slave. func (b *Bucket) NextSequence() (uint64, error) { + r, err := b.nextSequence() + if err == nil && b.slave != nil { + _, err = b.slave.nextSequence() + } + + return r, err +} + +// nextSequence returns an autoincrementing integer for the bucket. +func (b *Bucket) nextSequence() (uint64, error) { if b.tx.db == nil { return 0, errors.ErrTxClosed } else if !b.Writable() { diff --git a/db.go b/db.go index 5c1947e99..52c94feed 100644 --- a/db.go +++ b/db.go @@ -154,6 +154,8 @@ type DB struct { // Read only mode. // When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately. readOnly bool + + slave *DB } // Path returns the path to currently open database file. @@ -171,11 +173,102 @@ func (db *DB) String() string { return fmt.Sprintf("DB<%q>", db.path) } -// Open creates and opens a database at the given path with a given file mode. +// Open creates and opens master database and slave database +func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) { + var master *DB + + master, err = tryOpenMasterDB(path, 0644, options) + if err != nil { + return nil, err + } + + openSlave := true + if options != nil && options.OpenSlave != nil { + openSlave = *options.OpenSlave + } + + if !openSlave { + return master, nil + } + + slaveDbPath := path + ".slave" + err = copyFile(path, slaveDbPath) + if err != nil { + return nil, err + } + + var slave *DB + slave, err = open(slaveDbPath, 0644, options) + if err != nil { + return nil, err + } + + master.slave = slave + + return master, nil +} + +func tryOpenMasterDB(path string, mode os.FileMode, options *Options) (db *DB, err error) { + slaveDbPath := path + ".slave" + pathBackup := path + ".backup" + + defer func() { + if e := recover(); e != nil { + if _, err := os.Stat(slaveDbPath); err == nil { + if err := os.Rename(slaveDbPath, pathBackup); err != nil { + panic(fmt.Sprintf("rename %s-%s err %v: failed (%v)", slaveDbPath, pathBackup, err, e)) + } + } else { + panic(fmt.Sprintf("slave db path %s err %v, by open db %s failed (%v)", slaveDbPath, err, path, e)) + } + + panic(fmt.Sprintf("open db %s failed (%v),rename %s-%s success", path, e, slaveDbPath, pathBackup)) + } + }() + + if _, err := os.Stat(pathBackup); err == nil { + if err := os.Rename(pathBackup, path); err != nil { + return nil, err + } + } + + db, err = open(path, 0644, options) + + return +} + +func copyFile(src, dst string) error { + os.RemoveAll(dst) + + srcFile, err := os.Open(src) + if err != nil { + return err + } + defer srcFile.Close() + + dstFile, err := os.Create(dst) + if err != nil { + return err + } + defer dstFile.Close() + + _, err = io.Copy(dstFile, srcFile) + if err != nil { + return err + } + + if err := dstFile.Sync(); err != nil { + return err + } + + return nil +} + +// open creates and opens a database at the given path with a given file mode. // If the file does not exist then it will be created automatically with a given file mode. // Passing in nil options will cause Bolt to open the database with the default options. // Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600 -func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) { +func open(path string, mode os.FileMode, options *Options) (db *DB, err error) { db = &DB{ opened: true, } @@ -664,10 +757,20 @@ func (db *DB) init() error { return nil } +// Close releases all database resources with slave. +func (db *DB) Close() error { + err := db._close() + if err == nil && db.slave != nil { + err = db.slave._close() + } + + return err +} + // Close releases all database resources. // It will block waiting for any open transactions to finish // before closing the database and returning. -func (db *DB) Close() error { +func (db *DB) _close() error { db.rwlock.Lock() defer db.rwlock.Unlock() @@ -814,6 +917,15 @@ func (db *DB) beginTx() (*Tx, error) { } func (db *DB) beginRWTx() (*Tx, error) { + tx, err := db._beginRWTx() + if err == nil && db.slave != nil { + tx.slave, err = db.slave._beginRWTx() + } + + return tx, err +} + +func (db *DB) _beginRWTx() (*Tx, error) { // If the database was opened with Options.ReadOnly, return an error. if db.readOnly { return nil, berrors.ErrDatabaseReadOnly @@ -1330,6 +1442,8 @@ type Options struct { // Logger is the logger used for bbolt. Logger Logger + + OpenSlave *bool } func (o *Options) String() string { diff --git a/tx.go b/tx.go index 7b5db7727..54fe7c4f4 100644 --- a/tx.go +++ b/tx.go @@ -41,6 +41,8 @@ type Tx struct { // workloads. For databases that are much larger than available RAM, // set the flag to syscall.O_DIRECT to avoid trashing the page cache. WriteFlag int + + slave *Tx } // init initializes the transaction. @@ -105,34 +107,84 @@ func (tx *Tx) Inspect() BucketStructure { return tx.root.Inspect() } +// Bucket retrieves a bucket by name with slave. +func (tx *Tx) Bucket(name []byte) *Bucket { + b := tx.bucket(name) + if b != nil && tx.slave != nil { + b.slave = tx.slave.bucket(name) + } + + return b +} + // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. // The bucket instance is only valid for the lifetime of the transaction. -func (tx *Tx) Bucket(name []byte) *Bucket { +func (tx *Tx) bucket(name []byte) *Bucket { return tx.root.Bucket(name) } +// CreateBucket creates a new bucket with slave. +func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { + b, err := tx.createBucket(name) + if err == nil && tx.slave != nil { + b.slave, err = tx.slave.createBucket(name) + } + + return b, err +} + // CreateBucket creates a new bucket. // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { +func (tx *Tx) createBucket(name []byte) (*Bucket, error) { return tx.root.CreateBucket(name) } +// CreateBucketIfNotExists creates a new bucket with slave if it doesn't already exist. +func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { + b, err := tx.createBucketIfNotExists(name) + if err == nil && tx.slave != nil { + b.slave, err = tx.slave.createBucketIfNotExists(name) + } + + return b, err +} + // CreateBucketIfNotExists creates a new bucket if it doesn't already exist. // Returns an error if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. -func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { +func (tx *Tx) createBucketIfNotExists(name []byte) (*Bucket, error) { return tx.root.CreateBucketIfNotExists(name) } -// DeleteBucket deletes a bucket. -// Returns an error if the bucket cannot be found or if the key represents a non-bucket value. +// DeleteBucket deletes a bucket with slave. func (tx *Tx) DeleteBucket(name []byte) error { + err := tx.deleteBucket(name) + if err == nil && tx.slave != nil { + err = tx.slave.deleteBucket(name) + } + + return err +} + +// deleteBucket deletes a bucket. +// Returns an error if the bucket cannot be found or if the key represents a non-bucket value. +func (tx *Tx) deleteBucket(name []byte) error { return tx.root.DeleteBucket(name) } -// MoveBucket moves a sub-bucket from the source bucket to the destination bucket. +// MoveBucket moves a sub-bucket from the source bucket to the destination bucket with slave. +func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error { + err := tx.moveBucket(child, src, dst) + if err == nil && src.slave != nil && src.slave != nil && tx.slave != nil { + err = tx.slave.moveBucket(child, src.slave, dst.slave) + } + + return err +} + +// moveBucket moves a sub-bucket from the source bucket to the destination bucket. // Returns an error if // 1. the sub-bucket cannot be found in the source bucket; // 2. or the key already exists in the destination bucket; @@ -140,7 +192,7 @@ func (tx *Tx) DeleteBucket(name []byte) error { // // If src is nil, it means moving a top level bucket into the target bucket. // If dst is nil, it means converting the child bucket into a top level bucket. -func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error { +func (tx *Tx) moveBucket(child []byte, src *Bucket, dst *Bucket) error { if src == nil { src = &tx.root } @@ -164,10 +216,27 @@ func (tx *Tx) OnCommit(fn func()) { tx.commitHandlers = append(tx.commitHandlers, fn) } -// Commit writes all changes to disk, updates the meta page and closes the transaction. +// Commit writes all changes to disk, updates the meta page and closes the transaction with slave. +func (tx *Tx) Commit() (err error) { + err = tx.commit() + if err == nil && tx.slave != nil { + err = tx.slave.commit() + } + + if err == nil { + // Execute commit handlers now that the locks have been removed. + for _, fn := range tx.commitHandlers { + fn() + } + } + + return err +} + +// commit writes all changes to disk, updates the meta page and closes the transaction. // Returns an error if a disk write error occurs, or if Commit is // called on a read-only transaction. -func (tx *Tx) Commit() (err error) { +func (tx *Tx) commit() (err error) { txId := tx.ID() lg := tx.db.Logger() if lg != discardLogger { @@ -274,11 +343,6 @@ func (tx *Tx) Commit() (err error) { // Finalize the transaction. tx.close() - // Execute commit handlers now that the locks have been removed. - for _, fn := range tx.commitHandlers { - fn() - } - return nil } @@ -297,9 +361,19 @@ func (tx *Tx) commitFreelist() error { return nil } +// Rollback closes the transaction and ignores all previous updates with slave. +func (tx *Tx) Rollback() error { + err := tx._rollback() + if err == nil && tx.slave != nil { + err = tx.slave._rollback() + } + + return err +} + // Rollback closes the transaction and ignores all previous updates. Read-only // transactions must be rolled back and not committed. -func (tx *Tx) Rollback() error { +func (tx *Tx) _rollback() error { common.Assert(!tx.managed, "managed tx rollback not allowed") if tx.db == nil { return berrors.ErrTxClosed