Skip to content

Commit

Permalink
Merge pull request #132 from ZhaoShuaiUp/master
Browse files Browse the repository at this point in the history
fix io manager issue(especially mmap) and test function
  • Loading branch information
qishenonly authored Jul 3, 2023
2 parents 1c28132 + 27c0db4 commit 095a932
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 48 deletions.
10 changes: 10 additions & 0 deletions config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Options struct {
DataFileSize int64 //数据文件的大小
SyncWrite bool // 每次写数据是否持久化
IndexType IndexerType
FIOType FIOType
}

// IteratorOptions 索引迭代器配置项
Expand All @@ -25,6 +26,14 @@ type WriteBatchOptions struct {
SyncWrites bool
}

type FIOType = int8

const (
FileIOType = iota + 1 // Standard File IO
BufIOType // File IO with buffer
MmapIOType // Memory Mapping IO
)

type IndexerType = int8

const (
Expand All @@ -40,6 +49,7 @@ var DefaultOptions = Options{
DataFileSize: 256 * 1024 * 1024, // 256MB
SyncWrite: false,
IndexType: Btree,
FIOType: FileIOType,
}

var DefaultIteratorOptions = IteratorOptions{
Expand Down
16 changes: 8 additions & 8 deletions data/data_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@ type DataFile struct {
}

// OpenDataFile 打开新的数据文件
func OpenDataFile(dirPath string, fildID uint32) (*DataFile, error) {
func OpenDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) {
fileName := GetDataFileName(dirPath, fildID)
return newDataFile(fileName, fildID)
return newDataFile(fileName, fildID, fileSize, fioType)
}

func GetDataFileName(dirPath string, fildID uint32) string {
return filepath.Join(dirPath, fmt.Sprintf("%09d", fildID)+DataFileSuffix)
}

// OpenHintFile 打开 Hint 索引文件
func OpenHintFile(dirPath string) (*DataFile, error) {
func OpenHintFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) {
fileName := filepath.Join(dirPath, HintFileSuffix)
return newDataFile(fileName, 0)
return newDataFile(fileName, 0, fileSize, fioType)
}

// OpenMergeFinaFile 打开标识 merge 完成的文件
func OpenMergeFinaFile(dirPath string) (*DataFile, error) {
func OpenMergeFinaFile(dirPath string, fileSize int64, fioType int8) (*DataFile, error) {
fileName := filepath.Join(dirPath, MergeFinaFileSuffix)
return newDataFile(fileName, 0)
return newDataFile(fileName, 0, fileSize, fioType)
}

func newDataFile(dirPath string, fildID uint32) (*DataFile, error) {
func newDataFile(dirPath string, fildID uint32, fileSize int64, fioType int8) (*DataFile, error) {
//初始化 IOManager 管理器接口
ioManager, err := fio.NewIOManager(dirPath)
ioManager, err := fio.NewIOManager(dirPath, fileSize, fioType)
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions data/data_file_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package data

import (
"github.com/ByteStorage/FlyDB/fio"
"github.com/stretchr/testify/assert"
"os"
"testing"
)

const DefaultFileSize = 256 * 1024 * 1024

func TestOpenDataFile(t *testing.T) {
dataFile1, err := OpenDataFile(os.TempDir(), 0)
dataFile1, err := OpenDataFile(os.TempDir(), 0, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile1)

dataFile2, err := OpenDataFile(os.TempDir(), 1)
dataFile2, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile2)

dataFile3, err := OpenDataFile(os.TempDir(), 1)
dataFile3, err := OpenDataFile(os.TempDir(), 1, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile3)
}

func TestDataFile_Write(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 12312)
dataFile, err := OpenDataFile(os.TempDir(), 12312, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -36,7 +39,7 @@ func TestDataFile_Write(t *testing.T) {
}

func TestDataFile_Close(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 1111111)
dataFile, err := OpenDataFile(os.TempDir(), 1111111, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -45,7 +48,7 @@ func TestDataFile_Close(t *testing.T) {
}

func TestDataFile_Sync(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 2222222)
dataFile, err := OpenDataFile(os.TempDir(), 2222222, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand All @@ -54,7 +57,7 @@ func TestDataFile_Sync(t *testing.T) {
}

func TestDataFile_ReadLogRecord(t *testing.T) {
dataFile, err := OpenDataFile(os.TempDir(), 123)
dataFile, err := OpenDataFile(os.TempDir(), 123, DefaultFileSize, fio.FileIOType)
assert.Nil(t, err)
assert.NotNil(t, dataFile)

Expand Down
4 changes: 2 additions & 2 deletions engine/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (db *DB) setActiveDataFile() error {
}

// Open a new data file
dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID)
dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileID, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (db *DB) loadDataFiles() error {

// Walk through each file id and open the corresponding data file
for i, fid := range fileIds {
dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid))
dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid), db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions engine/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (db *DB) Merge() error {
}

// 打开 hint 文件存储索引
hintFile, err := data.OpenHintFile(mergePath)
hintFile, err := data.OpenHintFile(mergePath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (db *DB) Merge() error {
}

// 写标识 merge 完成的文件
mergeFinaFile, err := data.OpenMergeFinaFile(mergePath)
mergeFinaFile, err := data.OpenMergeFinaFile(mergePath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (db *DB) loadMergeFiles() error {

// 获取最近没有参与 merge 的文件 id
func (db *DB) getRecentlyNonMergeFileId(dirPath string) (uint32, error) {
mergeFinaFile, err := data.OpenMergeFinaFile(dirPath)
mergeFinaFile, err := data.OpenMergeFinaFile(dirPath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return 0, err
}
Expand All @@ -247,7 +247,7 @@ func (db *DB) loadIndexFromHintFile() error {
}

// 打开 hint 文件
hintFile, err := data.OpenHintFile(db.options.DirPath)
hintFile, err := data.OpenHintFile(db.options.DirPath, db.options.DataFileSize, db.options.FIOType)
if err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions fio/file_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,3 @@ func (fio *FileIO) Size() (int64, error) {
}
return stat.Size(), nil
}

func NewIOManager(filename string) (IOManager, error) {
return NewFileIOManager(filename)
}
21 changes: 21 additions & 0 deletions fio/io_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ package fio

const DataFilePerm = 0644 //0644 表示创建了一个文件,文件所有者可以读写,其他人只能读

const DefaultFileSize = 256 * 1024 * 1024

const (
FileIOType = iota + 1 // Standard File IO
BufIOType // File IO with buffer
MmapIOType // Memory Mapping IO
)

// IOManager 抽象 IO 管理接口, 可以接入不同的 IO 类型, 目前支持标准文件 IO
type IOManager interface {
// Read 从文件的给定位置读取对应的数据
Expand All @@ -19,3 +27,16 @@ type IOManager interface {
// Size get file size
Size() (int64, error)
}

// NewIOManager get IOManager based on type
func NewIOManager(filename string, fileSize int64, fioType int8) (IOManager, error) {
switch fioType {
case FileIOType:
return NewFileIOManager(filename)
case BufIOType:
return NewBufIOManager(filename)
case MmapIOType:
return NewMMapIOManager(filename, fileSize)
}
return NewFileIOManager(filename)
}
36 changes: 20 additions & 16 deletions fio/mmap_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@ import (
"unsafe"
)

// DefaultMemMapSize 最大映射内存大小
const DefaultMemMapSize = 256 * 1024 * 1024

// MMapIO 标准系统文件IO
type MMapIO struct {
fd *os.File // 系统文件描述符
data []byte // 与文件对应的映射区
dirty bool // 是否更改过
offset int64 // 写入位置
fd *os.File // system file descriptor
data []byte // the mapping area corresponding to the file
dirty bool // has changed
offset int64 // next write location
fileSize int64 // max file size
}

// NewMMapIOManager 初始化标准文件 IO
func NewMMapIOManager(fileName string) (*MMapIO, error) {
mmapIO := &MMapIO{}
// NewMMapIOManager Initialize Mmap IO
func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) {
mmapIO := &MMapIO{fileSize: fileSize}

fd, err := os.OpenFile(
fileName,
os.O_CREATE|os.O_RDWR|os.O_APPEND,
Expand All @@ -31,13 +29,13 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) {
}
info, _ := fd.Stat()

// 将文件扩容到映射区大小, 保存时会裁剪
if err := fd.Truncate(DefaultMemMapSize); err != nil {
// Expand files to maximum file size, crop when saving
if err := fd.Truncate(fileSize); err != nil {
return nil, err
}

// 构建映射
b, err := syscall.Mmap(int(fd.Fd()), 0, DefaultMemMapSize, syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
// Building mappings between memory and disk files
b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileSize), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
Expand All @@ -48,14 +46,16 @@ func NewMMapIOManager(fileName string) (*MMapIO, error) {
return mmapIO, nil
}

// Read Copy data from the mapping area to byte slice
func (mio *MMapIO) Read(b []byte, offset int64) (int, error) {
return copy(b, mio.data[offset:]), nil
}

// Write Copy data from byte slice to the mapping area
func (mio *MMapIO) Write(b []byte) (int, error) {
oldOffset := mio.offset
newOffset := mio.offset + int64(len(b))
if newOffset > DefaultMemMapSize {
if newOffset > mio.fileSize {
return 0, errors.New("exceed file max content length")
}

Expand All @@ -64,6 +64,7 @@ func (mio *MMapIO) Write(b []byte) (int, error) {
return copy(mio.data[oldOffset:], b), nil
}

// Sync Synchronize data from memory to disk
func (mio *MMapIO) Sync() error {
if !mio.dirty {
return nil
Expand All @@ -78,6 +79,7 @@ func (mio *MMapIO) Sync() error {
return nil
}

// Close file
func (mio *MMapIO) Close() (err error) {
if err = mio.fd.Truncate(mio.offset); err != nil {
return err
Expand All @@ -91,10 +93,12 @@ func (mio *MMapIO) Close() (err error) {
return mio.fd.Close()
}

// Size return the size of current file
func (mio *MMapIO) Size() (int64, error) {
return mio.offset, nil
}

// UnMap Unmapping between memory and files
func (mio *MMapIO) UnMap() error {
if mio.data == nil {
return nil
Expand Down
14 changes: 7 additions & 7 deletions fio/mmap_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestNewMMapIOManager(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -17,7 +17,7 @@ func TestNewMMapIOManager(t *testing.T) {

func TestMMapIO_Write(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)
assert.Nil(t, err)
assert.NotNil(t, mio)
Expand All @@ -36,7 +36,7 @@ func TestMMapIO_Write(t *testing.T) {

func TestMMapIO_Read(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -60,7 +60,7 @@ func TestMMapIO_Read(t *testing.T) {

func TestMMapIO_Sync(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -73,7 +73,7 @@ func TestMMapIO_Sync(t *testing.T) {

func TestMMapIO_Close(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)

assert.Nil(t, err)
Expand All @@ -88,7 +88,7 @@ func TestMMapIO_Close(t *testing.T) {

func TestMMapIO_Write_Speed(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
assert.Nil(t, err)
assert.NotNil(t, mio)

Expand All @@ -104,7 +104,7 @@ func TestMMapIO_Write_Speed(t *testing.T) {

func TestMMapIO_Read_Speed(t *testing.T) {
path := filepath.Join("/tmp", "a.data")
mio, err := NewMMapIOManager(path)
mio, err := NewMMapIOManager(path, DefaultFileSize)
defer destoryFile(path)
assert.Nil(t, err)
assert.NotNil(t, mio)
Expand Down
Loading

0 comments on commit 095a932

Please sign in to comment.