本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。
数据结构
在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:
- /* Time event structure */
- typedef struct aeTimeEvent {
- ? ? // 标识符
- ? ? long long id; /* time event identifier. */
- ? ? // 定时纳秒数
- ? ? monotime when;
- ? ? // 定时回调函数
- ? ? aeTimeProc *timeProc;
- ? ? // 注销定时器时候的回调函数
- ? ? aeEventFinalizerProc *finalizerProc;
- ? ? void *clientData;
- ? ? struct aeTimeEvent *prev;
- ? ? struct aeTimeEvent *next;
- ? ? int refcount; /* refcount to prevent timer events from being
- ? ?? ??? ? ? * freed in recursive time event calls. */
- } aeTimeEvent;
常见操作
1. 创建定时事件
redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。
- if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
- ? serverPanic("Can't create event loop timers.");
- ? exit(1);
- }
-
- //创建定时器对象
- long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
- ? ? ? ? aeTimeProc *proc, void *clientData,
- ? ? ? ? aeEventFinalizerProc *finalizerProc)
- {
- ? ? long long id = eventLoop->timeEventNextId++;
- ? ? aeTimeEvent *te;
-
- ? ? te = zmalloc(sizeof(*te));
- ? ? if (te == NULL) return AE_ERR;
- ? ? te->id = id;
- ? ? te->when = getMonotonicUs() + milliseconds * 1000;
- ? ? te->timeProc = proc;
- ? ? te->finalizerProc = finalizerProc;
- ? ? te->clientData = clientData;
- ? ? te->prev = NULL;
- ? ? // 头插法?
- ? ? te->next = eventLoop->timeEventHead;
- ? ? te->refcount = 0;
- ? ? if (te->next)
- ? ? ? ? te->next->prev = te;
- ? ? eventLoop->timeEventHead = te;
- ? ? return id;
- }

2. 触发定时事件
redis 中是采用 IO 复用来进行定时任务的。
查找距离现在最近的定时事件,见 usUntilEarliestTimer
- ?
- /* How many microseconds until the first timer should fire.
- ?* If there are no timers, -1 is returned.
- ?*
- ?* Note that's O(N) since time events are unsorted.
- ?* Possible optimizations (not needed by Redis so far, but...):
- ?* 1) Insert the event in order, so that the nearest is just the head.
- ?* ? ?Much better but still insertion or deletion of timers is O(N).
- ?* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
- ?*/
- static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
- ? ? aeTimeEvent *te = eventLoop->timeEventHead;
- ? ? if (te == NULL) return -1;
-
- ? ? aeTimeEvent *earliest = NULL;
- ? ? while (te) {
- ? ? ? ? if (!earliest || te->when < earliest->when)
- ? ? ? ? ? ? earliest = te;
- ? ? ? ? te = te->next;
- ? ? }
-
- ? ? monotime now = getMonotonicUs();
- ? ? return (now >= earliest->when) ? 0 : earliest->when - now;
- }
-
- ?
这里时间复杂度可能比较高,实际中需要结合具体场景使用。
更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。
- if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
- ? usUntilTimer = usUntilEarliestTimer(eventLoop);
-
- if (usUntilTimer >= 0) {
- ? tv.tv_sec = usUntilTimer / 1000000;
- ? tv.tv_usec = usUntilTimer % 1000000;
- ? tvp = &tv;
- } else {
- ? if (flags & AE_DONT_WAIT) {
- ? ? // 不等待
- ? ? tv.tv_sec = tv.tv_usec = 0;
- ? ? tvp = &tv;
- ? } else {
- ? ? /* Otherwise we can block */
- ? ? tvp = NULL; /* wait forever */
- ? }
- }
3. 执行定时事件
一次性的执行完直接删除,周期性的执行完在重新添加到链表。
- /* Process time events */
- static int processTimeEvents(aeEventLoop *eventLoop) {
- ? int processed = 0;
- ? aeTimeEvent *te;
- ? long long maxId;
-
- ? te = eventLoop->timeEventHead;
- ? maxId = eventLoop->timeEventNextId-1;
- ? monotime now = getMonotonicUs();
- ??
- ? // 删除定时器
- ? while(te) {
- ? ? long long id;
- ?? ??? ?
- ? ? // 下一轮中对事件进行删除
- ? ? /* Remove events scheduled for deletion. */
- ? ? if (te->id == AE_DELETED_EVENT_ID) {
- ? ? ? aeTimeEvent *next = te->next;
- ? ? ? /* If a reference exists for this timer event,
- ? ? ? ? ? ? ?* don't free it. This is currently incremented
- ? ? ? ? ? ? ?* for recursive timerProc calls */
- ? ? ? if (te->refcount) {
- ? ? ? ? te = next;
- ? ? ? ? continue;
- ? ? ? }
- ? ? ? if (te->prev)
- ? ? ? ? te->prev->next = te->next;
- ? ? ? else
- ? ? ? ? eventLoop->timeEventHead = te->next;
- ? ? ? if (te->next)
- ? ? ? ? te->next->prev = te->prev;
- ? ? ? if (te->finalizerProc) {
- ? ? ? ? te->finalizerProc(eventLoop, te->clientData);
- ? ? ? ? now = getMonotonicUs();
- ? ? ? }
- ? ? ? zfree(te);
- ? ? ? te = next;
- ? ? ? continue;
- ? ? }
- ? ??
- ? ? if (te->id > maxId) {
- ? ? ? te = te->next;
- ? ? ? continue;
- ? ? }
-
- ? ? if (te->when <= now) {
- ? ? ? int retval;
-
- ? ? ? id = te->id;
- ? ? ? te->refcount++;
- ? ? ? // timeProc 函数返回值 retVal 为时间事件执行的间隔
- ? ? ? retval = te->timeProc(eventLoop, id, te->clientData);
- ? ? ? te->refcount--;
- ? ? ? processed++;
- ? ? ? now = getMonotonicUs();
- ? ? ? if (retval != AE_NOMORE) {
- ? ? ? ? te->when = now + retval * 1000;
- ? ? ? } else {
- ? ? ? ? // 如果超时了,那么标记为删除
- ? ? ? ? te->id = AE_DELETED_EVENT_ID;
- ? ? ? }
- ? ? }
- ? ? // 执行下一个
- ? ? te = te->next;
- ? }
- ? return processed;
- }
总结
优点:实现简单
缺点:如果定时任务很多,效率比较低。
到此这篇关于Redis定时任务原理的实现的文章就介绍到这了,更多相关Redis定时任务内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!