On the way day day up...

算法基础题——数字

数字

1. 斐波那契数列

2. 求二进制中1的数目

3. N的阶乘中0的个数

4. 给定一个十进制正整数N,从1开始到N的所有整数中,出现1的个数。

比如N=2,则1,2中,出现了1个“1”

5. 快速寻找满足条件的两个数

快速找出一个数组中的两个数字,让这两个数组之和等于一个给定的值,假设这个数组中肯定存在至少一组符合要求的解

6. 大数相加

算法基础题——字符串

字符串

1. 字符串反转

比如: how are you 转换成 you are how

#include <stdio.h>

void reverse(char * start, char *end){

	if(NULL == start || NULL == end){
		return;
	}

	char tmp;
	while(start < end){
		tmp = *start;
		*start = *end;
		*end = tmp;
		start++;
		end--;
	}
}

//字反转
char * reverse_word(char * src){

	if(NULL == src){
		return NULL;
	}
	
	char *start = src;
	char *end = src;

	while(*end != '\0'){
		end++;	
	}
	end--;
	reverse(start, end);

	end = start;

	while(*start != '\0'){
		
		if(*start == ' '){ //指向空字符
			start++;
			end++;
			continue;
		}else if(*end == ' ' || *end == '\0'){ //end指向空格或字符串结尾
			reverse(start, --end);//注意这里先 -- 再传end
			start = ++end;//注意这里先++,再赋值给start
		}else{ //end指向字符
			end++;
		}
		
	}

	return src;
}

int main(){
	char src[] = "how are you";
	char *p = reverse_word(src);
	printf("%s\n", p);

}

2. 判断是否为ip

#include <stdio.h>

int isValidValue(char *start, char *end){

	int sum = 0;
	while(start <= end){
		if(*start < '0' || *start > '9'){
			return -1;
		}else{
			sum = sum*10 + (*start - '0');
			start++;
		}
	}
	if(sum <0 || sum > 255){
		return -1;
	}	
	
	return 1;
}

int isValidFormat(char *src){

	int num = 0;
	int res = 0;
	char *start = src;
	char *end = src;
	
	while(*start != '\0'){
		printf("%s\n", start);
		if(*start == '.'){
			num++;
			start++;
			end++;
		}else if(*end == '.' || *end == '\0'){
			res = isValidValue(start, --end);
			if(-1 == res){
				return -1;
			}else{
				start = ++end;
			}
		}else{
			end++;
		}
	}

	if(3 != num){
		return -1;
	}
	return 1;
}

int main(){
	char *src = "24.1.2.4.1";
	printf("%d\n", isValidFormat(src));

}

3. 字符串包含

一个长字符串a,一个短字符串b,如何判断b中的字符都在a中?

4. 字符串拼接

#include <stdio.h>
#define N 10

char * strconnect(char src[], char des[]){

	if(NULL == src || NULL == des){
		return NULL;
	}
	char *p = des;
	while(*des != '\0'){
		des ++;
	}

	while(*src != '\0'){
		*des = *src;
		des ++;
		src ++;
	}
	*des = '\0';
	return p;
}

int main(){

	char src[N] = "abcd";
	char des[N] = "1234";
	strconnect(src, des);
	printf("%s\n", des);
}

5. 字符串拷贝

#include <stdio.h>

char * strcopy(const char * src, char * des){

	if(NULL == src || NULL == des){
		return NULL;
	}
	char *p = des;

	while(*src != '\0'){
		*des = *src;
		des++;
		src++;
	}
	*des = '\0';
	return p;
}

int main(){

	const char *src = "abc";//src是指向 字符常量const char类型 的指针
	char *des;
	des = strcopy(src, des);//入参src与形参类型要一致,都是指向const char类型的指针
	printf("%s\n", des);
}

6. 字符串移位包含

字符串s1,s2,判断字符串s2能否被s1循环移位得到的字符串包含。例如,给定s1=‘AABCD’,s2=’CDAA’,则s1向右移动3位变长CDAAB,包含s2

7. 查找文件中字符串总数和位置

8. 字符串转整形,整形转字符串

字符串转整形

