Redis receives remote commands over the network, processes them, and returns the results.

Redis's rdb and aof are the classic persistence/backup mechanisms: rdb captures the database's data, while aof captures the operations performed on the database.

Network processing

Redis's network handling resembles common I/O-multiplexing network libraries: through event notification, a single thread handles a large number of events from different clients.

Events and the event loop

There are three event types: aeFileEvent (registered network events), aeTimeEvent (timer events), and aeFiredEvent, which holds the ready events returned by epoll (i.e., the events that need handling).

Network events fire when epoll reports an fd readable or writable (i.e., network data can be read or written); timer events fire on schedule, based on the execution time of the nearest upcoming timer.

The event structs:
```cpp
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
```

The core of the event loop is the aeProcessEvents function; its logic is as follows:

  1. Compute the wait time t until the nearest timer event: shortest = aeSearchNearestTimer(eventLoop)
  2. numevents = aeApiPoll(eventLoop, tvp); this runs epoll_wait with the timeout set to t
  3. Iterate over eventLoop->fired[j].fd to get the fired events, and invoke the handler matching each event's read/write type: fe->rfileProc(eventLoop,fd,fe->clientData,mask); fe->wfileProc(eventLoop,fd,fe->clientData,mask);
  4. Process timer events with processTimeEvents (see the sketch after this list)
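
A minimal sketch of that loop, condensed from ae.c. Error handling and the flags parameter are omitted, and msUntil is a hypothetical helper computing the milliseconds until a timer fires:

```cpp
/* Sketch of aeProcessEvents: poll with a timeout derived from the
 * nearest timer, dispatch fired file events, then run due timers. */
int aeProcessEvents(aeEventLoop *eventLoop) {
    struct timeval tv, *tvp = &tv;
    int processed = 0, numevents, j;

    /* 1. Wait at most until the nearest timer would fire. */
    aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
        long long ms = msUntil(shortest); /* hypothetical helper */
        tv.tv_sec = ms / 1000;
        tv.tv_usec = (ms % 1000) * 1000;
    } else {
        tvp = NULL; /* no timers: block until a file event arrives */
    }

    /* 2. aeApiPoll wraps epoll_wait and fills eventLoop->fired. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* 3. Dispatch each fired event to its read/write handler. */
    for (j = 0; j < numevents; j++) {
        int fd = eventLoop->fired[j].fd;
        int mask = eventLoop->fired[j].mask;
        aeFileEvent *fe = &eventLoop->events[fd];
        if (fe->mask & mask & AE_READABLE)
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        if (fe->mask & mask & AE_WRITABLE)
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
        processed++;
    }

    /* 4. Run timer events that are now due. */
    processed += processTimeEvents(eventLoop);
    return processed;
}
```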

Event handlers

  1. acceptTcpHandler
    Accepts and establishes connections; acceptTcpHandler creates the client.

When the client is created, a readable file event is registered for it, with readQueryFromClient as the handler. On Linux, accept() takes one completed connection from the listening socket's accept queue and creates a new socket for it; that fd identifies both the client and its readable file event.

  2. readQueryFromClient

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)

  1. Grow the buffer: c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
  2. nread = read(fd, c->querybuf+qblen, readlen);
  3. Call processInputBuffer(c);

Inside processInputBuffer sits processCommand, which mainly does two things (see the sketch after this list):

  1. Look up the command: c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
  2. Execute it: call(c,CMD_CALL_FULL);
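
A minimal sketch of that dispatch, assuming the real lookupCommand/call helpers; ACL checks, transactions, and most error paths are omitted:

```cpp
/* Sketch of processCommand: resolve the command by name, then execute
 * it; call() also handles stats and AOF/replica propagation. */
int processCommand(client *c) {
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        addReplyErrorFormat(c, "unknown command '%s'",
                            (char*)c->argv[0]->ptr);
        return C_OK;
    }
    call(c, CMD_CALL_FULL); /* runs c->cmd->proc(c) */
    return C_OK;
}
```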

Threading model

In Redis 4.0, listening for network requests and processing requests and commands are all done by a single main thread.

  1. The upside: the main I/O path of Redis needs no lock protection.
  2. The downside: Redis cannot handle long-running commands, which matters when using custom commands and complex Lua scripts. Redis also cannot exploit multiple CPU cores, so the single-core performance of the host matters.

Redis uses multiple threads in only a few places: the bio background jobs such as AOF fsync and lazyfree (rdb saves and aof rewrites run in forked child processes instead).

Forking child processes

fork() creates a child process. In the parent, fork() returns the new child's process ID (PID), a positive integer.
In the child, fork() returns 0.
If fork() fails, it returns -1 and sets errno to indicate the cause.
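
A minimal, self-contained demonstration of the three return cases:

```cpp
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void) {
    pid_t pid = fork();
    if (pid == -1) {            /* failure: no child created, errno set */
        perror("fork");
        exit(1);
    } else if (pid == 0) {      /* child: fork() returned 0 */
        printf("child: my pid is %d\n", (int)getpid());
        _exit(0);
    } else {                    /* parent: fork() returned the child's PID */
        printf("parent: created child %d\n", (int)pid);
        waitpid(pid, NULL, 0);
    }
    return 0;
}
```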

Where fork is used:

  1. Fork a background child to run the rdb save task:
```cpp
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        /* ... */
```
  2. aof forks a child to run rewriteAppendOnlyFile:
```cpp
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);
            /* ... */
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        /* ... */
```
  3. If the server is configured with daemonize, the current process exits and a child carries on with the remaining work:
```cpp
void daemonize(void) {
    int fd;

    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}
```
  4. sentinelRunPendingScripts: the child executes the script while the parent returns:
```cpp
/* Run pending scripts if we are not already at max number of running
 * scripts. */
void sentinelRunPendingScripts(void) {
    listNode *ln;
    listIter li;
    mstime_t now = mstime();

    /* Find jobs that are not running and run them, from the top to the
     * tail of the queue, so we run older jobs first. */
    listRewind(sentinel.scripts_queue,&li);
    while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
           (ln = listNext(&li)) != NULL)
    {
        sentinelScriptJob *sj = ln->value;
        pid_t pid;

        pid = fork();

        if (pid == -1) {
            /* Parent (fork error).
             * We report fork errors as signal 99, in order to unify the
             * reporting with other kind of errors. */
            sentinelEvent(LL_WARNING,"-script-error",NULL,
                          "%s %d %d", sj->argv[0], 99, 0);
            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
            sj->pid = 0;
        } else if (pid == 0) {
            /* Child */
            execve(sj->argv[0],sj->argv,environ);
            /* If we are here an error occurred. */
            _exit(2); /* Don't retry execution. */
        } else {
            sentinel.running_scripts++;
            sj->pid = pid;
            sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
        }
    }
}
```

pthread_create threads

Thread creation via pthread_create is wrapped by the bio module:

```cpp
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
```

Where bioCreateBackgroundJob is used:

  1. Background AOF fsync:
```cpp
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
```
  2. lazyfree:
```cpp
/* Empty a Redis DB asynchronously. What the function does actually is to
 * create a new empty set of hash tables and scheduling the old ones for
 * lazy freeing. */
void emptyDbAsync(redisDb *db) {
    dict *oldht1 = db->dict, *oldht2 = db->expires;
    db->dict = dictCreate(&dbDictType,NULL);
    db->expires = dictCreate(&keyptrDictType,NULL);
    atomicIncr(lazyfree_objects,dictSize(oldht1));
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}
```

rdb persistence

rdb, Redis DataBase: a point-in-time snapshot file of the dataset

For now we only look at rdbSave and rdbLoad.

rdbSave

int rdbSave(char *filename, rdbSaveInfo *rsi)

  1. snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w");
  2. rioInitWithFile(&rdb,fp);
  3. rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi)
  4. fflush(fp); fsync(fileno(fp)); fclose(fp); (each checked for errors)
  5. rename(tmpfile,filename)

fflush flushes the stdio stream into the page cache; fsync flushes the page cache down to disk.
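
A minimal sketch of that write-then-persist sequence, the same pattern rdbSave ends with:

```cpp
#include <stdio.h>
#include <unistd.h>

/* Write a buffer durably: stdio buffer -> page cache -> disk. */
int write_durably(const char *path, const char *data, size_t len) {
    FILE *fp = fopen(path, "w");
    if (!fp) return -1;
    if (fwrite(data, 1, len, fp) != len) { fclose(fp); return -1; }
    if (fflush(fp) == EOF) { fclose(fp); return -1; }       /* stdio -> page cache */
    if (fsync(fileno(fp)) == -1) { fclose(fp); return -1; } /* page cache -> disk */
    return (fclose(fp) == EOF) ? -1 : 0;
}
```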

The rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) call inside does the following:

  1. Write the header: "REDIS%04d" with RDB_VERSION
  2. rdbSaveInfoAuxFields(rdb,flags,rsi)
  3. Iterate over server.dbnum (the resulting file layout is sketched after this list):
    1. For each db (a redisDb), the actual payload is db->dict
    2. Write some metadata, including rdbSaveType(rdb,RDB_OPCODE_SELECTDB)
    3. rdbSaveLen(rdb,j)
    4. rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)
    5. rdbSaveLen(rdb,db_size)
    6. rdbSaveLen(rdb,expires_size)
    7. Iterate over db->dict: take sds keystr = dictGetKey(de); wrap it in a robj key; take robj *o = dictGetVal(de); then call rdbSaveKeyValuePair(rdb,&key,o,expire,now)
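
Putting those writes together, the file layout comes out roughly as below (assuming RDB_VERSION 8, the Redis 4.0 value; the EOF opcode and CRC64 trailer come from the end of rdbSaveRio):

```
REDIS0008                            <- "REDIS%04d" magic + version
aux fields (redis-ver, bits, ...)    <- rdbSaveInfoAuxFields
for each non-empty db:
  RDB_OPCODE_SELECTDB, db index      <- rdbSaveType + rdbSaveLen(rdb,j)
  RDB_OPCODE_RESIZEDB, db_size, expires_size
  key/value pairs (optional expire)  <- rdbSaveKeyValuePair
RDB_OPCODE_EOF
8-byte CRC64 checksum
```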

Callers of rdbSave

rdbSaveBackground calls childpid = fork() to start a child process, and rdbSave(filename,rsi); runs inside the child.

The child created by fork() initially shares the parent's memory via copy-on-write. Under copy-on-write, when either the parent or the child modifies a shared page, the kernel copies that page to a new physical page for the writing process.

Copy-on-write guarantees that the child's memory always matches the parent's state at the moment fork() was called: the dict contents seen by the child's rdbSave are exactly what the parent had at fork() time, and later writes by the parent cannot change them. All of this is guaranteed by the operating system. Pretty slick.

If the parent modifies lots of data (triggering heavy COW), memory and CPU pressure can rise, but this is a resource problem with a controllable impact.
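
A tiny demonstration of the snapshot semantics on a POSIX system: the parent overwrites the data after fork(), yet the child still reads the value from fork() time.

```cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void) {
    char *data = malloc(16);        /* heap page shared after fork() */
    strcpy(data, "before");

    pid_t pid = fork();
    if (pid == 0) {
        sleep(1);                   /* let the parent write first */
        printf("child sees: %s\n", data);  /* prints "before" */
        _exit(0);
    }
    strcpy(data, "after");          /* parent write triggers the page copy */
    waitpid(pid, NULL, 0);
    printf("parent sees: %s\n", data);     /* prints "after" */
    free(data);
    return 0;
}
```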

Scheduled and manual triggering of rdbSave

A background run records the executing child's pid, so whether a backup is already running in the background is checked with
server.rdb_child_pid != -1 || server.aof_child_pid != -1

replicationCron -> startBgsaveForReplication
replicationCron runs once per second.

syncCommand -> startBgsaveForReplication

rdbLoadRio

int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi)

It reads the data back from the file:

  1. First read the header into buf and check the "REDIS" magic
  2. Then the four-digit version: rdbver = atoi(buf+5);. atoi's algorithm: starting from the first non-whitespace character, take an optional leading plus or minus sign followed by as many decimal digits as possible, and convert them to an int value.
  3. Loop (a sketch of this loop follows):
    1. type = rdbLoadType(rdb)
    2. Act according to the type; RDB_OPCODE_EOF marks the end
    3. Parse out key = rdbLoadStringObject(rdb) and val = rdbLoadObject(type,rdb)
    4. Insert the kv into the hash table: dbAdd(db,key,val);
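
A condensed sketch of that loop; handling of expires, aux fields, and checksum verification is omitted:

```cpp
/* Sketch of rdbLoadRio's main loop: read opcodes until EOF, switching
 * dbs on SELECTDB and inserting every key/value pair it decodes. */
int rdbLoadRioSketch(rio *rdb) {
    redisDb *db = server.db + 0;       /* start at db 0 */
    while (1) {
        int type = rdbLoadType(rdb);
        if (type == -1) return C_ERR;  /* short read */
        if (type == RDB_OPCODE_EOF) break;
        if (type == RDB_OPCODE_SELECTDB) {
            uint64_t dbid = rdbLoadLen(rdb, NULL);
            db = server.db + dbid;     /* switch the current db */
            continue;
        }
        /* Anything else is a key/value pair of object type 'type'. */
        robj *key = rdbLoadStringObject(rdb);
        robj *val = rdbLoadObject(type, rdb);
        dbAdd(db, key, val);
    }
    return C_OK;
}
```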

Clearly, rdb writes every kv present in the hashtable at a given moment into the rdb file.

aof persistence

aof, append only file

rdb cares about copying the keys and values of the db's dict, restoring database state by writing kv data. aof cares about copying the db's create/update/delete operations, restoring database state by replaying them.

aof rewrite

aof provides both a full copy and an incremental copy.

aof rewrite saves the current dict's keys and values as write commands (e.g. SET key value), which realizes the full copy.

The incremental copy works by the parent sending write operations to the child. The processes communicate through a pipe: creating one returns two fds, one for each end. Combined with file events, a process can watch a pipe fd; the parent registers a writable event on its end to push the accumulated diff, while the child reads from the other end.
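
A bare-bones illustration of that parent-to-child pipe pattern:

```cpp
#include <stdio.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void) {
    int fds[2]; /* fds[0] = read end, fds[1] = write end */
    if (pipe(fds) == -1) return 1;

    if (fork() == 0) {
        /* Child: read the "diff" the parent sends. */
        char buf[64];
        close(fds[1]);
        ssize_t n = read(fds[0], buf, sizeof(buf)-1);
        if (n > 0) { buf[n] = '\0'; printf("child got: %s", buf); }
        _exit(0);
    }
    /* Parent: write into the pipe, as redis does with the rewrite diff. */
    close(fds[0]);
    const char *diff = "SET key value\n";
    write(fds[1], diff, strlen(diff));
    close(fds[1]);
    wait(NULL);
    return 0;
}
```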

While an aof rewrite runs, the parent, besides handling each write request, also writes the operation into the pipe. Whenever rdb or aof work runs in the background, redis records the child's pid, and checking for that pid tells whether an rdb/aof job is in progress.

Redis 4.0 introduced mixed persistence (enabled via aof-use-rdb-preamble yes): the rewritten AOF file begins with RDB-format data, followed by the incremental AOF commands.
In this mode, an AOF rewrite first produces the RDB data, then appends the newly written AOF commands.

Child process logic

  1. If server.aof_use_rdb_preamble is set, emit the RDB preamble; otherwise run rewriteAppendOnlyFileRio, which walks the dict and converts the stored keys and values into SET key value commands written to the aof file
  2. Loop on aofReadDiffFromParent(); reading the parent's writes during the rewrite into server.aof_child_diff
  3. Run rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) to write the diff into the rio

aofReadDiffFromParent reads the data from the pipe:

```cpp
/* This function is called by the child rewriting the AOF file to read
 * the difference accumulated from the parent into a buffer, that is
 * concatenated at the end of the rewrite. */
ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}
```

A rio wraps a file:

```cpp
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
```

Main process logic

  1. Format the command into buf (e.g. an expire is rewritten using server.expireCommand, argv[1], and the exarg expiry argument)
  2. If aof is enabled, run server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
  3. If server.aof_child_pid != -1, a rewrite child exists, so also run aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); (a condensed sketch follows this list)
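
A condensed sketch of this path, loosely following feedAppendOnlyFile (the SELECT emission and expire translation are omitted):

```cpp
/* Sketch of the parent's AOF feed: serialize the command, append it to
 * the AOF buffer, and mirror it to the rewrite child if one exists. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid,
                        robj **argv, int argc) {
    sds buf = sdsempty();
    (void)cmd; (void)dictid; /* db selection omitted in this sketch */

    /* 1. Serialize the command into the AOF text format. */
    buf = catAppendOnlyGenericCommand(buf, argc, argv);

    /* 2. Accumulate in aof_buf; it is flushed to the file before
     *    redis re-enters the event loop. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));

    /* 3. A running rewrite child also gets the diff, via the buffer
     *    that drains into the parent->child pipe. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf, sdslen(buf));

    sdsfree(buf);
}
```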

pub-sub

pub-sub is a common network service.

Subscribe-publish works much like following a creator and receiving notifications when they post updates.

pubsubSubscribeChannel

Subscribe a client to a channel. Returns 1 if the operation succeeded

server.pubsub_channels is a dict. The result of registration: the key is the channel and the value is a list of clients (sketched below).
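
A simplified sketch of that registration; the client-side bookkeeping (c->pubsub_channels) and the reply are omitted:

```cpp
/* Sketch: map a channel to the list of clients subscribed to it. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    list *clients;
    dictEntry *de = dictFind(server.pubsub_channels, channel);

    if (de == NULL) {
        /* First subscriber: create the client list for this channel. */
        clients = listCreate();
        dictAdd(server.pubsub_channels, channel, clients);
        incrRefCount(channel);
    } else {
        clients = dictGetVal(de);
    }
    listAddNodeTail(clients, c);
    return 1;
}
```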

pubsubPublishMessage

  1. Use dictFind(server.pubsub_channels,channel) to locate the channel (key) and obtain its client list (value)
  2. Walk the list and, for each client, run addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message);

addReply

  1. Takes a robj *obj
  2. addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))

subscribeCommand

```cpp
void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
```

publishCommand

```cpp
void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}
```