探索redis 文件事件

原初

单线程的Redis为什么如此快?

  • 纯内存操作
  • 单线程操作,避免了频繁的上下文切换,
  • 采用了非阻塞I/O多路复用机制

看到Redis高性能的原因里,有IO多路复用一项,感到非常好奇具体是怎么实现的呢。

网上浏览了一圈,代码探索了一遍,发现复用发生在系统内核。
软件层面通过系统函数库epolliocp,把IO组织起来,我大致是这样理解的。

在之前写 探索redis 键散列过程源码 文章时,看到redis有用到事件机制。
这回,在查Redis多路复用时,也看到了这个机制,决定将事件粗略探索一下,范围就定在“文件事件”上。

事件

20200713185944788325.png

  • 事件处理器用multiplexing监听多个套接字,根据行为的不同关联不同事件处理器
  • 被监听的套接字准备好执行 accept, read, write, close等操作时,会产生相对应的事件,触发事先关联的事件处理器(ps. 回调函数)处理

事件的定义,在头文件ae.h里,我猜文件名是 a event的缩写。

在源码里,经常出现fd,比如 ipfd, aof_fd, fd,经查是文件描述符 file description 的缩写。
想来在linux系统的世界观里,一切皆文件,IO相关的事物和文件有密切联系再正常不过。

https://github.com/antirez/redis/blob/6.0.0/src/ae.h

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;


当事件就绪时,需要知道文件事件的 文件描述符 还有 事件类型 才能对于锁定该事件,因此定义了 aeFiredEvent 结构统一管理

1
2
3
4
5
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

aeEventLoop 结构保存了一个 void * 类型的万能指针 apidata ,是用来保存轮询事件的状态的,也就是保存底层调用的多路复用库的事件状态
*beforesleep 会在启动时指向预设的回调方法,之后在ae.caeMain里不断循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;

启动

https://github.com/antirez/redis/blob/6.0.0/src/server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
line:4917
int main(int argc, char **argv) {
....
line:5075
initServer();
...

line:5123
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}

  • initServer 初始化事件轮询、端口监听、连接应答注册等
  • aeSetBeforeSleepProcbeforeSleep函数关联到event loop上,函数beforeSleep中有回复处理的代码。
  • aeMain 正常运行时,将一直循环处理事件

https://github.com/antirez/redis/blob/6.0.0/src/server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
line:2702
void initServer(void) {
...
line:2743
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...

line:2752
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
....

line:2845
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
...
}

  • aeCreateEventLoop 创建事件循环、IO多路复用
  • listenToPort 监听端口
  • aeCreateFileEvent 创建TCP、TLS监听事件,其中会给server.ipfd[j]rfileProc赋予回调函数 acceptTcpHandler

https://github.com/antirez/redis/blob/6.0.0/src/ae.c

1
2
3
4
5
6
7
8
9
line:522
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}

  • beforeSleep 的循环调用,会执行接收、回复处理等操作
  • aeProcessEvents 的循环调用,会执行等待epoll中发生事件,并交由事先注册的处理器处理。

多路复用

多路复用函数库定义在:ae_xxx.c中,库底层可相互替换,这里以ae_epoll.c做探索。

https://github.com/antirez/redis/blob/6.0.0/src/ae_epoll.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
line:39
static int aeApiCreate(aeEventLoop *eventLoop) {
...
line:48
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
...
}

line:73
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
...
line:86
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
...
}

line:108
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
...
line:112
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
...
}

  • epoll_create 建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
  • epoll_ctl 可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。
  • epoll_wait 在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。

server.c:main:initServer -> server.c:initServer:aeCreateEventLoop -> ae.c:*aeCreateEventLoop:aeApiCreate -> ae_epoll.c:aeApiCreate:epoll_create


server.c:main:initServer -> server.c:initServer:aeCreateFileEvent -> ae.c:aeCreateFileEvent:aeCreateFileEvent -> ae_epoll.c:aeApiAddEvent:epoll_ctl