#include <stdio.h>

int str2int(char *str){
	if(NULL == str){
		return -1;
	}
	int sign = 1;
	int des = 0;

	if('-' == *str){
		sign = -1;
		str++;
	}else if('+' == *str){
		str++;
	}	

	while(*str != '\0'){
		if(*str >= '0' && *str <= '9'){
			des = des*10 + (*str - '0');
			str++;
		}else{
			return -1;
		}	
	}
	return sign * des;
}

int main(){
	char *str = "-1234";
	int des = str2int(str);
	printf("%d\n", des);
}

整形转字符串,方法一

#include <stdio.h>
#define N 100

char * int2str(int src, char des[]){

	if(0 == src){
		return NULL;
	}

	int i=0, j=0;
	char tmp[N];

	while(src > 0){
		tmp[i] = src % 10 + '0';
		src = src / 10;
		i++;
	}

	i--;
	while(i >= 0){
		des[j] = tmp[i];
		i--;
		j++;
	}
	des[j] = '\0';
	return des;	
}

int main(){
	int src = 1234;
	char des[N];
	int2str(src, des);
	printf("%s\n", des);
}

整形转字符串,方法二

#include <stdio.h>

char * int2str(int src, char *des){
	if(0 == src){
		return NULL;
	}

	char * tmp = des;
	while(src > 0){
		*tmp = src % 10 + '0';
		tmp++;
		src = src / 10;
	}
	*tmp = '\0';
	
	char strTmp;
	char *head = des;
	char *end = --tmp;
	
	while(head < end){
		strTmp = *head;
		*head = *end;
		*end = strTmp;
		head++;
		end--;
	}
	return des;
}

int main(){
	int src = 1234;
	char * des;
	des = int2str(src, des);//这里注意啊,需要把des传进去,不然int2str只能返回局部变量的指针,这种是错误的。
	printf("%s\n", des);
}

算法基础题list

数组

1. 排序算法——快速排序

2. 排序算法——冒泡排序

3. 排序算法——插入排序

4. 折半查找/二分查找

5. 求前K大的数

6. 寻找数组中的最大值和最小值

7. AB两个数组,元素去重合并

8. 求数组的子数组之和的最大值

9. 求数组中最长递增子序列

10. 数组循环右移

把一个含有N个元素的数组,循环右移K位,要求时间复杂度为O(N),且只允许使用两个附加变量。

11. 数组分割

有一个无序,元素个数为2n的正整数数组,要求:如何能把这个数组分割成元素个数为n的两个数组,并使两个子数组的和最接近。

字符串

1. 字符串反转

比如: how are you 转换成 you are how

2. 判断是否为ip

3. 字符串包含

一个长字符串a,一个短字符串b,如何判断b中的字符都在a中?

4. 字符串拼接

5. 字符串拷贝

6. 字符串移位包含

字符串s1,s2,判断字符串s2能否被s1循环移位得到的字符串包含。例如,给定s1=‘AABCD’,s2=’CDAA’,则s1向右移动3位变长CDAAB,包含s2

7. 查找文件中字符串总数和位置

8. 字符串转整形,整形转字符串

链表

1. 单链表倒置

2. 单链表相交

3. 链表是否有环

4. 从无头链表中删除节点

数字

1. 斐波那契数列

2. 求二进制中1的数目

3. N的阶乘中0的个数

4. 给定一个十进制正整数N,从1开始到N的所有整数中,出现1的个数。

比如N=2,则1,2中,出现了1个“1”

5. 快速寻找满足条件的两个数

快速找出一个数组中的两个数字,让这两个数组之和等于一个给定的值,假设这个数组中肯定存在至少一组符合要求的解

6. 大数相加

1. 重建二叉树

2. 分层遍历二叉树

3. 求二叉树深度

Redis源码学习——事件2


layout: post author: GoSaturn title: redis源码学习——事件(二) category: 源码学习 tag: [redis] —

本文主要梳理文件事件处理流程

##整体流程

Alt text

##流程详解 ###step1: 初始化事件处理器结构体

// redis.c/initServer()
//全局变量
struct redisServer server; 
//初始化事件处理器结构体
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

