我们先从加载数据开始,因为服务重启的时候,需要从磁盘加载数据到内存中,加载的代码如下:
func OpenDB(options DBOptions) (*DB, error) {
// 判断目录是否存在
if _, err := os.Stat(options.DirPath); err != nil {
if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {
return nil, err
}
}
// 目录锁
fileLock := flock.New(filepath.Join(options.DirPath, fileLockName))
hold, err := fileLock.TryLock()
if err != nil {
return nil, err
}
if !hold {
return nil, ErrDatabaseIsUsing
}
// 打开所有内存文件,构造内存数据
memTables, err := OpenAllMemTables(options)
if err != nil {
return nil, err
}
db := &DB{
activeMem: memTables[len(memTables)-1],
immutableMem: memTables[:len(memTables)-1],
batchPool: sync.Pool{New: MakeBatch},
}
return db, nil
}
这里对以上代码进行讲解:
memTables, err := OpenAllMemTables(options)
该方法是核心方法,代码如下:
func OpenAllMemTables(options utils.DBOptions) ([]*MemTable, error) {
entries, err := os.ReadDir(options.DirPath)
if err != nil {
return nil, err
}
// 获取内存编号 id
var tableIDs []int
for _, entry := range entries {
if entry.IsDir() {
continue
}
var id int
var prefix int
_, err := fmt.Sscanf(entry.Name(), utils.WalFileExt+"%d", &id, &prefix)
if err != nil {
continue
}
tableIDs = append(tableIDs, id)
}
// 默认初始化一个内存id
if len(tableIDs) == 0 {
tableIDs = append(tableIDs, 1)
}
tableIDs = removeDuplicates(tableIDs)
// 排序
sort.Ints(tableIDs)
// 多个内存
tables := make([]*MemTable, len(tableIDs))
for i, table := range tableIDs {
mem := MemTableOptions{
WalDir: options.DirPath,
Id: table,
SklMemSize: options.MemTableSize,
WalIsSync: options.IsSync,
WalCacheSize: options.BlockCacheSize,
WalBytesPerSync: options.BytesPerSync,
}
table, err := OpenMemTable(mem, options)
if err != nil {
return nil, err
}
tables[i] = table
}
return tables, nil
}
这里面主要是打开存放数据的目录,我们先看看生成的数据文件长啥样子?
其中详细操作有以下步骤:
让我们看看内存页如何生成:
// OpenMemTable 通过wal构建skl
func OpenMemTable(memOptions MemTableOptions, dbOptions utils.DBOptions) (*MemTable, error) {
// 初始化跳表
skl := badgerskl.NewSkiplist(int64(float64(memOptions.SklMemSize) * 1.5))
// 初始化内存对象
memTable := &MemTable{Options: memOptions, Skl: skl}
// 构建wal对象
op := tinywal.WalOptionOptions{
DirPath: memOptions.WalDir,
SegmentFileExt: fmt.Sprintf(dbOptions.SegmentFileExt, memOptions.Id), // 一个内存表对应一个文件
SegmentSize: dbOptions.SegmentSize,
LocalCacheSize: memOptions.WalCacheSize,
IsSync: memOptions.WalIsSync,
BytesPerSync: memOptions.WalBytesPerSync,
}
//打开WAL文件
wal, _ := OpenWal(op)
memTable.Wal = wal
//加载WAL文件对应的数据
err := LoadWal(memTable, wal)
if err != nil {
return nil, err
}
return memTable, nil
}
这一步的过程其实相对简单,主要是打开数据文件
wal, _ := OpenWal(op)
并把数据文件的内容加载到memTable中。
err := LoadWal(memTable, wal)
在这个过程中,memTable的WAL属性得到了设置。
我们可以看看MemTable的结构:
// MemTable 内存对象
type MemTable struct {
Wal *tinywal.TinyWal // 保证内存数据的持久化
Options MemTableOptions
sync.RWMutex
Skl *badgerskl.Skiplist // 内存数据(跳表)
}
其实最主要是设置内存页对应的Skl的数据和该内存页对应的Wal的数据文件。
我们来分析下如何打开WAL文件:
// OpenWal 基于目录中的segment file构建tinywal对象
func OpenWal(options WalOptionOptions) (*TinyWal, error) {
// 判断文件后缀格式是否正确
if !strings.HasSuffix(options.SegmentFileExt, ".") {
return nil, errors.New("文件后缀必须以.开头")
}
// 如果目录不存在创建目录;如果目录存在啥也不做
err := os.MkdirAll(options.DirPath, fs.ModePerm)
if err != nil {
return nil, err
}
tinywal := TinyWal{
Options: options,
ImmutableSegmentFile: make(map[SegmentFileId]*SegmentFile),
ActiveSegmentFile: nil,
}
// LocalCacheSize > 0 需要缓存
if options.LocalCacheSize > 0 {
// 计算缓存block个数
blockNum := options.LocalCacheSize / BlockSize
if options.LocalCacheSize%BlockSize != 0 {
blockNum++
}
tinywal.LocalCache, err = lru.New[uint64, []byte](int(blockNum))
if err != nil {
return nil, err
}
}
// 读取目录中所有文件
dirEntries, err := os.ReadDir(options.DirPath)
if err != nil {
return nil, err
}
var segmentFileIds []int
for _, entry := range dirEntries {
if entry.IsDir() {
continue
}
// 提取文件名中以SegmentFileExt为后缀的段文件
segmentFileId := 0
_, err = fmt.Sscanf(entry.Name(), options.SegmentFileExt, &segmentFileId)
if err != nil { // 说明这个文件名格式不符合
continue
}
segmentFileIds = append(segmentFileIds, segmentFileId)
}
if len(segmentFileIds) == 0 { // 说明第一次
segment, err := OpenSegmentFile(options.DirPath, options.SegmentFileExt, FirstSegmentFileId, tinywal.LocalCache)
if err != nil {
return nil, err
}
tinywal.ActiveSegmentFile = segment
} else {
sort.Ints(segmentFileIds)
for i, segmentFileId := range segmentFileIds { // 打开所有的文件
segment, err := OpenSegmentFile(options.DirPath, options.SegmentFileExt, uint32(segmentFileId), tinywal.LocalCache)
if err != nil {
return nil, err
}
if i == len(segmentFileIds)-1 { // 最后一个文件为活跃segment
tinywal.ActiveSegmentFile = segment
} else {
tinywal.ImmutableSegmentFile[uint32(segmentFileId)] = segment
}
}
}
return &tinywal, nil
}
这里的代码我们进行分析下:
先判断文件后缀是否符合
如果文件目录不存在,那么新建一个目录
判断是否需要缓存block,如果需要的话那么判断缓存的个数,主要是为了加快查找数据的速度。
读取指定的options.SegmentFileExt前缀,进行文件名匹配
_, err = fmt.Sscanf(entry.Name(), options.SegmentFileExt, &segmentFileId)
比如:options.SegmentFileExt=11.MEM. 那么我们可以读取到以下二个文件:
11.MEM.0000000001
11.MEM.0000000002
判断是否匹配到文件,如果没有的话,那么我们打开一个数据文件【这里叫segmentfile】,如果存在的话,我们对取值进行排序。之所以排序是因为,数字越大,说明是最新的文件。最大的数字对应的文件需要设置为ActiveSegmentFile,其他的文件需要设置为ImmutableSegmentFile。
最终这个memtable的形成如下:
All comments