原初
单线程的Redis为什么如此快?
- 纯内存操作
- 单线程操作,避免了频繁的上下文切换,
- 采用了非阻塞I/O多路复用机制
看到Redis高性能的原因里,有IO多路复用
一项,感到非常好奇具体是怎么实现的呢。
网上浏览了一圈,代码探索了一遍,发现复用发生在系统内核。
软件层面通过系统函数库epoll
、 iocp
,把IO组织起来,我大致是这样理解的。
在之前写 探索redis 键散列过程源码 文章时,看到redis有用到事件机制。
这回,在查Redis多路复用时,也看到了这个机制,决定将事件粗略探索一下,范围就定在“文件事件”上。
事件
- 事件处理器用
multiplexing
监听多个套接字,根据行为的不同关联不同事件处理器 - 被监听的套接字准备好执行
accept
,read
,write
,close
等操作时,会产生相对应的事件,触发事先关联的事件处理器(ps. 回调函数)处理
事件的定义,在头文件ae.h
里,我猜文件名是 a event
的缩写。
在源码里,经常出现fd
,比如 ipfd
, aof_fd
, fd
,经查是文件描述符 file description
的缩写。
想来在linux系统的世界观里,一切皆文件,IO相关的事物和文件有密切联系再正常不过。
https://github.com/antirez/redis/blob/6.0.0/src/ae.h1
2
3
4
5
6
7/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
当事件就绪时,需要知道文件事件的 文件描述符
还有 事件类型
才能对于锁定该事件,因此定义了 aeFiredEvent
结构统一管理
1 | /* A fired event */ |
aeEventLoop
结构保存了一个 void *
类型的万能指针 apidata
,是用来保存轮询事件的状态的,也就是保存底层调用的多路复用库的事件状态*beforesleep
会在启动时指向预设的回调方法,之后在ae.c
的 aeMain
里不断循环
1 | /* State of an event based program */ |
启动
https://github.com/antirez/redis/blob/6.0.0/src/server.c1
2
3
4
5
6
7
8
9
10
11
12
13
14line:4917
int main(int argc, char **argv) {
....
line:5075
initServer();
...
line:5123
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
initServer
初始化事件轮询、端口监听、连接应答注册等aeSetBeforeSleepProc
将beforeSleep
函数关联到event loop
上,函数beforeSleep
中有回复处理
的代码。aeMain
正常运行时,将一直循环处理事件
https://github.com/antirez/redis/blob/6.0.0/src/server.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27line:2702
void initServer(void) {
...
line:2743
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
line:2752
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
....
line:2845
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
...
}
aeCreateEventLoop
创建事件循环、IO多路复用listenToPort
监听端口aeCreateFileEvent
创建TCP、TLS监听事件,其中会给server.ipfd[j]
的rfileProc
赋予回调函数acceptTcpHandler
https://github.com/antirez/redis/blob/6.0.0/src/ae.c1
2
3
4
5
6
7
8
9line:522
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
beforeSleep
的循环调用,会执行接收、回复处理等操作aeProcessEvents
的循环调用,会执行等待epoll中发生事件,并交由事先注册的处理器处理。
多路复用
多路复用函数库定义在:ae_xxx.c
中,库底层可相互替换,这里以ae_epoll.c
做探索。
https://github.com/antirez/redis/blob/6.0.0/src/ae_epoll.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24line:39
static int aeApiCreate(aeEventLoop *eventLoop) {
...
line:48
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
...
}
line:73
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
...
line:86
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
...
}
line:108
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
...
line:112
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
...
}
epoll_create
建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。epoll_ctl
可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。epoll_wait
在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。
server.c:main:initServer -> server.c:initServer:aeCreateEventLoop -> ae.c:*aeCreateEventLoop:aeApiCreate -> ae_epoll.c:aeApiCreate:epoll_create
server.c:main:initServer -> server.c:initServer:aeCreateFileEvent -> ae.c:aeCreateFileEvent:aeCreateFileEvent -> ae_epoll.c:aeApiAddEvent:epoll_ctl
server.c:main:aeMain -> ae.c:aeMain:aeProcessEvents -> ae.c:aeProcessEvents:aeApiPoll -> ae_epoll.c:aeApiPoll:epoll_wait
可见在程序启动的过程里,已经通过事件库ae.c
完整的调用了多路复用函数,并完成了事件处理器的注册。
事件处理器
连接应答处理器
在上文的,redis启动阶段,已为事件关联acceptTcpHandler回调。
https://github.com/antirez/redis/blob/6.0.0/src/networking.c
1 | line:944 |
anetTcpAccept
用于对连接服务器监听套接字的客户端进行应答acceptCommonHandler
注册命令请求处理器
其主要调用anet.c
中anetTcpAccept
函数实现,具体实现为sys/socket.h/accept
函数的包装
https://github.com/antirez/redis/blob/6.0.0/src/anet.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16line:562
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
...
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;
...
return fd;
}
line:545
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
...
fd = accept(s,sa,len);
...
}
当Redis服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来。
当有客户端用sys/socket.h/connect函数连接服务器监听套接字的时候,套接字就会产生AE_READABLE事件,引发连接应答处理器执行,并执行相应的套接字应答操作。
命令请求处理器
networking.c:acceptTcpHandler:acceptCommonHandler -> networking.c:acceptCommonHandler:createClient -> networking.c:*createClient:readQueryFromClient
在连接处理器中,会关联命令处理函数
它负责从套接字中读入客户端发送的命令请求内容,具体实现为unistd.h/read函数的包装
https://github.com/antirez/redis/blob/6.0.0/src/networking.c1
2
3
4
5
6
7
8line:1858
void readQueryFromClient(connection *conn) {
...
line:1887
nread = connRead(c->conn, c->querybuf+qblen, readlen);
...
processInputBuffer(c);
}
connRead
该函数定义在connection.h
中,最终映射至connection.c
的connSocketRead
函数
https://github.com/antirez/redis/blob/6.0.0/src/connection.c1
2
3
4
5
6
7
8
...
line:173
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
int ret = read(conn->fd, buf, buf_len);
...
return ret;
}
当一个客户端通过连接应答处理器成功连接到服务器之后,服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来。
当客户端向服务器发送命令请求的时候,套接字就会产生AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作;
在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。
命令回复处理器
在启动阶段,aeSetBeforeSleepProc
给事件循环关联了beforeSleep
回调beforeSleep
中 handleClientsWithPendingWritesUsingThreads
将设置回复处理器 sendReplyToClient
https://github.com/antirez/redis/blob/6.0.0/src/networking.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29int handleClientsWithPendingWritesUsingThreads(void) {
...
line:3052
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
...
}
...
}
line:1357
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
client *c = connGetPrivateData(conn);
writeToClient(c,1);
}
line:1261
int writeToClient(client *c, int handler_installed) {
...
line:1346
if (handler_installed) connSetWriteHandler(c->conn, NULL);
...
}
connSetWriteHandler
该函数定义在connection.h
中,
最终映射至connection.c
的connSocketSetWriteHandler
函数。
当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来。
当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,并执行相应的套接字写入操作。
后记
在ae.c
的注释里有这样一段话,感叹作者积累的深厚
A simple event-driven programming library. Originally I wrote this code
for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated
it in form of a library for easy reuse.
粗略的记了记redis里事件机制初始化、事件处理器
关于事件状态变化、时间事件、线程等还有待探索。
epoll
也是一个有趣的点,对它内部的实现感到好奇。
《redis设计与实现》-12事件event
Redis源码剖析和注释(十九)—- Redis 事件处理实现
[Redis源码阅读]当你输入get/set命令的时候,Redis做了什么
epoll模型详解
Linux Epoll介绍和程序实例