**事件处理器结构体:**


<figure class="highlight"><pre><code class="language-c" data-lang="c"><span class="k">typedef</span> <span class="k">struct</span> <span class="n">aeEventLoop</span> <span class="p">{</span>
    <span class="p">...</span>
    <span class="c1">// 已注册的文件事件, events是aeFileEvent类型
</span>    <span class="n">aeFileEvent</span> <span class="o">*</span><span class="n">events</span><span class="p">;</span> 
    <span class="p">...</span>
<span class="p">}</span> <span class="n">aeEventLoop</span><span class="p">;</span></code></pre></figure>


其中,**aeFileEvent结构体定义**如下:


<figure class="highlight"><pre><code class="language-c" data-lang="c"><span class="k">typedef</span> <span class="k">struct</span> <span class="n">aeFileEvent</span> <span class="p">{</span>
    <span class="c1">// 监听事件类型掩码,
</span>    <span class="c1">// 值可以是 AE_READABLE 或 AE_WRITABLE ,
</span>    <span class="c1">// 或者 AE_READABLE | AE_WRITABLE
</span>    <span class="kt">int</span> <span class="n">mask</span><span class="p">;</span> <span class="cm">/* one of AE_(READABLE|WRITABLE) */</span>
    <span class="c1">// 读事件处理器,aeFileProc为函数指针
</span>    <span class="n">aeFileProc</span> <span class="o">*</span><span class="n">rfileProc</span><span class="p">;</span>
    <span class="c1">// 写事件处理器
</span>    <span class="n">aeFileProc</span> <span class="o">*</span><span class="n">wfileProc</span><span class="p">;</span>
    <span class="c1">// 多路复用库的私有数据
</span>    <span class="kt">void</span> <span class="o">*</span><span class="n">clientData</span><span class="p">;</span>
<span class="p">}</span> <span class="n">aeFileEvent</span><span class="p">;</span></code></pre></figure>


**aeFileProc函数指针定义**如下:


<figure class="highlight"><pre><code class="language-c" data-lang="c"><span class="k">typedef</span> <span class="kt">void</span> <span class="n">aeFileProc</span><span class="p">(</span><span class="k">struct</span> <span class="n">aeEventLoop</span> <span class="o">*</span><span class="n">eventLoop</span><span class="p">,</span> <span class="kt">int</span> <span class="n">fd</span><span class="p">,</span> <span class="kt">void</span> <span class="o">*</span><span class="n">clientData</span><span class="p">,</span> <span class="kt">int</span> <span class="n">mask</span><span class="p">);</span></code></pre></figure>


在c语言中,`回调是通过函数指针实现的`。通过将回调函数地址 传递给 被调函数,从而实现回调。在这里,通过定义函数指针aeFileProc,由调用方实现具体的函数内容,在实际调用函数里,把aeFileProc实现函数的地址传进来。其实相当于定义一种接口,由调用方来实现该接口。

###step2: 注册事件处理函数

//redis.c/initServer()
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}

// 为本地套接字关联应答处理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); ``` 以TCP连接为例,说明注册流程。通过调用aeCreateFileEvent函数,把aeFilePrco函数指针的实现函数——acceptTcpHandler作为参数传进去。

aeCreateFileEvent函数如下:

//ae.c/aeCreateFileEvent()
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    if (fd >= eventLoop->setsize) return AE_ERR;
    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];//注意这里是引用!!! by gs
    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    //将事件处理器函数(回调函数)赋值给aeFileEvent结构体中对应的函数指针,其实就是赋值给aeEventLoop结构体中的events  by gs
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有数据
    fe->clientData = clientData;
    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

函数说明如下: 1) aeApiAddEvent 以epoll为例,ae_epoll.c文件中,实现aeApiAddEvent接口,调用epoll系统函数,注册事件。 2)将aeFileProc *proc赋值给aeEventLoop *eventLoop中的events,完成回调函数注册。

###step3: 事件触发,执行回调函数

//ae.c/aeMain()
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeProcessEvents函数如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	// 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);//aeApiPoll获取已就绪事件by gs
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            processed++;
        }
    ...
}

