Golang实现KV存储引擎(三)

727 Visits / 0 Comments / Favorite

我们先从加载数据开始,因为服务重启的时候,需要从磁盘加载数据到内存中,加载的代码如下:

func OpenDB(options DBOptions) (*DB, error) {
	// 判断目录是否存在
	if _, err := os.Stat(options.DirPath); err != nil {
		if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {
			return nil, err
		}
	}
	// 目录锁
	fileLock := flock.New(filepath.Join(options.DirPath, fileLockName))
	hold, err := fileLock.TryLock()
	if err != nil {
		return nil, err
	}
	if !hold {
		return nil, ErrDatabaseIsUsing
	}

	// 打开所有内存文件,构造内存数据
	memTables, err := OpenAllMemTables(options)
	if err != nil {
		return nil, err
	}
	db := &DB{
		activeMem:    memTables[len(memTables)-1],
		immutableMem: memTables[:len(memTables)-1],
		batchPool:    sync.Pool{New: MakeBatch},
	}
	return db, nil
}

这里对以上代码进行讲解:

  1. 加一个目录锁的目的在于排除掉其他重启的进程,就像你快速点了二下打开微信软件一样,总不能弹出2个窗口吧,更多的是只想打开一个。
  2. 最核心的就是加载磁盘的数据到内存中。就是
memTables, err := OpenAllMemTables(options)

该方法是核心方法,代码如下:

func OpenAllMemTables(options utils.DBOptions) ([]*MemTable, error) {
	entries, err := os.ReadDir(options.DirPath)
	if err != nil {
		return nil, err
	}
	// 获取内存编号 id
	var tableIDs []int
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		var id int
		var prefix int
		_, err := fmt.Sscanf(entry.Name(), utils.WalFileExt+"%d", &id, &prefix)
		if err != nil {
			continue
		}
		tableIDs = append(tableIDs, id)
	}
	// 默认初始化一个内存id
	if len(tableIDs) == 0 {
		tableIDs = append(tableIDs, 1)
	}

	tableIDs = removeDuplicates(tableIDs)
	// 排序
	sort.Ints(tableIDs)

	// 多个内存
	tables := make([]*MemTable, len(tableIDs))

	for i, table := range tableIDs {
		mem := MemTableOptions{
			WalDir:          options.DirPath,
			Id:              table,
			SklMemSize:      options.MemTableSize,
			WalIsSync:       options.IsSync,
			WalCacheSize:    options.BlockCacheSize,
			WalBytesPerSync: options.BytesPerSync,
		}
		table, err := OpenMemTable(mem, options)
		if err != nil {
			return nil, err
		}
		tables[i] = table
	}
	return tables, nil
}

这里面主要是打开存放数据的目录,我们先看看生成的数据文件长啥样子?

data.png

其中详细操作有以下步骤:

  1. 递归读取目录,获取数据文件。
  2. 获取文件的id前缀,就是前面的1-23序号,如果不存在,那么给设置一个默认的序号ID为1。
  3. 去除掉重复的ID序号
  4. 对ID序号进行排列
  5. 开始读取ID对应的文件,并开始生成MemTable内存页数据。并且返回内存页数组。

让我们看看内存页如何生成:

// OpenMemTable 通过wal构建skl
func OpenMemTable(memOptions MemTableOptions, dbOptions utils.DBOptions) (*MemTable, error) {
	// 初始化跳表
	skl := badgerskl.NewSkiplist(int64(float64(memOptions.SklMemSize) * 1.5))
	// 初始化内存对象
	memTable := &MemTable{Options: memOptions, Skl: skl}
	// 构建wal对象
	op := tinywal.WalOptionOptions{
		DirPath:        memOptions.WalDir,
		SegmentFileExt: fmt.Sprintf(dbOptions.SegmentFileExt, memOptions.Id), // 一个内存表对应一个文件
		SegmentSize:    dbOptions.SegmentSize,
		LocalCacheSize: memOptions.WalCacheSize,
		IsSync:         memOptions.WalIsSync,
		BytesPerSync:   memOptions.WalBytesPerSync,
	}
	//打开WAL文件
	wal, _ := OpenWal(op)
	memTable.Wal = wal
	//加载WAL文件对应的数据
	err := LoadWal(memTable, wal)
	if err != nil {
		return nil, err
	}
	return memTable, nil
}

