Golang实现自己的Redis(持久化AOF)

Golang实现自己的Redis(持久化AOF)

用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。

项目代码地址: https://Github.com/gofish2020/easyredis 欢迎Fork & Star

  • [x] easyredis之TCP服务
  • [x] easyredis之网络请求序列化协议(RESP)
  • [x] easyredis之内存数据库
  • [x] easyredis之过期时间 (时间轮实现)
  • [x] easyredis之持久化 (AOF实现)
  • [ ] easyredis之发布订阅功能
  • [ ] easyredis之有序集合(跳表实现)
  • [ ] easyredis之 pipeline 客户端实现
  • [ ] easyredis之事务(原子性/回滚)
  • [ ] easyredis之连接池
  • [ ] easyredis之分布式集群存储

【第五篇】EasyRedis之持久化AOF

AOF全称Append Only File,就是将写相关的命令,追加保存到文件中,当服务器重启以后,将文件中的命令在服务端重放(重新执行恢复数据),实现的一种持久化方式。

本篇通过3个部分讲解AOF的实现:

  • AOF的写入过程
  • AOF的加载过程
  • AOF的重写过程

AOF的写入过程

在核心的数据结构 Engine中新增一个 aof *AOF对象

// 存储引擎,负责数据的CRUD
type Engine struct {
// *DB
dbSet []*atomic.Value
// 时间轮(延迟任务)
delay *timewheel.Delay
// Append Only File
aof *aof.AOF
}

在初始化函数func NewEngine() *Engine中,会基于是否启用AOF日志,决定 aof *aof.AOF的初始化

func NewEngine() *Engine {

//.....省略....

// 启用AOF日志
if conf.GlobalConfig.AppendOnly {
// 创建*AOF对象
aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync)
if err != nil {
panic(err)
}
engine.aof = aof
// 设定每个db,使用aof写入日志
engine.aofBindEveryDB()
}
return engine
}

因为实际执行redis命令的对象是 *DB,所以会对每个*DB对象设定db.writeAof函数指针


func (e *Engine) aofBindEveryDB() {
for _, dbSet := range e.dbSet {
db := dbSet.Load().(*DB)
db.writeAof = func(redisCommand [][]byte) {
if conf.GlobalConfig.AppendOnly {
// 调用e.aof对象方法,保存命令
e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand))
}
}
}
}



例如,当我们执行 set key value命令的时候,实际会执行 func cmdSet(db *DB, args [][]byte) protocol.Reply


func cmdSet(db *DB, args [][]byte) protocol.Reply {

//.....省略....

if result > 0 { // 1 表示存储成功
//TODO: 过期时间处理
if ttl != nolimitedTTL { // 设定key过期
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.ExpireAt(key, expireTime)
//写入日志
db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...))
db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime))
} else { // 设定key不过期
db.Persist(key)
//写入日志
db.writeAof(aof.SetCmd(args...))
}
return protocol.NewOkReply()
}

return protocol.NewNullBulkReply()
}

可以看到,会调用上面刚才设定的db.writeAof函数,将当前的命令保存到AOF中。所以我们实际看下 SaveRedisCommand函数中具体在做什么事情。代码路径位于aof/aof.go

func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) {

// 关闭
if aof.atomicClose.Load() {
return
}
// 写入文件 & 刷盘
if aof.aofFsync == FsyncAlways {
record := aofRecord{
dbIndex: dbIndex,
command: command,
}
aof.writeAofRecord(record)
return
}
// 写入缓冲
aof.aofChan <- aofRecord{
dbIndex: dbIndex,
command: command,
}
}

因为AOF的刷盘(Sync)有三种模式:

  • 写入 & 立即刷盘
  • 写入 & 每秒刷盘
  • 写入 & 不主动刷盘(取决于操作系统自动刷盘)

如果配置的是always模式,会立即执行aof.writeAofRecord(record);否则就将数据先保存在缓冲aof.aofChan中(这里其实又是生产者消费者模型)最后在消费协程中,执行写入

func (aof *AOF) watchChan() {

for record := range aof.aofChan {
aof.writeAofRecord(record)
}
aof.aofFinished <- struct{}{}
}

所以我们只需要看下 writeAofRecord函数即可,其实就是把命令按照Redis 序列化协议的格式,写入到文件中。给大家看下更直观的演示图:

Golang实现自己的Redis(持久化AOF)再看下在 append.aof文件中具体的数据格式:Golang实现自己的Redis(持久化AOF)

这里有个很重要点:因为AOF文件是所有的*DB对象复用的文件,写入的redis命令归属于不同的数据库的