执行回调函数acceptTcpHandle

//networking.c/acceptTcpHandler()
//创建TCP连接处理器
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    while(max--) {
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态(redisClient)
        acceptCommonHandler(cfd,0);
    }
}

主要步骤: 1) anetTcpAccept用于接收客户端连接,返回连接的fd; 2)acceptCommonHandle用于为每个客户端连接创建redisClient结构体。 anetTcpAccept函数:

//anet.c/anetTcpAccept()
//Tcp连接accept
int antTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;
    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

anetGenericAccept函数:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);//系统调用accept函数,接收Tcp连接
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

accept函数从连接请求队列中获取第一个连接,创建新的套接字,并返回fd。如果队列中无请求连接&套接字为阻塞方式,则accept函数阻塞调用进程直到新的连接出现;如果为非阻塞&无请求连接,则accept函数返回一个错误信息。

acceptCommonHandler函数:

//networking.c/acceptCommonHandler()
//处理tcp连接
static void acceptCommonHandler(int fd, int flags) {
    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
   ...
}

该函数主要调用createClient为连接创建客户端结构体。 createClient函数:

redisClient *createClient(int fd) {
	// 分配空间
    redisClient *c = zmalloc(sizeof(redisClient));
    // 当 fd 不为 -1 时,创建带网络连接的客户端
    // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
    // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时需要用到这种伪终端
    if (fd != -1) {
        // 非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法
        anetEnableTcpNoDelay(NULL,fd);
        // 设置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 绑定读事件到事件 loop (开始接收命令请求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
   ...
}

这里又一次调用aeCreateFileEvent函数,传入回调函数readQueryFromClient,用来处理该连接上的数据。 readQueryFromClient函数:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    ...
     // 设置服务器的当前客户端
    server.current_client = c;
    ...
    // 读入内容到查询缓存
    nread = read(fd, c->querybuf+qblen, readlen);//读数据
	...
	// 从查询缓存重读取内容,创建参数,并执行命令
    // 函数会执行到缓存中的所有内容都被处理完为止
    processInputBuffer(c);//解析成命令
    ...
}

processInputBuffer函数:

