diff --git a/config/options.go b/config/options.go index beafe9ea..54651768 100644 --- a/config/options.go +++ b/config/options.go @@ -7,6 +7,7 @@ type Options struct { DataFileSize int64 //数据文件的大小 SyncWrite bool // 每次写数据是否持久化 IndexType IndexerType + FIOType FIOType } // IteratorOptions 索引迭代器配置项 @@ -25,6 +26,14 @@ type WriteBatchOptions struct { SyncWrites bool } +type FIOType = int8 + +const ( + FileIOType = iota + 1 // Standard File IO + BufIOType // File IO with buffer + MmapIOType // Memory Mapping IO +) + type IndexerType = int8 const ( @@ -40,6 +49,7 @@ var DefaultOptions = Options{ DataFileSize: 256 * 1024 * 1024, // 256MB SyncWrite: false, IndexType: Btree, + FIOType: FileIOType, } var DefaultIteratorOptions = IteratorOptions{ diff --git a/data/data_file.go b/data/data_file.go index 204be527..375b95cc 100644 --- a/data/data_file.go +++ b/data/data_file.go @@ -22,9 +22,9 @@ type DataFile struct { } // OpenDataFile 打开新的数据文件 -func OpenDataFile(dirPath string, fildID uint32) (*DataFile, error) { +func OpenDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) { fileName := GetDataFileName(dirPath, fildID) - return newDataFile(fileName, fildID) + return newDataFile(fileName, fildID, fileSize, fioType) } func GetDataFileName(dirPath string, fildID uint32) string { @@ -32,20 +32,20 @@ func GetDataFileName(dirPath string, fildID uint32) string { } // OpenHintFile 打开 Hint 索引文件 -func OpenHintFile(dirPath string) (*DataFile, error) { +func OpenHintFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) { fileName := filepath.Join(dirPath, HintFileSuffix) - return newDataFile(fileName, 0) + return newDataFile(fileName, 0, fileSize, fioType) } // OpenMergeFinaFile 打开标识 merge 完成的文件 -func OpenMergeFinaFile(dirPath string) (*DataFile, error) { +func OpenMergeFinaFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) { fileName := filepath.Join(dirPath, MergeFinaFileSuffix) - return newDataFile(fileName, 0) + return newDataFile(fileName, 0, fileSize, fioType) } -func newDataFile(dirPath string, fildID uint32) (*DataFile, error) { +func newDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) { //初始化 IOManager 管理器接口 - ioManager, err := fio.NewIOManager(dirPath) + ioManager, err := fio.NewIOManager(dirPath, fileSize, fioType) if err != nil { return nil, err } diff --git a/data/data_file_test.go b/data/data_file_test.go index 98b5a821..545ae58c 100644 --- a/data/data_file_test.go +++ b/data/data_file_test.go @@ -1,27 +1,30 @@ package data import ( + "github.com/ByteStorage/FlyDB/fio" "github.com/stretchr/testify/assert" "os" "testing" ) +const DefaultFileSize = 256 * 1024 * 1024 + func TestOpenDataFile(t *testing.T) { - dataFile1, err := OpenDataFile(os.TempDir(), 0) + dataFile1, err := OpenDataFile(os.TempDir(), 0, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile1) - dataFile2, err := OpenDataFile(os.TempDir(), 1) + dataFile2, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile2) - dataFile3, err := OpenDataFile(os.TempDir(), 1) + dataFile3, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile3) } func TestDataFile_Write(t *testing.T) { - dataFile, err := OpenDataFile(os.TempDir(), 12312) + dataFile, err := OpenDataFile(os.TempDir(), 12312, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile) @@ -36,7 +39,7 @@ func TestDataFile_Write(t *testing.T) { } func TestDataFile_Close(t *testing.T) { - dataFile, err := OpenDataFile(os.TempDir(), 1111111) + dataFile, err := OpenDataFile(os.TempDir(), 1111111, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile) @@ -45,7 +48,7 @@ func TestDataFile_Close(t *testing.T) { } func TestDataFile_Sync(t *testing.T) { - dataFile, err := OpenDataFile(os.TempDir(), 2222222) + dataFile, err := OpenDataFile(os.TempDir(), 2222222, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile) @@ -54,7 +57,7 @@ func TestDataFile_Sync(t *testing.T) { } func TestDataFile_ReadLogRecord(t *testing.T) { - dataFile, err := OpenDataFile(os.TempDir(), 123) + dataFile, err := OpenDataFile(os.TempDir(), 123, DefaultFileSize, fio.FileIOType) assert.Nil(t, err) assert.NotNil(t, dataFile) diff --git a/engine/db.go b/engine/db.go index 92ddc4fa..cbbf6109 100644 --- a/engine/db.go +++ b/engine/db.go @@ -218,7 +218,7 @@ func (db *DB) setActiveDataFile() error { } // Open a new data file - dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID) + dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID, db.options.DataFileSize, db.options.FIOType) if err != nil { return err } @@ -368,7 +368,7 @@ func (db *DB) loadDataFiles() error { // Walk through each file id and open the corresponding data file for i, fid := range fileIds { - dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid)) + dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid), db.options.DataFileSize, db.options.FIOType) if err != nil { return err } diff --git a/engine/merge.go b/engine/merge.go index e8637875..4fff22e6 100755 --- a/engine/merge.go +++ b/engine/merge.go @@ -85,7 +85,7 @@ func (db *DB) Merge() error { } // 打开 hint 文件存储索引 - hintFile, err := data.OpenHintFile(mergePath) + hintFile, err := data.OpenHintFile(mergePath, db.options.DataFileSize, db.options.FIOType) if err != nil { return err } @@ -131,7 +131,7 @@ func (db *DB) Merge() error { } // 写标识 merge 完成的文件 - mergeFinaFile, err := data.OpenMergeFinaFile(mergePath) + mergeFinaFile, err := data.OpenMergeFinaFile(mergePath, db.options.DataFileSize, db.options.FIOType) if err != nil { return err } @@ -223,7 +223,7 @@ func (db *DB) loadMergeFiles() error { // 获取最近没有参与 merge 的文件 id func (db *DB) getRecentlyNonMergeFileId(dirPath string) (uint32, error) { - mergeFinaFile, err := data.OpenMergeFinaFile(dirPath) + mergeFinaFile, err := data.OpenMergeFinaFile(dirPath, db.options.DataFileSize, db.options.FIOType) if err != nil { return 0, err } @@ -247,7 +247,7 @@ func (db *DB) loadIndexFromHintFile() error { } // 打开 hint 文件 - hintFile, err := data.OpenHintFile(db.options.DirPath) + hintFile, err := data.OpenHintFile(db.options.DirPath, db.options.DataFileSize, db.options.FIOType) if err != nil { return err } diff --git a/fio/file_io.go b/fio/file_io.go index 28efcc50..50e7dcbc 100644 --- a/fio/file_io.go +++ b/fio/file_io.go @@ -45,7 +45,3 @@ func (fio *FileIO) Size() (int64, error) { } return stat.Size(), nil } - -func NewIOManager(filename string) (IOManager, error) { - return NewFileIOManager(filename) -} diff --git a/fio/io_manager.go b/fio/io_manager.go index 3f62b89c..f2898024 100644 --- a/fio/io_manager.go +++ b/fio/io_manager.go @@ -2,6 +2,14 @@ package fio const DataFilePerm = 0644 //0644 表示创建了一个文件,文件所有者可以读写,其他人只能读 +const DefaultFileSize = 256 * 1024 * 1024 + +const ( + FileIOType = iota + 1 // Standard File IO + BufIOType // File IO with buffer + MmapIOType // Memory Mapping IO +) + // IOManager 抽象 IO 管理接口, 可以接入不同的 IO 类型, 目前支持标准文件 IO type IOManager interface { // Read 从文件的给定位置读取对应的数据 @@ -19,3 +27,16 @@ type IOManager interface { // Size get file size Size() (int64, error) } + +// NewIOManager get IOManager based on type +func NewIOManager(filename string, fileSize int64, fioType int8) (IOManager, error) { + switch fioType { + case FileIOType: + return NewFileIOManager(filename) + case BufIOType: + return NewBufIOManager(filename) + case MmapIOType: + return NewMMapIOManager(filename, fileSize) + } + return NewFileIOManager(filename) +} diff --git a/fio/mmap_io.go b/fio/mmap_io.go index 8bd885a7..5bc20cd0 100644 --- a/fio/mmap_io.go +++ b/fio/mmap_io.go @@ -7,20 +7,18 @@ import ( "unsafe" ) -// DefaultMemMapSize 最大映射内存大小 -const DefaultMemMapSize = 256 * 1024 * 1024 - -// MMapIO 标准系统文件IO type MMapIO struct { - fd *os.File // 系统文件描述符 - data []byte // 与文件对应的映射区 - dirty bool // 是否更改过 - offset int64 // 写入位置 + fd *os.File // system file descriptor + data []byte // the mapping area corresponding to the file + dirty bool // has changed + offset int64 // next write location + fileSize int64 // max file size } -// NewMMapIOManager 初始化标准文件 IO -func NewMMapIOManager(fileName string) (*MMapIO, error) { - mmapIO := &MMapIO{} +// NewMMapIOManager Initialize Mmap IO +func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { + mmapIO := &MMapIO{fileSize: fileSize} + fd, err := os.OpenFile( fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, @@ -31,13 +29,13 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) { } info, _ := fd.Stat() - // 将文件扩容到映射区大小, 保存时会裁剪 - if err := fd.Truncate(DefaultMemMapSize); err != nil { + // Expand files to maximum file size, crop when saving + if err := fd.Truncate(fileSize); err != nil { return nil, err } - // 构建映射 - b, err := syscall.Mmap(int(fd.Fd()), 0, DefaultMemMapSize, syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) + // Building mappings between memory and disk files + b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileSize), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { return nil, err } @@ -48,14 +46,16 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) { return mmapIO, nil } +// Read Copy data from the mapping area to byte slice func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { return copy(b, mio.data[offset:]), nil } +// Write Copy data from byte slice to the mapping area func (mio *MMapIO) Write(b []byte) (int, error) { oldOffset := mio.offset newOffset := mio.offset + int64(len(b)) - if newOffset > DefaultMemMapSize { + if newOffset > mio.fileSize { return 0, errors.New("exceed file max content length") } @@ -64,6 +64,7 @@ func (mio *MMapIO) Write(b []byte) (int, error) { return copy(mio.data[oldOffset:], b), nil } +// Sync Synchronize data from memory to disk func (mio *MMapIO) Sync() error { if !mio.dirty { return nil @@ -78,6 +79,7 @@ func (mio *MMapIO) Sync() error { return nil } +// Close file func (mio *MMapIO) Close() (err error) { if err = mio.fd.Truncate(mio.offset); err != nil { return err @@ -91,10 +93,12 @@ func (mio *MMapIO) Close() (err error) { return mio.fd.Close() } +// Size return the size of current file func (mio *MMapIO) Size() (int64, error) { return mio.offset, nil } +// UnMap Unmapping between memory and files func (mio *MMapIO) UnMap() error { if mio.data == nil { return nil diff --git a/fio/mmap_io_test.go b/fio/mmap_io_test.go index 4d8017ba..82dc4bfd 100644 --- a/fio/mmap_io_test.go +++ b/fio/mmap_io_test.go @@ -8,7 +8,7 @@ import ( func TestNewMMapIOManager(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) @@ -17,7 +17,7 @@ func TestNewMMapIOManager(t *testing.T) { func TestMMapIO_Write(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) assert.NotNil(t, mio) @@ -36,7 +36,7 @@ func TestMMapIO_Write(t *testing.T) { func TestMMapIO_Read(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) @@ -60,7 +60,7 @@ func TestMMapIO_Read(t *testing.T) { func TestMMapIO_Sync(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) @@ -73,7 +73,7 @@ func TestMMapIO_Sync(t *testing.T) { func TestMMapIO_Close(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) @@ -88,7 +88,7 @@ func TestMMapIO_Close(t *testing.T) { func TestMMapIO_Write_Speed(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) assert.Nil(t, err) assert.NotNil(t, mio) @@ -104,7 +104,7 @@ func TestMMapIO_Write_Speed(t *testing.T) { func TestMMapIO_Read_Speed(t *testing.T) { path := filepath.Join("/tmp", "a.data") - mio, err := NewMMapIOManager(path) + mio, err := NewMMapIOManager(path, DefaultFileSize) defer destoryFile(path) assert.Nil(t, err) assert.NotNil(t, mio) diff --git a/http/http_server_test.go b/http/http_server_test.go index c41cf9c6..5f746ab2 100644 --- a/http/http_server_test.go +++ b/http/http_server_test.go @@ -38,6 +38,7 @@ func TestNewHTTPServer(t *testing.T) { // 测试Put方法 func TestPut(t *testing.T) { handler, _ := newHttpHandler() + defer handler.Close() // 创建一个测试用的http server server := httptest.NewServer(http.HandlerFunc(handler.PutHandler)) defer server.Close() @@ -91,6 +92,7 @@ func TestPut(t *testing.T) { func TestDel(t *testing.T) { handler, _ := newHttpHandler() + defer handler.Close() // 创建一个测试用的http server server := httptest.NewServer(http.HandlerFunc(handler.DelHandler)) defer server.Close() @@ -138,6 +140,7 @@ func TestDel(t *testing.T) { func TestGet(t *testing.T) { handler, _ := newHttpHandler() + defer handler.Close() // 创建一个测试用的http server server := httptest.NewServer(http.HandlerFunc(handler.GetHandler)) defer server.Close() @@ -184,6 +187,7 @@ func TestGet(t *testing.T) { func TestPost(t *testing.T) { handler, _ := newHttpHandler() + defer handler.Close() // 创建一个测试用的 HTTP 服务器 server := httptest.NewServer(http.HandlerFunc(handler.PostHandler)) defer server.Close() @@ -238,6 +242,7 @@ func TestPost(t *testing.T) { func TestGetListKeysHandler(t *testing.T) { handler, _ := newHttpHandler() + defer handler.Close() // 创建一个测试用的http server server := httptest.NewServer(http.HandlerFunc(handler.GetListKeysHandler)) defer server.Close() diff --git a/raft/raft.go b/raft/raft.go index 5d0bf28b..8f31fc9d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -23,6 +23,7 @@ var DefaultOptions = config.Options{ DataFileSize: 256 * 1024 * 1024, // 256MB SyncWrite: false, IndexType: Btree, + FIOType: config.FileIOType, } // Cluster define the cluster of db