server.c:main:aeMain -> ae.c:aeMain:aeProcessEvents -> ae.c:aeProcessEvents:aeApiPoll -> ae_epoll.c:aeApiPoll:epoll_wait

可见在程序启动的过程里,已经通过事件库ae.c完整的调用了多路复用函数,并完成了事件处理器的注册。

事件处理器

连接应答处理器

在上文的,redis启动阶段,已为事件关联acceptTcpHandler回调。

https://github.com/antirez/redis/blob/6.0.0/src/networking.c

1
2
3
4
5
6
7
8
9
10
11
line:944
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
...

while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...
}
...
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
  • anetTcpAccept 用于对连接服务器监听套接字的客户端进行应答
  • acceptCommonHandler 注册命令请求处理器

其主要调用anet.canetTcpAccept函数实现,具体实现为sys/socket.h/accept函数的包装
https://github.com/antirez/redis/blob/6.0.0/src/anet.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
line:562
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
...
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;
...
return fd;
}

line:545
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
...
fd = accept(s,sa,len);
...
}

当Redis服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来。
当有客户端用sys/socket.h/connect函数连接服务器监听套接字的时候,套接字就会产生AE_READABLE事件,引发连接应答处理器执行,并执行相应的套接字应答操作。

命令请求处理器

networking.c:acceptTcpHandler:acceptCommonHandler -> networking.c:acceptCommonHandler:createClient -> networking.c:*createClient:readQueryFromClient

在连接处理器中,会关联命令处理函数
它负责从套接字中读入客户端发送的命令请求内容,具体实现为unistd.h/read函数的包装

https://github.com/antirez/redis/blob/6.0.0/src/networking.c

1
2
3
4
5
6
7
8
line:1858
void readQueryFromClient(connection *conn) {
...
line:1887
nread = connRead(c->conn, c->querybuf+qblen, readlen);
...
processInputBuffer(c);
}


connRead 该函数定义在connection.h中,最终映射至connection.cconnSocketRead函数
https://github.com/antirez/redis/blob/6.0.0/src/connection.c

1
2
3
4
5
6
7
8
#include "server.h"
...
line:173
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
int ret = read(conn->fd, buf, buf_len);
...
return ret;
}

当一个客户端通过连接应答处理器成功连接到服务器之后,服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来。
当客户端向服务器发送命令请求的时候,套接字就会产生AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作;

在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。

命令回复处理器

在启动阶段,aeSetBeforeSleepProc给事件循环关联了beforeSleep回调
beforeSleephandleClientsWithPendingWritesUsingThreads 将设置回复处理器 sendReplyToClient

https://github.com/antirez/redis/blob/6.0.0/src/networking.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int handleClientsWithPendingWritesUsingThreads(void) {
...
line:3052
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
...
}
...
}

line:1357
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
client *c = connGetPrivateData(conn);
writeToClient(c,1);
}

line:1261
int writeToClient(client *c, int handler_installed) {
...
line:1346
if (handler_installed) connSetWriteHandler(c->conn, NULL);
...
}


connSetWriteHandler 该函数定义在connection.h中,
最终映射至connection.cconnSocketSetWriteHandler函数。

当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来。
当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,并执行相应的套接字写入操作。

后记

ae.c的注释里有这样一段话,感叹作者积累的深厚

A simple event-driven programming library. Originally I wrote this code
for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated
it in form of a library for easy reuse.

粗略的记了记redis里事件机制初始化、事件处理器
关于事件状态变化、时间事件、线程等还有待探索。

epoll 也是一个有趣的点,对它内部的实现感到好奇。

《redis设计与实现》-12事件event
Redis源码剖析和注释(十九)—- Redis 事件处理实现
[Redis源码阅读]当你输入get/set命令的时候,Redis做了什么
epoll模型详解
Linux Epoll介绍和程序实例