redis(2)——网络处理、线程模型和rdb,aof持久化
redis 通过网络服务接收远程命令,进行处理,将执行结果返回。
redis的rdb和aof 都是经典的数据持久化/备份手段,rdb关注数据库的数据, aof关注数据库的操作。
网络处理
redis 的网络处理类似常见的IO多路复用网络库,通过事件通知实现单线程处理大量来自不同客户端的事件。
事件和eventloop 事件循环
三种事件aeFileEvent(注册的网络事件),aeTimeEvent(定时事件),firedEvent用来存放epoll返回的就绪事件(即需要处理的事件)
网络事件通过epoll 监听fd可读可写(即网络包可读可写)触发;定时事件通过获得下一个最近事件的执行时间、定时触发;
1 | aeFileEvent |
eventloop的核心是aeProcessEvents函数,执行逻辑如下
- 计算最近的定时事件需要等待的时间t。shortest = aeSearchNearestTimer(eventLoop)
- numevents = aeApiPoll(eventLoop, tvp); 执行epoll_wait,超时时间为上述需要等待的时间t
- 遍历eventLoop->fired[j].fd获得触发的事件,根据事件读写类型执行处理函数;fe->rfileProc(eventLoop,fd,fe->clientData,mask); fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- 处理定时事件processTimeEvents
事件处理函数
- acceptTcpHandler
用来接受和建立连接,acceptTcpHandler会创建client
创建Client时,同时创建对该client的fileevent 可读事件,处理函数是readQueryFromClient。linux的accept() 会从监听 socket 的已完成连接队列中取出一个客户端连接,并为其创建一个 新的 socket。该fd标志了client和filevent可读事件。
- readQueryFromClient
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
- 生成c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
- nread = read(fd, c->querybuf+qblen, readlen);
- 调用processInputBuffer(c);函数
processInputBuffer 内部是processCommand,主要执行两步
- 获取命令
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
- 处理命令call(c,CMD_CALL_FULL);
线程模型
redis 4.0版本监听网络请求,处理请求和命令都由一个主线程完成。
- 好处是redis 主IO路径无须加锁保护。
- 缺点是redis 不可处理耗时长的命令,这在使用自定义命令和复杂lua脚本里需要注意。此外redis 无法利用多核cpu,这对部署机器的cpu单核能力有要求。
redis 使用多线程的地方很少,只有adb, aof后台执行,和lazyfree等操作。
fork 子进程
fork() 创建子进程,对于父进程:fork() 返回新创建子进程的进程ID(PID)。这是一个正整数。
对于子进程:fork() 返回0。
如果 fork() 失败:则返回-1,并设置 errno 以指示错误原因。
fork使用到的地方
- 后台创建子进程执行rdb备份任务
1 | int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { |
- aof 创建子进程执行rewriteAppendOnlyFile
1 | int rewriteAppendOnlyFileBackground(void) { |
- server如果设置了daemonize,则当前进程退出,创建子进程执行后续任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16void daemonize(void) {
int fd;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
/* Every output goes to /dev/null. If Redis is daemonized but
* the 'logfile' is set to 'stdout' in the configuration file
* it will not log at all. */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
} - sentinelRunPendingScripts,子进程执行脚本,父进程返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37/* Run pending scripts if we are not already at max number of running
* scripts. */
void sentinelRunPendingScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();
/* Find jobs that are not running and run them, from the top to the
* tail of the queue, so we run older jobs first. */
listRewind(sentinel.scripts_queue,&li);
while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
(ln = listNext(&li)) != NULL)
{
sentinelScriptJob *sj = ln->value;
pid_t pid;
pid = fork();
if (pid == -1) {
/* Parent (fork error).
* We report fork errors as signal 99, in order to unify the
* reporting with other kind of errors. */
sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], 99, 0);
sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
sj->pid = 0;
} else if (pid == 0) {
/* Child */
execve(sj->argv[0],sj->argv,environ);
/* If we are here an error occurred. */
_exit(2); /* Don't retry execution. */
} else {
sentinel.running_scripts++;
sj->pid = pid;
sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
}
}
pthread_create 多线程
pthread_create创建线程被bio封装
1 | /* Initialize the background system, spawning the thread. */ |
bioCreateBackgroundJob 使用的地方
- aof后台fsync
1 | void aof_background_fsync(int fd) { |
- lazyfree
1
2
3
4
5
6
7
8
9
10/* Empty a Redis DB asynchronously. What the function does actually is to
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;
db->dict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}
rdb持久化
rdb, replicate db
暂只看rdbsave和rdbload
rdbsave
int rdbSave(char *filename, rdbSaveInfo *rsi)
- snprintf(tmpfile,256,”temp-%d.rdb”, (int) getpid()); fp = fopen(tmpfile,”w”);
- rioInitWithFile(&rdb,fp);
- rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi)
- fflush(fp) == EOF fsync(fileno(fp) fclose(fp) == EOF
- rename(tmpfile,filename)
fflush是flush文件流到page cache,fsync是刷page cache到文件落盘
其中的rdbSaveRio 函数,rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi)
- 写入”REDIS%04d”,RDB_VERSION
- rdbSaveInfoAuxFields(rdb,flags,rsi)
- 遍历server.dbnum
- 对每个db,redisDb。每个db内部其实是db->dict,
- 写入一些元数据,包括rdbSaveType(rdb,RDB_OPCODE_SELECTDB)
- rdbSaveLen(rdb,j)
- rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)
- rdbSaveLen(rdb,db_size)
- rdbSaveLen(rdb,expires_size)
- 遍历db->dict,拿到sds keystr = dictGetKey(de);,robj key,
*o = dictGetVal(de);
,执行rdbSaveKeyValuePair(rdb,&key,o,expire,now)
rdbsave 的调用
rdbSaveBackground执行调用childpid = fork() 启动进程,在子进程里执行rdbSave(filename,rsi);
fork()的子进程初始利用写时复制共享父进程内存。写时复制情况下,父子进程修改共享内存页时,均会复制共享页到新的物理内存页。
写时复制保证子进程的内存始终和父进程调用fork()时的状态一致,就是说,子进程rdbSave 的dict 内容和父进程fork()时的一致,且不会因后续父进程对dict的写入而改变。。以上由操作系统保证。太6了
如果父进程大量修改数据(触发大量 COW),可能导致内存和 CPU 压力增大,但这属于资源问题,影响可控
rdbsave的定时和手动调度
后台执行时会设置执行的child_pid,从而判断是否后台正在执行备份
server.rdb_child_pid != -1 || server.aof_child_pid != -1
replicationCron -> startBgsaveForReplication
replicationCron 一秒钟执行一次
syncCommand-> startBgsaveForReplication
rdbLoadRio
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi)
从文件里读数据
- 首先读到”REDIS\0”
- 然后是一字节版本,rdbver = atoi(buf+5);。算法
从第一个非空白字符开始,取一个可选的初始加号或者减号,后跟尽可能多的十进制数字,并将它们转换为一个int
类型的数值。 - 循环执行
- type = rdbLoadType(rdb)
- 根据type 类型执行以下操作,type的类型可以是RDB_OPCODE_EOF表示结束
- 解析出key = rdbLoadStringObject(rdb) 和val = rdbLoadObject(type,rdb)
- 将kv写入到hash table dbAdd(db,key,val);
显然rdb会将某时刻hashtable的所有kv写入到rdb文件中。
aof持久化
aof, append only file
rdb关心的是复制db的dict 的key, value,通过写kv数据恢复数据库状态。aof关心的是复制db的增删改操作,通过replay操作恢复数据库状态。
aof rewrite
aof 可以实现全量复制和增量复制。
aof 的rewrite 是将当前状态的dict的key value,全部保存为set key value
操作,从而实现全量复制。
aof 增量复制通过父进程将写操作发给子进程实现。进程通信的方式是使用管道。创建管道会返回两个fd,分别给父子进程。进程利用fd和fileevent事件可以监听管道,子进程监听可写事件,父进程监听可读事件。
当执行aof时,父进程接收到写请求,在处理请求之外,会把写操作写入到管道。当执行rdb或aof时,redis会把执行的子进程id记录,通过判断子进程id存在可以得到是否正在执行rdb/aof。
Redis 4.0 引入了 混合持久化(通过配置 aof-use-rdb-preamble yes 启用):重写后的 AOF 文件会以 RDB 格式开头,后续追加增量 AOF 命令。
此时,AOF 重写过程会先生成 RDB 数据,再追加新写入的 AOF 命令。
子进程执行逻辑
- 如果设置了server.aof_use_rdb_preamble,处理rdb;否则,执行rewriteAppendOnlyFileRio,遍历dict,将当前dict存放的key, value转换成SET key value 写入到aof文件
- 循环执行aofReadDiffFromParent(); 读rewrite期间父进程的写入,到server.aof_child_diff
- 执行rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff),写入到rio
aofReadDiffFromParent 从管道读数据
1 | /* This function is called by the child rewriting the AOF file to read |
rio对应一个文件
1 | void rioInitWithFile(rio *r, FILE *fp) { |
主进程执行逻辑
- 格式化数据到Buf,
server.expireCommand,argv[1], exarg
- 如果开启了aof,执行server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
- 如果server.aof_child_pid != -1,表示有rewrite进程,执行aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
pub-sub
pub-sub 是常用的网络服务
订阅-发布和关注某个up主,收到他们更新的消息,是类似的
SubscribeChannel
Subscribe a client to a channel. Returns 1 if the operation succeeded
c->pubsub_channels 是一个dict。注册的结果,key是channel,value是一个client list
pubsubPublishMessage
- 利用dictFind(server.pubsub_channels,channel) 找到channel(key),返回client list(value)
- 遍历list,对每个client,执行
addReply(c,shared.mbulkhdr[3]);
,addReply(c,shared.messagebulk);
addReplyBulk(c,channel); addReplyBulk(c,message);
addReply
- 传入
robj *obj
- addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)
subscribeCommand
1 | void subscribeCommand(client *c) { |
publish command
1 | void publishCommand(client *c) { |
本文标题:redis(2)——网络处理、线程模型和rdb,aof持久化
文章作者:Infinity
发布时间:2025-02-12
最后更新:2025-02-15
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!