//networking.c/processInputBuffer()
//处理客户端出入的命令
void processInputBuffer(redisClient *c) {
	while(sdslen(c->querybuf)) {
		...
        // 将缓冲区中的内容转换成命令,以及命令参数
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // 执行命令,并重置客户端
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

首先处理数据内容,解析成命令,然后调用processCommand函数执行命令。 processCommand函数:

//redis.c/processCommand()
int processCommand(redisClient *c){
	...
	// 查找命令,并进行命令合法性检查,以及命令参数个数检查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    ...
    //执行命令
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // 在事务上下文中
        // 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外,其他所有命令都会被入队到事务队列中
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 执行命令
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        // 处理那些解除了阻塞的键
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

调用call()执行命令。 call函数:

//redis.c/call()
//调用命令的实现函数,执行命令
void call(redisClient *c, int flags) {
	...
	 // 执行实现函数
    c->cmd->proc(c);
    ...
}

###step4: 回复请求 以mget命令为例,说明服务器是怎么返回命令结果的。

//t_string.c/mgetCommand()
void mgetCommand(redisClient *c) {
    int j;

    addReplyMultiBulkLen(c,c->argc-1);
    // 查找并返回所有输入键的值
    for (j = 1; j < c->argc; j++) {
        // 查找键 c->argc[j] 的值
        robj *o = lookupKeyRead(c->db,c->argv[j]);
        if (o == NULL) {
            // 值不存在,向客户端发送空回复
            addReply(c,shared.nullbulk);
        } else {
            if (o->type != REDIS_STRING) {
                // 值存在,但不是字符串类型
                addReply(c,shared.nullbulk);
            } else {
                // 值存在,并且是字符串
                addReplyBulk(c,o);
            }
        }
    }
}

命令执行后,调用addReply函数,回复客户端。 addReply函数:

//networking.c/addReply()
void addReply(redisClient *c, robj *obj) {
    // 为客户端安装写处理器到事件循环
    if (prepareClientToWrite(c) != REDIS_OK) return;
   ...
}

在该函数中,调用prepareClientToWrite函数,注册写时间处理器。 prepareClientToWrite函数:

//networking.c/prepareClientToWrite()
int prepareClientToWrite(redisClient *c) {
    ...
    // 一般情况,为客户端套接字安装写处理器到事件循环
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

调用aeCreateFileEvent函数,注册sendReplyToClient事件处理函数。 sendReplyToClient函数:

//networking.c/sendReplyToClient()
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
	...
	while(c->bufpos > 0 || listLength(c->reply)) {
		if (c->bufpos > 0) {
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
			...
		}else{
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
			...
		}
	}
	...
}

##小结 如上,redis文件事件处理流程:

  • 首先初始化事件处理器
  • 注册回调函数
  • 事件触发时,调用回调函数,处理事件
  • 执行完命令后,返回执行结果

redis源码学习——事件(二)

本文主要梳理文件事件处理流程

整体流程

Alt text

流程详解

step1: 初始化事件处理器结构体

// redis.c/initServer()
//全局变量
struct redisServer server; 
//初始化事件处理器结构体
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

事件处理器结构体:

typedef struct aeEventLoop {
    ...
    // 已注册的文件事件, events是aeFileEvent类型
    aeFileEvent *events; 
    ...
} aeEventLoop;

其中,aeFileEvent结构体定义如下:

typedef struct aeFileEvent {
    // 监听事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 读事件处理器,aeFileProc为函数指针
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
} aeFileEvent;

aeFileProc函数指针定义如下:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

在c语言中,回调是通过函数指针实现的。 通过将回调函数地址 传递给 被调函数,从而实现回调。在这里,通过定义函数指针aeFileProc,由调用方实现具体的函数内容,在实际调用函数里,把aeFileProc实现函数的地址传进来。其实相当于定义一种接口,由调用方来实现该接口。

step2: 注册事件处理函数

    //redis.c/initServer()
    // 为 TCP 连接关联连接应答(accept)处理器
    // 用于接受并应答客户端的 connect() 调用
   
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    // 为本地套接字关联应答处理器
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

以TCP连接为例,说明注册流程。通过调用aeCreateFileEvent函数,把aeFilePrco函数指针的实现函数——acceptTcpHandler作为参数传进去。

aeCreateFileEvent函数如下:

//ae.c/aeCreateFileEvent()
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    if (fd >= eventLoop->setsize) return AE_ERR;
    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];//注意这里是引用!!! by gs
    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    //将事件处理器函数(回调函数)赋值给aeFileEvent结构体中对应的函数指针,其实就是赋值给aeEventLoop结构体中的events  by gs
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有数据
    fe->clientData = clientData;
    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

函数说明如下:

1) aeApiAddEvent

以epoll为例,ae_epoll.c文件中,实现aeApiAddEvent接口,调用epoll系统函数,注册事件。

2)将aeFileProc *proc赋值给aeEventLoop *eventLoop中的events,完成回调函数注册。

step3: 事件触发,执行回调函数

//ae.c/aeMain()
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeProcessEvents函数如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	// 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);//aeApiPoll获取已就绪事件by gs
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);//执行回调函数
            }
            processed++;
        }
    ...
}

执行回调函数acceptTcpHandle

//networking.c/acceptTcpHandler()
//创建TCP连接处理器
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    while(max--) {
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态(redisClient)
        acceptCommonHandler(cfd,0);
    }
}

主要步骤:

1) anetTcpAccept用于接收客户端连接,返回连接的fd;

2)acceptCommonHandle用于为每个客户端连接创建redisClient结构体。

anetTcpAccept函数:

//anet.c/anetTcpAccept()
//Tcp连接accept
int antTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;
    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

anetGenericAccept函数:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);//系统调用accept函数,接收Tcp连接
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

accept函数从连接请求队列中获取第一个连接,创建新的套接字,并返回fd。如果队列中无请求连接&套接字为阻塞方式,则accept函数阻塞调用进程直到新的连接出现;如果为非阻塞&无请求连接,则accept函数返回一个错误信息。

acceptCommonHandler函数:

//networking.c/acceptCommonHandler()
//处理tcp连接
static void acceptCommonHandler(int fd, int flags) {
    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
   ...
}

该函数主要调用createClient为连接创建客户端结构体。

createClient函数:

redisClient *createClient(int fd) {
	// 分配空间
    redisClient *c = zmalloc(sizeof(redisClient));
    // 当 fd 不为 -1 时,创建带网络连接的客户端
    // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
    // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时需要用到这种伪终端
    if (fd != -1) {
        // 非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法
        anetEnableTcpNoDelay(NULL,fd);
        // 设置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 绑定读事件到事件 loop (开始接收命令请求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
   ...
}

这里又一次调用aeCreateFileEvent函数,传入回调函数readQueryFromClient,用来处理该连接上的数据。

readQueryFromClient函数:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    ...
     // 设置服务器的当前客户端
    server.current_client = c;
    ...
    // 读入内容到查询缓存
    nread = read(fd, c->querybuf+qblen, readlen);//读数据
	...
	// 从查询缓存重读取内容,创建参数,并执行命令
    // 函数会执行到缓存中的所有内容都被处理完为止
    processInputBuffer(c);//解析成命令
    ...
}

processInputBuffer函数:

//networking.c/processInputBuffer()
//处理客户端出入的命令
void processInputBuffer(redisClient *c) {
	while(sdslen(c->querybuf)) {
		...
        // 将缓冲区中的内容转换成命令,以及命令参数
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // 执行命令,并重置客户端
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

首先处理数据内容,解析成命令,然后调用processCommand函数执行命令。

processCommand函数:

//redis.c/processCommand()
int processCommand(redisClient *c){
	...
	// 查找命令,并进行命令合法性检查,以及命令参数个数检查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    ...
    //执行命令
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // 在事务上下文中
        // 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外,其他所有命令都会被入队到事务队列中
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 执行命令
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        // 处理那些解除了阻塞的键
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

调用call()执行命令。

call函数:

//redis.c/call()
//调用命令的实现函数,执行命令
void call(redisClient *c, int flags) {
	...
	 // 执行实现函数
    c->cmd->proc(c);
    ...
}

step4: 回复请求

以mget命令为例,说明服务器是怎么返回命令结果的。

//t_string.c/mgetCommand()
void mgetCommand(redisClient *c) {
    int j;

    addReplyMultiBulkLen(c,c->argc-1);
    // 查找并返回所有输入键的值
    for (j = 1; j < c->argc; j++) {
        // 查找键 c->argc[j] 的值
        robj *o = lookupKeyRead(c->db,c->argv[j]);
        if (o == NULL) {
            // 值不存在,向客户端发送空回复
            addReply(c,shared.nullbulk);
        } else {
            if (o->type != REDIS_STRING) {
                // 值存在,但不是字符串类型
                addReply(c,shared.nullbulk);
            } else {
                // 值存在,并且是字符串
                addReplyBulk(c,o);
            }
        }
    }
}

命令执行后,调用addReply函数,回复客户端。

addReply函数:

//networking.c/addReply()
void addReply(redisClient *c, robj *obj) {
    // 为客户端安装写处理器到事件循环
    if (prepareClientToWrite(c) != REDIS_OK) return;
   ...
}

在该函数中,调用prepareClientToWrite函数,注册写时间处理器。

prepareClientToWrite函数:

//networking.c/prepareClientToWrite()
int prepareClientToWrite(redisClient *c) {
    ...
    // 一般情况,为客户端套接字安装写处理器到事件循环
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

调用aeCreateFileEvent函数,注册sendReplyToClient事件处理函数。

sendReplyToClient函数:

//networking.c/sendReplyToClient()
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
	...
	while(c->bufpos > 0 || listLength(c->reply)) {
		if (c->bufpos > 0) {
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
			...
		}else{
			...
			//调用系统函数write写数据到fd
			nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
			...
		}
	}
	...
}

小结

如上,redis文件事件处理流程:

  • 首先初始化事件处理器
  • 注册回调函数
  • 事件触发时,调用回调函数,处理事件
  • 执行完命令后,返回执行结果