举个例子: 比如在0号数据库,我们执行set key value,在3号数据库,我们执行set key value,在日志文件中会记录两条命令,但是这两个命令其实是不同数据库的命令。在恢复命令到数据库的时候,应该在不同的数据库中执行该命令。所以在记录命令的时候,我们还要记录下他的数据库是什么?这样恢复的时候,才能知道命令的数据库的归属问题。


func (aof *AOF) writeAofRecord(record aofRecord) {

aof.mu.Lock()
defer aof.mu.Unlock()

// 因为aof对象是所有数据库对象【复用】写入文件方法,每个数据库的索引不同
// 所以,每个命令的执行,有个前提就是操作的不同的数据库
if record.dbIndex != aof.lastDBIndex {
// 构建select index 命令 & 写入文件
selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))}
data := protocol.NewMultiBulkReply(selectCommand).ToBytes()
_, err := aof.aofFile.Write(data)
if err != nil {
logger.Warn(err)
return
}
aof.lastDBIndex = record.dbIndex
}

// redis命令
data := protocol.NewMultiBulkReply(record.command).ToBytes()
_, err := aof.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
logger.Debugf("write aof command:%q", data)
// 每次写入刷盘
if aof.aofFsync == FsyncAlways {
aof.aofFile.Sync()
}
}

AOF的加载过程

在服务启动的时候,将*.aof文件中的命令,在服务端进行重放,效果演示如下:Golang实现自己的Redis(持久化AOF)

代码路径位于aof/aof.go

// 构建AOF对象
func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) {
//...省略...

// 启动加载aof文件
if load {
aof.LoadAof(0)
}


//...省略...
}

aof.LoadAof(0)函数的本质就是从文件中,按照行读取数据。如果看过之前的文章,这里其实复用了parser.ParseStream(reader)函数,负责从文件解析redis序列化协议格式的命令,最后利用数据库引擎,将命令数据保存到内存中(命令重放)

func (aof *AOF) LoadAof(maxBytes int) {

// 目的:当加载aof文件的时候,因为需要复用engine对象,内部重放命令的时候会自动写aof日志,加载aof 禁用 SaveRedisCommand的写入
aof.atomicClose.Store(true)
defer func() {
aof.atomicClose.Store(false)
}()

// 只读打开文件
file, err := os.Open(aof.aofFileName)
if err != nil {
logger.Error(err.Error())
return
}
defer file.Close()
file.Seek(0, io.SeekStart)

var reader io.Reader
if maxBytes > 0 { // 限定读取的字节大小
reader = io.LimitReader(file, int64(maxBytes))
} else { // 不限定,直接读取到文件结尾(为止)
reader = file
}

// 文件中保存的格式和网络传输的格式一致
ch := parser.ParseStream(reader)
virtualConn := connection.NewVirtualConn()

for payload := range ch {
if payload.Err != nil {
// 文件已经读取到“完成“
if payload.Err == io.EOF {
break
}
// 读取到非法的格式
logger.Errorf("LoadAof parser error %+v:", payload.Err)
continue
}

if payload.Reply == nil {
logger.Error("empty payload data")
continue
}
// 从文件中读取到命令
reply, ok := payload.Reply.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}

// 利用数据库引擎,将命令数据保存到内存中(命令重放)
ret := aof.engine.Exec(virtualConn, reply.RedisCommand)
// 判断是否执行失败
if protocol.IsErrReply(ret) {
logger.Error("exec err ", string(ret.ToBytes()))
}
// 判断命令是否是"select"
if strings.ToLower(string(reply.RedisCommand[0])) == "select" {
dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1]))
if err == nil {
aof.lastDBIndex = dbIndex // 记录下数据恢复过程中,选中的数据库索引
}
}
}
}

AOF的重写过程

代码路径aof/rewrite.go重写的过程就是下面的函数

func (aof *AOF) Rewrite(engine abstract.Engine) {
//1.对现有的aof文件做一次快照
snapShot, err := aof.startRewrite()
if err != nil {
logger.Errorf("StartRewrite err: %+v", err)
return
}

//2. 将现在的aof文件数据,加在到新(内存)对象中,并重写入新aof文件中
err = aof.doRewrite(snapShot, engine)
if err != nil {
logger.Errorf("doRewrite err: %+v", err)
return
}

//3. 将重写过程中的增量命令写入到新文件中
err = aof.finishRewrite(snapShot)
if err != nil {
logger.Errorf("finishRewrite err: %+v", err)
}
}

整个的处理思想很重要:如下图

Golang实现自己的Redis(持久化AOF)

总结

代码的思路应该还是比较清晰,但是细节上的处理非常容易让人大脑宕机。建议还是看下源码,边看边自己敲一下,感受是不一样的


原文始发于微信公众号(nullbody笔记):Golang实现自己的Redis(持久化AOF)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/223671.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!