redis源码分析(2)――事件循环
来源:程序员人生 发布时间:2015-01-23 08:24:36 阅读次数:4105次
redis作为
服务器程序,网络IO处理是关键。redis不像memcached使用libevent,它实现了自己的IO事件框架,并且很简单、小巧。可以选择select、epoll、kqueue等实现。
作为 IO事件框架,需要抽象多种IO模型的共性,将全部进程主要抽象为:
1)初始化
2)添加、删除事件
3)等待事件产生
下面也依照这个步骤分析代码。
(1)初始化
回想1下redis的初始化进程中,initServer函数会调用aeCreateEventLoop创建event loop对象,对事件循环进行初始化。下面看1下aeEventLoop结构,存储事件循环相干的属性。
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
// <MM>
// 寄存的是上次触发定时器事件的时间
// </MM>
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
// <MM>
// 所有定时器事件组织成链表
// </MM>
aeTimeEvent *timeEventHead;
// <MM>
// 是不是停止eventLoop
// </MM>
int stop;
void *apidata; /* This is used for polling API specific data */
// <MM>
// 事件循环每次迭代都会调用beforesleep
// </MM>
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
setsize:指定事件循环要监听的文件描写符集合的大小。这个值与配置文件中得maxclients有关。
events:寄存所有注册的读写事件,是大小为setsize的数组。内核会保证新建连接的fd是当前可用描写符的最小值,所以最多监听setsize个描写符,那末最大的fd就是setsize - 1。这类组织方式的好处是,可以以fd为下标,索引到对应的事件,在事件触发后根据fd快速查找到对应的事件。
fired:寄存触发的读写事件。一样是setsize大小的数组。
timeEventHead:redis将定时器事件组织成链表,这个属性指向表头。
apidata:寄存epoll、select等实现相干的数据。
beforesleep:事件循环在每次迭代前会调用beforesleep履行1些异步处理。
io模型初始化的抽象函数为aeApiCreate。aeCreateEventLoop函数创建并初始化全局事件循环结构,并调用aeApiCreate初始化具体实现依赖的数据结构。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// <MM>
// setsize指定事件循环监听的fd的数目
// 由于内核保证新创建的fd是最小的正整数,所以直接创建setsize大小
// 的数组,寄存对应的event
// </MM>
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = ⑴;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == ⑴) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
以epoll为例,aeApiCreate主要是创建epoll的fd,和要监听的epoll_event,这些数据定义在:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
这里,监听到的事件组织方式与event_loop中监听事件1样,一样是setsize大小的数据,以fd为下标。 aeApiCreate会初始化这些属性,并将aeApiState结构寄存到eventLoop->apidata。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return ⑴;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return ⑴;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == ⑴) {
zfree(state->events);
zfree(state);
return ⑴;
}
eventLoop->apidata = state;
return 0;
}
(2)添加、删除事件
redis支持两类事件,网络io事件和定时器事件。定时器事件的添加、删除相对简单些,主要是保护定时器事件列表。首先看1下表示定时器事件的结构:
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
when_sec和when_ms:表示定时器触发的事件戳,在事件循环迭代返回后,如果当前时间戳大于这个值就会回调事件处理函数。
timeProc:事件处理函数。
finalizerProc:清算函数,在删除定时器时调用。
clientData:需要传入事件处理函数的参数。
next:定时器事件组织成链表,next指向下1个事件。
aeCreateTimeEvent函数用于添加定时器事件,逻辑很简单,根据当前时间计算下1次触发的事件,对事件属性赋值,并插入到定时器链表表头之前。删除通过aeDeleteTimeEvent函数,根据id找到事件并从链表删除该节点,回调清算函数。具体定时器事件的处理见后文,下面看1下io事件。
io事件的添加通过aeCreateFileEvent,逻辑很简单,根据要注册的fd,获得其event,设置属性,会调用aeApiAddEvent函数添加到底层的io模型。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == ⑴)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
mask:指定注册的事件类型,可以是读或写。
proc:事件处理函数。
下面是io事件的结构,包括注册的事件类型mask,读写事件处理函数,和对应的参数。
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
下面看1下epoll添加事件的实现,主要是调用epoll_ctl。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == ⑴) return ⑴;
return 0;
}
struct epll_event用于指定要监听的事件,和该文件描写符绑定的data,在事件触发时可以返回。这里将data直接存为fd,通过这个数据,即可以找到对应的事件,然后调用其处理函数。
epoll的删除与添加类似,不再赘述。
(3)等待事件触发
通过调用aeMain函数进入事件循环:void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
函数内部就是1个while循环,不断的调用aeProcessEvents函数,等待事件产生。在每次迭代前会调用会调用beforesleep函数,处理异步任务,后续会和serverCron1起介绍。
aeProcessEvents函数首先会处理定时器事件,然后是io事件,下面介绍这个函数的实现。
首先,声明变量记录处理的事件个数,和触发的事件。flags表示此轮需要处理的事件类型,如果不需要处理定时器事件和io事件直接返回。
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
redis中的定时器事件是通过epoll实现的。大体思路是,在每次事件迭代调用epoll_wait时需要指定此轮sleep的时间。如果没有io事件产生,则在sleep时间到了以后会返回。通过算出下1次最早产生的事件,到当前时间的间隔,用这个值设为sleep,这样就能够保证在事件到达后回调其处理函数。但是,由于每次返回后,还有处理io事件,所以定时器的触发事件是不精确的,1定是比预定的触发时间晚的。下面看下具体实现。
首先是,查找下1次最早产生的定时器事件,以肯定sleep的事件。如果没有定时器事件,则根据传入的flags,选择是1直阻塞指点io事件产生,或是不阻塞,检查完立即返回。通过调用aeSearchNearestTimer函数查找最早产生的事件,采取的是线性查找的方式,复杂度是O(n),可以将定时器事件组织成堆,加快查找。不过,redis中只有1个serverCron定时器事件,所以暂时不需优化。
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// <MM>
// 在两种情况下进入poll,阻塞等待事件产生:
// 1)在有需要监听的描写符时(maxfd != ⑴)
// 2)需要处理定时器事件,并且DONT_WAIT开关关闭的情况下
// </MM>
if (eventLoop->maxfd != ⑴ ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// <MM>
// 根据最快产生的定时器事件的产生时间,肯定此次poll阻塞的时间
// </MM>
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// <MM>
// 线性查找最快产生的定时器事件
// </MM>
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// <MM>
// 如果有定时器事件,则根据它触发的时间,计算sleep的时间(ms单位)
// </MM>
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// <MM>
// 如果没有定时器事件,则根据情况是立即返回,或永久阻塞
// </MM>
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
接着,调用aeApiPoll函数,传入前面计算的sleep时间,等待io事件放生。在函数返回后,触发的事件已填充到eventLoop的fired数组中。epoll的实现以下,就是调用epoll_wait,函数返回后,会将触发的事件寄存到state->events数组中的前numevents个元素。接下来,填充fired数组,设置每一个触发事件的fd,和事件类型。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// <MM>
// 调用epoll_wait,state->events寄存返回的产生事件的fd
// </MM>
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : ⑴);
if (retval > 0) {
int j;
numevents = retval;
// <MM>
// 有事件产生,将产生的事件寄存于fired数组
// </MM>
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
在事件返回后,需要处理事件。遍历fired数组,获得fd对应的事件,并根据触发的事件类型,回调其处理函数。 for (j = 0; j < numevents; j++) {
// <MM>
// poll返回后,会将所有触发的时间寄存于fired数组
// </MM>
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// <MM>
// 回调产生事件的fd,注册的事件处理函数
// </MM>
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
以上便是,io事件的处理,下面看1下定时器事件的处理。会调用processTimeEvents函数处理定时器事件。
首先会校验是不是产生系统时钟偏差(system clock skew,修改系统事件会产生?把事件调到过去),如果产生就将所有事件的产生时间置为0,立即触发。
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
接下来遍历所有定时器事件,查找触发的事件,然后回调解理函数。定时器事件处理函数的返回值,决定这个事件是1次性的,还是周期性的。如果返回AE_NOMORE,则是1次性事件,在调用完后会删除该事件。否则的话,返回值指定的是下1次触发的时间。
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId⑴;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
// <MM>
// 定时器事件的触发时间已过,则回调注册的事件处理函数
// </MM>
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
// <MM>
// 根据定时器事件处理函数的返回值,决定是不是将该定时器删除。
// 如果retval不等于⑴(AE_NOMORE),则更改定时器的触发时间为
// now + retval(ms)
// </MM>
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// <MM>
// 如果返回AE_NOMORE,则删除该定时器
// </MM>
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
在回调解理函数时,有可能会添加新的定时器事件,如果不断加入,存在无穷循环的风险,所以需要避免这类情况,每次循环不处理新添加的事件,这是通过下面的代码实现的。
if (te->id > maxId) {
te = te->next;
continue;
}
事件循环部份分析到此结束,感觉比较直观、清晰,完全可以抽出来,作为1个独立的库使用。下面1节,会介绍要求的处理。
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