本文主要梳理文件事件处理流程

整体流程

Alt text

流程详解

step1: 初始化事件处理器结构体

// redis.c/initServer()
//全局变量
struct redisServer server; 
//初始化事件处理器结构体
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

事件处理器结构体:

typedef struct aeEventLoop {
    ...
    // 已注册的文件事件, events是aeFileEvent类型
    aeFileEvent *events; 
    ...
} aeEventLoop;

其中,aeFileEvent结构体定义如下:

typedef struct aeFileEvent {
    // 监听事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 读事件处理器,aeFileProc为函数指针
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
} aeFileEvent;

aeFileProc函数指针定义如下:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

在c语言中,回调是通过函数指针实现的。 通过将回调函数地址 传递给 被调函数,从而实现回调。在这里,通过定义函数指针aeFileProc,由调用方实现具体的函数内容,在实际调用函数里,把aeFileProc实现函数的地址传进来。其实相当于定义一种接口,由调用方来实现该接口。

step2: 注册事件处理函数

    //redis.c/initServer()
    // 为 TCP 连接关联连接应答(accept)处理器
    // 用于接受并应答客户端的 connect() 调用
   
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    // 为本地套接字关联应答处理器
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

以TCP连接为例,说明注册流程。通过调用aeCreateFileEvent函数,把aeFilePrco函数指针的实现函数——acceptTcpHandler作为参数传进去。

aeCreateFileEvent函数如下:

//ae.c/aeCreateFileEvent()
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    if (fd >= eventLoop->setsize) return AE_ERR;
    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];//注意这里是引用!!! by gs
    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    //将事件处理器函数(回调函数)赋值给aeFileEvent结构体中对应的函数指针,其实就是赋值给aeEventLoop结构体中的events  by gs
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有数据
    fe->clientData = clientData;
    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

函数说明如下:

1) aeApiAddEvent

以epoll为例,ae_epoll.c文件中,实现aeApiAddEvent接口,调用epoll系统函数,注册事件。

2)将aeFileProc *proc赋值给aeEventLoop *eventLoop中的events,完成回调函数注册。

step3: 事件触发,执行回调函数

//ae.c/aeMain()
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeProcessEvents函数如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	// 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);//aeApiPoll获取已就绪事件by gs
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            processed++;
        }
    ...
}

执行回调函数acceptTcpHandle

//networking.c/acceptTcpHandler()
//创建TCP连接处理器
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    while(max--) {
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态(redisClient)
        acceptCommonHandler(cfd,0);
    }
}

主要步骤:

1) anetTcpAccept用于接收客户端连接,返回连接的fd;

2)acceptCommonHandle用于为每个客户端连接创建redisClient结构体。

anetTcpAccept函数:

//anet.c/anetTcpAccept()
//Tcp连接accept
int antTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;
    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

anetGenericAccept函数:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);//系统调用accept函数,接收Tcp连接
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

accept函数从连接请求队列中获取第一个连接,创建新的套接字,并返回fd。如果队列中无请求连接&套接字为阻塞方式,则accept函数阻塞调用进程直到新的连接出现;如果为非阻塞&无请求连接,则accept函数返回一个错误信息。

acceptCommonHandler函数:

//networking.c/acceptCommonHandler()
//处理tcp连接
static void acceptCommonHandler(int fd, int flags) {
    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
   ...
}

该函数主要调用createClient为连接创建客户端结构体。

createClient函数:

redisClient *createClient(int fd) {
	// 分配空间
    redisClient *c = zmalloc(sizeof(redisClient));
    // 当 fd 不为 -1 时,创建带网络连接的客户端
    // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
    // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时需要用到这种伪终端
    if (fd != -1) {
        // 非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法
        anetEnableTcpNoDelay(NULL,fd);
        // 设置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 绑定读事件到事件 loop (开始接收命令请求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
   ...
}

这里又一次调用aeCreateFileEvent函数,传入回调函数readQueryFromClient,用来处理该连接上的数据。

readQueryFromClient函数:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    ...
     // 设置服务器的当前客户端
    server.current_client = c;
    ...
    // 读入内容到查询缓存
    nread = read(fd, c->querybuf+qblen, readlen);//读数据
	...
	// 从查询缓存重读取内容,创建参数,并执行命令
    // 函数会执行到缓存中的所有内容都被处理完为止
    processInputBuffer(c);//解析成命令
    ...
}

processInputBuffer函数:

//networking.c/processInputBuffer()
//处理客户端出入的命令
void processInputBuffer(redisClient *c) {
	while(sdslen(c->querybuf)) {
		...
        // 将缓冲区中的内容转换成命令,以及命令参数
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // 执行命令,并重置客户端
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

首先处理数据内容,解析成命令,然后调用processCommand函数执行命令。

processCommand函数:

//redis.c/processCommand()
int processCommand(redisClient *c){
	...
	// 查找命令,并进行命令合法性检查,以及命令参数个数检查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    ...
    //执行命令
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // 在事务上下文中
        // 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外,其他所有命令都会被入队到事务队列中
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 执行命令
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        // 处理那些解除了阻塞的键
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

调用call()执行命令。

call函数:

//redis.c/call()
//调用命令的实现函数,执行命令
void call(redisClient *c, int flags) {
	...
	 // 执行实现函数
    c->cmd->proc(c);
    ...
}

step4: 回复请求

以mget命令为例,说明服务器是怎么返回命令结果的。

//t_string.c/mgetCommand()
void mgetCommand(redisClient *c) {
    int j;

    addReplyMultiBulkLen(c,c->argc-1);
    // 查找并返回所有输入键的值
    for (j = 1; j < c->argc; j++) {
        // 查找键 c->argc[j] 的值
        robj *o = lookupKeyRead(c->db,c->argv[j]);
        if (o == NULL) {
            // 值不存在,向客户端发送空回复
            addReply(c,shared.nullbulk);
        } else {
            if (o->type != REDIS_STRING) {
                // 值存在,但不是字符串类型
                addReply(c,shared.nullbulk);
            } else {
                // 值存在,并且是字符串
                addReplyBulk(c,o);
            }
        }
    }
}

命令执行后,调用addReply函数,回复客户端。

addReply函数:

//networking.c/addReply()
void addReply(redisClient *c, robj *obj) {
    // 为客户端安装写处理器到事件循环
    if (prepareClientToWrite(c) != REDIS_OK) return;
   ...
}

在该函数中,调用prepareClientToWrite函数,注册写时间处理器。

prepareClientToWrite函数:

//networking.c/prepareClientToWrite()
int prepareClientToWrite(redisClient *c) {
    ...
    // 一般情况,为客户端套接字安装写处理器到事件循环
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

调用aeCreateFileEvent函数,注册sendReplyToClient事件处理函数。

sendReplyToClient函数:

//networking.c/sendReplyToClient()
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
	...
	while(c->bufpos > 0 || listLength(c->reply)) {
		if (c->bufpos > 0) {
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
			...
		}else{
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
			...
		}
	}
	...
}

小结

如上,redis文件事件处理流程:

  • 首先初始化事件处理器
  • 注册回调函数
  • 事件触发时,调用回调函数,处理事件
  • 执行完命令后,返回执行结果