Redis网络事件端到端流程流程


文章速读

下面为Redis在运行时处理Socket事件的伪代码,Select的本质上是模式匹配与高阶函数(在C中用函数指针进行模拟),全程单线程运行(BGSAVE需要fork除外)。

// Redis 中 Socket 事件的循环处理伪代码
while(true){
  // 通过Epoll,Select等方法获取当前的可用的SocketDescriptor数组
  aeApiPoll(eventLoop, ...);
  // 对可用Socket依次进行调用函数
  eventLoop.forEach(event -> {
    switch(event.mask){
      case AE_READABLE:
        // 调用相应的函数指针,即业务代码...
        rfileProc(event, ...);
        break;
      case AE_WRITABLE:
        wfileProc(event, ...);
        break;
    }
  })
}

IO multiplexing 与 Select系统调用

Redis在内部采用非阻塞同步IO作为底层库实现,看起来很高大上,实际上就是一个不断循环搜索可用I/O descriptor的系统调用,它在OS上有select, epoll, kqueue等实现。在本篇中,将以Select为例介绍I/O多路复用。

Select是一个网络相关的POSIX调用,可以用 man 2 select 查看具体的介绍。

select() examines the I/O descriptor sets whose addresses are passed in readfds, writefds, and errorfds to see if some of their descriptors are ready for reading, are ready for writing, or have an exceptional condi-tion pending, respectively.

它内部通过轮询一个默认长度为1024的数组查询可读 或者 可写Socket descriptor(在C中用 fd_set 表示, a fixed size buffer),并返回给用户,比如下图就是一个查询可读Socket的Select流程。

Select

通过Select可以一次性单线程通过内核遍历大量的Socket连接,再依次进行事件处理。

  1. 把大量IO管理的脏活交给内核了,可以轻松设计基于Event loop的模型,单线程就搞定IO,之后产生的事件再交给其他模块(比如执行线程池),业务代码更容易维护
  2. 相比于以前每次新来一个连接就开一个线程的做法,Select节约了线程创建、切换等资源损失

至于Select具体实现的源码,已经属于SystemCall层,可以翻下在线或者教学用的迷你Linux学习下源码,本文就不深挖了。此外,这里有一个对比阻塞与非阻塞的问答

Select在Redis中的封装

Redis实现了对底层的系统调用 Select, epoll, export, kqueue 的适配,以支持多个平台。接下来以Select为例,看一下Redis是如何封装Select的

// ae_select.c
// Redis 对 Select 的封装入口
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;
	
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
    // 遍历是否有可读可写的 Socket 描述符
    // 1. eventLoop->maxfd+1: 文件描述符数量+1
    // 2. &state->_rfds:可读的 Socket descriptor
    // 3. &state->_wfds: 可写的 Socket descriptor
    // 4. NULL: 错误的 Socket descriptor,NUll表示在Redis中没有监听
    // 5. tvp: 超时配置 timeval {秒 + 毫秒}, 为0时不等待, 为null时阻塞等待
    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
  	// 如果刚刚Select遍历搜索到的话
    if (retval > 0) {
      	// 就再遍历一遍,并添加 Mask
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];
			
            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
          	//fire这里翻译为`触发事件`比较合理,为将要触发的事件放入fd与mask
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

通过将此函数放在一个while循环中,Redis就可以通过一个线程对IO事件进行高效处理,下文将详细讲。

Redis 套接字处理流程

我们首先从main函数开始

事件循环(Event loop)

可以看出这里是一个简单的事件循环,关键点是aeProcessEvents函数

// server.c 的 main 方法中调用 ae.c 的事件循环函数 aeMain
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

事件处理

接着分析 aeProcessEvents,它用于处理Redis的所有事件

// ae.c 的 aeProcessEvents 函数,处理Redis事件
// 本文只分析Socket事件,为了版面,将删除与Socket无关的代码
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // 此处略去定时相关代码...
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // 设置 select 的超时事件
     	if (flags & AE_DONT_WAIT) {  	
            tv.tv_sec = tv.tv_usec = 0;
            // tvp 为 0 时 不等待                     
            tvp = &tv;
        } else {
            // tvp 为 null 时 阻塞等待
            tvp = NULL;
        }
        // 调用对 Select 的包装,这个刚刚讲过了
        numevents = aeApiPoll(eventLoop, tvp);
        // 对可读与可写的事件进行处理
        for (j = 0; j < numevents; j++) {
            // 读取刚刚通过Select获得的Event
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 可读Socket处理
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                // 调用函数指针           
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 可写Socket处理
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
	// 此处略去定时相关代码...
    return processed; /* return the number of processed file/time events */
}

调用业务( 调用函数指针)

接下来就进行具体的调用函数指针了。这部分进行反序列化操作并执行读写事务,C中的函数指针在一定程度上类似于高级语言中的闭包(Closure),比如这里的aeFileProc就有多个实现,下面是它的原型与某个实现

// 对函数的定义
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
// 对函数的实现
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    // 业务代码...
    writeToClient(fd,privdata,1);
}

可以在Clion使用cmd + shift + f进行全局文本搜索aeCreateFileEvent,就可以找到所有的业务了。

附录

为什么NIO中Select调用操作有时会占用很多CPU?

说个NIO相关的吧,最近在用JVisualvm抽样CPU使用时,发现MQ的Select调用占用非常高的CPU时间。通过定位,发现:

  1. 有大量的连接通过MQ发出后无响应
  2. 自身由于积压导致可写的连接没被处理
  3. 进而引发FullGC,导致更加恶化

个人猜测,Select在某些恶劣环境下不擅长于长连接或者不稳定的连接而出现空转循环。我的建议:

  1. 配置Unix参数,减小栈内存的分配量,减少Socket的等待超时
  2. 切忌在此循环线程上执行耗时任务,建议将产生的事件交给线程池
  3. 将Select超时设计为阻塞等待,减少空转次数

参考文献

  1. http://origin.redisbook.com/
  2. Redis设计与实现 (数据库技术丛书)
  3. 深入浅出Node.js
  4. https://www.zhihu.com/question/28594409
  5. TCP/IP 应用程序的通信连接模式