这一步的过程其实相对简单,主要是打开数据文件

wal, _ := OpenWal(op)

并把数据文件的内容加载到memTable中。

err := LoadWal(memTable, wal)

在这个过程中,memTable的WAL属性得到了设置。

我们可以看看MemTable的结构:

// MemTable 内存对象
type MemTable struct {
	Wal     *tinywal.TinyWal // 保证内存数据的持久化
	Options MemTableOptions
	sync.RWMutex
	Skl *badgerskl.Skiplist // 内存数据(跳表)
}

其实最主要是设置内存页对应的Skl的数据和该内存页对应的Wal的数据文件。

我们来分析下如何打开WAL文件:

// OpenWal 基于目录中的segment file构建tinywal对象
func OpenWal(options WalOptionOptions) (*TinyWal, error) {
	// 判断文件后缀格式是否正确
	if !strings.HasSuffix(options.SegmentFileExt, ".") {
		return nil, errors.New("文件后缀必须以.开头")
	}
	// 如果目录不存在创建目录;如果目录存在啥也不做
	err := os.MkdirAll(options.DirPath, fs.ModePerm)
	if err != nil {
		return nil, err
	}
	tinywal := TinyWal{
		Options:              options,
		ImmutableSegmentFile: make(map[SegmentFileId]*SegmentFile),
		ActiveSegmentFile:    nil,
	}
	// LocalCacheSize > 0 需要缓存
	if options.LocalCacheSize > 0 {
		// 计算缓存block个数
		blockNum := options.LocalCacheSize / BlockSize
		if options.LocalCacheSize%BlockSize != 0 {
			blockNum++
		}
		tinywal.LocalCache, err = lru.New[uint64, []byte](int(blockNum))
		if err != nil {
			return nil, err
		}
	}
	// 读取目录中所有文件
	dirEntries, err := os.ReadDir(options.DirPath)
	if err != nil {
		return nil, err
	}

	var segmentFileIds []int
	for _, entry := range dirEntries {
		if entry.IsDir() {
			continue
		}
		// 提取文件名中以SegmentFileExt为后缀的段文件
		segmentFileId := 0
		_, err = fmt.Sscanf(entry.Name(), options.SegmentFileExt, &segmentFileId)
		if err != nil { // 说明这个文件名格式不符合
			continue
		}
		segmentFileIds = append(segmentFileIds, segmentFileId)
	}

	if len(segmentFileIds) == 0 { // 说明第一次
		segment, err := OpenSegmentFile(options.DirPath, options.SegmentFileExt, FirstSegmentFileId, tinywal.LocalCache)
		if err != nil {
			return nil, err
		}
		tinywal.ActiveSegmentFile = segment
	} else {
		sort.Ints(segmentFileIds)
		for i, segmentFileId := range segmentFileIds { // 打开所有的文件
			segment, err := OpenSegmentFile(options.DirPath, options.SegmentFileExt, uint32(segmentFileId), tinywal.LocalCache)
			if err != nil {
				return nil, err
			}

			if i == len(segmentFileIds)-1 { // 最后一个文件为活跃segment
				tinywal.ActiveSegmentFile = segment
			} else {
				tinywal.ImmutableSegmentFile[uint32(segmentFileId)] = segment
			}
		}
	}
	return &tinywal, nil
}

这里的代码我们进行分析下:

先判断文件后缀是否符合

如果文件目录不存在,那么新建一个目录

判断是否需要缓存block,如果需要的话那么判断缓存的个数,主要是为了加快查找数据的速度。

读取指定的options.SegmentFileExt前缀,进行文件名匹配

_, err = fmt.Sscanf(entry.Name(), options.SegmentFileExt, &segmentFileId)

比如:options.SegmentFileExt=11.MEM. 那么我们可以读取到以下二个文件:

11.MEM.0000000001

11.MEM.0000000002

判断是否匹配到文件,如果没有的话,那么我们打开一个数据文件【这里叫segmentfile】,如果存在的话,我们对取值进行排序。之所以排序是因为,数字越大,说明是最新的文件。最大的数字对应的文件需要设置为ActiveSegmentFile,其他的文件需要设置为ImmutableSegmentFile。

最终这个memtable的形成如下:

struct.png

All comments

Top