On the way day day up...

redis源码学习——事件(一)

redis服务器是基于事件驱动机制实现的。

所谓事件驱动是指,根据发生的事件(比如点击鼠标)进行相应的处理。一般事件驱动程序由事件收集器事件分发器事件处理器三部分组成。事件收集器负责接收事件(包括来自用户的或软件硬件的事件),事件分发器负责将事件发送到相应的事件处理器程序,事件处理器则负责处理具体事件。

redis中的事件有文件事件时间事件两类。redis自己实现了一套事件驱动机制(类似libevent),用来处理这两类事件。

  • 文件事件 redis服务器通过套接字与客户端或其他redis服务器连接。 文件事件是服务器对套接字操作的抽象。 服务器与客户端或其他服务器的通信会产生相应的文件事件,服务器通过监听并处理这些文件事件,来完成一系列网络通信操作。 redis基于reactor模式开发了自己的网络事件处理器,称作文件事件处理器
  • 时间事件 redis服务器中需要有一些固定时间点执行的操作,时间事件就是对这类定时操作的抽象。

事件处理流程

Alt text

redis.c / main() 函数

//redis.c文件,main函数
int main(int argc, char **argv) {
	...
	//1.创建并初始化服务器数据结构
	initServer();
	...
	//2. 运行事件处理器,一直到服务器关闭为止
	aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);
    //3. 服务器关闭,停止事件循环
    aeDeleteEventLoop(server.el);
    return 0;
}

1. 初始化事件处理器

redis.c 的main()函数中调用initServer(),该函数调用aeCreateEventLoop初始化事件处理器。


//redis.c文件,创建并初始化服务器数据结构
struct redisServer server; //全局变量,redis服务器全局状态
void initServer(){
	...
	//1. 初始化事件处理器状态
	server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
	...
}
//ae.c文件,初始化事件处理器
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    // 创建事件状态结构
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 初始化文件事件结构和已就绪文件事件结构数组
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    // 设置数组大小
    eventLoop->setsize = setsize;
    // 初始化执行最近一次执行时间
    eventLoop->lastTime = time(NULL);
    // 初始化时间事件结构
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1; 
    eventLoop->beforesleep = NULL;
    //以epoll为例,创建epool实例,并赋值给eventLoop的apidata字段
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    // 初始化监听事件
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    // 返回事件循环
    return eventLoop;
err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
 return NULL;
}

2. 创建时间事件

在initServer()中调用aeCreateTimeEvent创建时间事件。


//redis.c文件,创建并初始化服务器数据结构
void initServer(){
	...
	//2. 为 serverCron() 创建时间事件
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }
   ...
}

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // 更新时间计数器
    long long id = eventLoop->timeEventNextId++;
    // 创建时间事件结构
    aeTimeEvent *te;
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    // 设置 ID
    te->id = id;
    // 设定处理事件的时间
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    // 设置事件处理器
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    // 设置私有数据
    te->clientData = clientData;
    // 将新事件放入表头
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

3. 注册文件I/O事件

在initServer()中调用aeCreateFileEvent注册文件I/O事件,利用系统提供的I/O多路复用技术(select、epoll等)监听感兴趣的I/O事件。


int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }    
    if (fd >= eventLoop->setsize) return AE_ERR;    
    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];
    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有数据
    fe->clientData = clientData;
    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

4. 监听事件

initServer()中提供了TCP和UNIX域套接字两种方式进行监听。


void initServer(){
	...
    // 3. 为 TCP 连接关联连接应答(accept)处理器
    // 用于接受并应答客户端的 connect() 调用
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
     // 为本地套接字关联应答处理器
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
   ...
}

5. redis事件循环

initServer()完成上述准备工作后,回到main()函数,main()中会调用aeMain(),进入事件处理器主循环。

//ae.c文件,事件处理器主循环,运行事件处理器,一直到服务器关闭为止
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

//ae.c文件,处理时间事件&文件时间
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
   /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
		    // 执行到这一步,说明没有时间事件
            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到达为止
                tvp = NULL; /* wait forever */
            }
        }
        // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);//aeApiPoll获取已就绪事件by gs
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
              if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
	//返回处理的时间事件+文件事件总数
    return processed; 
 }

6. 事件触发

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	 // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);//aeApiPoll获取已就绪事件by gs
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
              if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    ...
}

其中,aeApiPoll用来收集就绪的文件时间,如果底层I/O多路复用机制采用epoll模式的话,对应ae_epoll.c文件。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 等待时间
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 有至少一个事件就绪?
    if (retval > 0) {
        int j;
        // 为已就绪事件设置相应的模式
        // 并加入到 eventLoop 的 fired 数组中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }   
    }   
    // 返回已就绪事件个数
    return numevents;
}

从浏览器输入网址到页面打开,都发生了什么?

通过这个问题,梳理其中涉及的知识点,再查缺补漏。

整体上分为三部分,浏览器端(客户端),网络传输,服务端,

大体流程为: 浏览器发送url请求——>通过网络传输请求数据——>服务端处理请求并返回请求结果——>通过网络再传送回浏览器——>浏览器拿到数据后处理展示

  1. 浏览器通过DNS查询IP,然后通过socket API发送请求;

  2. 数据在网络中传输,网络分层从上到下依次为:应用层(HTTP协议)——传输层(TCP协议)——网络层(IP协议)——网络接口层/链路层(MAC协议,ARP协议等);

  3. 请求到达服务端,首先经过负载均衡(LVS,VIP,HTTP反向代理),将请求发送到合适的webserver;

  4. webserver将请求传给fastcgi,fastcgi处理完请求后将结果返回给webserver,然后webserver再通过socket把请求结果发送出去。

浏览器发送url请求

1. url组成

例子:http://tieba.baidu.com/f?ie=utf-8&kw=test&fr=search

url由3部分组成:协议+主机名+路径及文件名

协议 包括http/https、ftp、file等协议,最常用的就是http协议

主机名 由 域名/IP地址 + 端口号组成,一般如果省略端口号,则使用相应协议的默认端口,比如http协议默认端口为80。域名需要通过DNS解析成对应的IP地址。

路径及文件名

  • 路径path是由零个或多个’/’隔开的字符串,一般表示主机上的某个目录路径或文件地址,如上面例子中的 ‘/f’ ;
  • 对于动态网页,在path后面会有’?’,问号后面的部分是参数,由’&’符号连接起来,如上面例子中的 ‘?ie=utf-8&kw=test&fr=search’

根据url的组成,相应的拓展到以下知识点:http/https协议,DNS,CDN

另外,有个将长url映射为短url的算法,可以了解下

2. http/https协议

http协议是一种网络传输协议

https协议可以理解为安全版的http协议

关于这两种协议,后面会有单独文章来总结

3. DNS

DNS(Domain Name System,域名系统),其实就是域名到实际IP地址的映射。

后续再补充

4. CDN

CDN(Content Delivery Network,内容分发网络), 主要是通过让用户就近取得数据,提高页面响应速度。

关于CDN涉及挺多内容,之前简单看过一些,找时间总结下

5. socket

浏览器通过DNS查到IP后,再通过socket API发送数据

socket相关内容

网络传输

关于这部分,准备看下《TCP/IP详解卷1》再来总结

服务端处理请求&返回结果

浏览器发送的数据通过网络传输到服务端,当然首先是到达服务器网卡,网卡接收到数据包后,会立即发送中断信号,通知内核数据来了,内核会调用网卡之前注册好的中断处理程序来响应中断信号,该中断处理程序负责将网卡上的数据包拷贝到内存。这样我们的webserver就能拿到网络传过来的数据了。

1. webserver

webserve能够解析http协议,接收http请求,然后返回http响应。如果是动态页面,会把请求交给cgi/fastcgi进行处理,然后拿到返回结果。

webserver有挺多,我们主要学习nginx。关于nginx会涉及很多内容,后续会逐步总结。

涉及知识点:cgi/fastcgi,进程间通信,tcp连接通信

2. 逻辑处理

webserver通过cgi讲请求发送给响应的执行程序,首先会根据url中的路径及文件名信息进行路由,找到响应的执行文件,然后根据参数进行逻辑处理,得到返回结果,再返回给cgi程序。

这里就涉及编程语言了,包括php/python/go等。

平时项目大部分用php,之前简单看了下php内核相关的内容,准备找时间再仔细看一遍,然后再总结。

3. 存储

存储类型:mysql,redis等 存储方式:单机,分布式

后续单独总结

浏览器接收到数据后处理展示

浏览器拿到数据后,通过html进行展示,

这个不太了解,需要好好学习下

小结

本文主要是想通过梳理”从浏览器输入网址到页面打开,都发生了什么”这个问题,把沿途涉及的知识点简单列一下,然后针对自己的不足,补充学习。 so,it’s just a start…

linux内核学习——进程&进程调度

进程管理

1. 进程基本概念

进程是处于执行期间的程序以及相关资源的总称。

2. 进程描述符

linux通过`slab分配器`分配进程描述符结构体
task_struct{
	unsigned log state;//进程状态
	pid_t pid;//进程标志,唯一
	struct task_struct *parent//父进程,每个进程都必定有一个父进程
	...
}

进程上下文

一个用户空间进程,通过系统调用或者触发某个异常,就会陷入内核空间——这种情况就称作内核代表进程执行处于进程上下文中。 注意:用户空间进程只能通过系统调用异常处理程序这两类内核提供的接口,才能陷入内核执行。

3. 进程创建&终结

linux系统中的进程存在明显的继承关系,所有进程都是PID为1的init进程的后代。内核在系统启动的最后阶段启动init进程。

fork()拷贝当前进程创建子进程

fork()使用写时拷贝(COW)机制

exec()读取可执行文件并将其载入地址空间运行

exit()结束进程

进程释放资源后,处于EXIT_ZOMBIE状态(此时会占用内核栈,thread_info结构,task_struct结构),此时需要通过父进程通知内核释放剩余资源,真正结束进程。

如果父进程在子进程之前退出,必须有机制保证子进程能找的新的父进程(可以让init作为父进程),否则这些子进程在退出时就会成为僵尸进程,浪费内存空间。

4. 线程

linux系统中的线程与进程实现机制一样,被当做与其他进程共享某些资源的进程。

进程调度

进程调度是指,从就绪的进程中选择一个来执行

基本概念

寄存器、cpu缓存、内存

寄存器 是cpu的重要组成部分

cpu缓存 是为了弥补cpu与内存之间速度的差异而增加的缓存

三者的速度:寄存器 > cpu缓存 > 内存

进程类型

IO消耗型 进程大部分时间用来提交IO请求,或者等待IO请求结果

cpu消耗性进程大部分时间都在执行代码,IO请求比较少

进程优先级、时间片

进程优先级调度程序总数选择时间片未用完且优先级最高的进程来执行。

时间片是一个数值,表示进程被抢占前所能运行的时间。

实时进程、普通进程

进程调度方法

每个cpu都有两个进程队列,可运行进程队列等待/过期进程队列

linux用红黑树存储系统中所有可运行的进程,红黑树的键值是该进程可运行的虚拟时间。

当进程阻塞或终止时,会从红黑树中移除。进程阻塞时,会把当前进程移入等待队列(等待队列是由等待某些事件发生的进程组成的简单链表)。当该进程等待的事件发生后,会被唤醒,再加入红黑树中。

linux系统——I/O模型

基本概念

同步/异步

同步是指 在发送一个请求时,在该请求返回结果之前,就一直不返回。就是说,这件事做不完,不会做下一件事。 异步是指 发送完请求就返回,不会等待请求响应。当该请求实际完成后,通过回调、通知等方式通知调用者。

阻塞/非阻塞

阻塞是指在结果返回前,当前线程会被挂起(cpu不会给该线程分配时间片,次此时线程暂停,不可执行)。与同步相比,同步至少没有结果就不返回,大多数情况下,此时线程还是激活的,还可以处理别的事,只是没有返回结果而已。 非阻塞是指如果不能立即返回结果,该函数不会阻塞当前线程,而是立即返回。

同步/异步 vs. 阻塞/非阻塞

同步/异步是指访问数据的机制,二者的区别其实就是,数据读写(拷贝)的时候是否阻塞

  • 同步是指主动请求并等待I/O操作完,当数据ready后,在读写的时候必须是阻塞的(就是说,数据从内核拷贝到用户空间时,当前的用户空间线程需要阻塞,等待数据拷贝完)。
  • 异步是指主动请求数据后,便可以返回处理其他事,I/O操作完后会通知该该请求调用方(等“通知”),即使是在数据读写时也不会阻塞。

阻塞/非阻塞 是指当进程访问的数据没有ready时,进程是等待还是返回,这其实是函数内部实现机制的区别。二者的区别其实就是,应用程序的调用是否立即返回

I/O多路复用

在处理I/O操作时,如果需要同时处理多个I/O请求,可以使用多线程或者I/O多路复用技术进行处理。

I/O多路复用通过把多个I/O的阻塞复用到同一个select的阻塞上,从而可以使系统利用单线程处理多个I/O连接请求。这种方式,相比多线程来说,系统开销小

目前支持I/O多路复用的系统调用有select,pool,epool等。

select模型

  • select模型大部分linux/unix系统都支持。
  • 通过select函数发送I/O请求后,线程阻塞,一直到数据准备完毕,把数据从内核拷贝到用户空间,所以select是同步阻塞模式。
  • 特点:
    • 单个进程能监控的fd数目有限制

poll模型

与select类似,不过能监听的并发连接数没限制,因为是用链表存储

epoll模型

epoll是linux特有的事件处理机制。

  • 能监听的并发连接数没有限制
  • 效率提升:只关心活跃的fd(即达到ready状态的fd),与总连接数无关。
  • 内存拷贝:使用内存映射(mmap)技术完成数据从内核传递到用户空间的功能(内核与用户空间共享同一块内存),加速了用户空间与内核空间传递数据的效率,减少了复制的开销。
  • 由于epoll在内核中的实现是通过fd的callback函数进行消息传递的,在活跃的fd数量较少的情况下,性能比select和poll都要好,如果活跃的fd比较多的情况,性能也会下降。

水平触发/边缘触发 todo

epoll使用方法

  • step1: 创建epoll文件描述符
 
 int epoll_create(int size)
 

说明:创建好epoll句柄后,会占用一个fd,所以使用完epoll后需要调用close()关闭,释放调该fd(fd的总数是有限的)。

  • step2: 注册fd
 
 /*参数epfd是epoll_create的返回值*/

 /*
 * 参数op表示动作,用三个宏表示:
 * 1) EPOLL_CTL_ADD——注册新的fd到epfd中;
 * 2) EPOLL_CTL_MOD——修改已经注册的fd的监听事件;
 * 3) EPOLL_CTL_DEL:从epfd中删除一个fd;
 */
 
 /*参数fd代表要监听的fd*/
 
 /*参数event告诉内核要做什么事*/ 
 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
 
  • step3: 收集epoll监控的事件中,发生的事件

/*参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中*/
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

I/O多路复用模式

I/O多路复用机制一般都依赖一个事件多路分离器,分离器负责将来自I/O事件源的读写事件分离出来,并分发到对于的read/write事件处理器。根据事先注册的I/O事件及相应的事件处理函数(回调函数),事件分离器事件类型调用相应的事件处理函数。 与事件分离器相关的模式有reactor和proactor,其中reactor模式采用同步I/O,proactor模式采用异步I/O(reactor和proactor是一种I/O设计模式)。

reactor模式

工作流程如下:

  • 注册读就绪事件和相应的事件处理器
  • 事件分离器等待事件发生(读就绪事件)
  • 事件到来,激活分离器,调用相应的读事件处理器
  • 事件处理器完成实际的读操作,并处理读到的数据,注册新的事件,并返还控制权

proactor模式

工作流程如下:

  • 事件处理器发起异步读操作(操作系统需要支持异步I/O),此时,事件处理器不关心I/O就绪事件,只关心I/O完成事件
  • 事件分离器等待读完成事件
  • 在分离器等待过程中,操作系统内核线程进行事件的读操作,并将结果存于用户自定义缓冲区,然后通知分离器读操作完成
  • 事件处理器读取用户缓冲区中的数据,然后启动一个新的异步I/O操作,并将控制权返还给事件分离器

redis源码学习——压缩列表ziplist

ziplist简介

  • 压缩列表是由一系列 特殊编码的 连续内存块 组成的 顺序型 存储结构。主要是为了节省内存空间而设计的数据结构。
  • 压缩列表可以用来存储字符串或整数。
  • 压缩列表存储的数据特点: 数据量比较小,并且每项数据要么是比较小的整数,要么是比较短的字符串。
  • redis中的 列表list,哈希表hashtable,有序集合sorted set 的底层实现中都用到了ziplist。
  • 对于用户来说,其实不用关心什么时候用ziplist,redis底层会根据数据量多少以及每项数据大小,判断是否采用ziplist结构存储数据(当数据量超过限定值时,会传化成其他的数据结构)。
//redis.conf文件
list-max-ziplist-entries 512 //ziplist中entry个数
list-max-ziplist-value 64 //每个entry所占的字节大小

ziplist结构

<zlbytes>|<zltail>|<zllen>|<entry>...<entry>|<zlend>
4B       |   4B   |    2B |                 |  1B
|<------header----------->|  

zlbytes: 存储整个压缩列表所占字节数,4B(unsigned int)

zltail: 存储压缩列表尾节点相对于首地址的偏移量, 4B(unsigned int)

zllen:存储压缩列表中节点数目,2B(unsigned int),最长2^16-2,当节点数目大于此值,需要遍历所有节点才能得到节点数

entry: 压缩列表节点,节点大小根据存储的内容确定

zlend: 压缩列表结尾符,值为255,1B(unsigned int)

entry结构

typedef struct zlentry{
	unsigned int   prevrawlensize, prevrawlen;
	//prevrawlen前一个节点长度, prevrawlensize 前一个节点编码所占字节数
	unsigned int   lensize, len;
	//len 当前节点长度,lensize 当前节点编码所占字节数
	unsigned int   headersize;
	//
	unsigned char  encoding;
	unsigned char  *p;
}
|前一个节点的编码&长度    |当前节点编码&长度     |当前节点内容|
<previous_entry_length>|<encoding & length>|<content> |

previous_entry_length有两种长度:

  • 如果前一个字节长度小于2^8-2=254 B,则previous_entry_length长度为1B
  • 如果前一个字节长度大于254B,则previous_entry_length长度为5B,第一个字节为254(1111 1110),后面4个字节保存实际长度值

encoding&length ziplist有两种类型的编码:字符串&整数。

通过encoding的前两个比特位判断content保存的是字符串还是整数,00,01,10表示字符串,11表示整数

字符串:

  • 00aaaaaa 1字节,长度<= 2^6-1=63B
  • 01aaaaaa bbbbbbbb 2字节,长度<=2^14-1=16383B
  • 10——– aaaaaaaa bbbbbbbb cccccccc dddddddd 5字节,长度>=16383B,用后面4个字节存储字符串长度

整数:

  • 1100 0000 —— >1字节 ——> int16_t类型整数
  • 1101 0000 ——> 1字节 ——> int32_t类型整数
  • 1110 0000 —— >1字节 ——> int64_t类型整数
  • 1111 0000 ——> 1字节 ——> 24bit有符号整数
  • 1111 1110 —— >1字节 ——> 8bit 有符号整数
  • 1111 xxxx ——> 1字节 ——> 4bit无符号整数,0~12,不需要content字段

定义压缩列表节点的encoding值


/* Different encoding/length possibilities */定义不同的encoding
//str
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)
//int
#define ZIP_INT_16B (0xc0 | 0<<4) // '|'按位或,1&0=1,0&0=0
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)

计算数据类型


/* Macro's to determine type */计算encoding对应的数据类型,str  int
//将上面的宏带入enc,得到true或false。
#define ZIP_IS_STR(enc) (((enc) & 0xc0) < 0xc0) //'&'按位与,1&1=1,1&0=0
#define ZIP_IS_INT(enc) (!ZIP_IS_STR(enc) && ((enc) & 0x30) < 0x30)

ziplist应用

创建空的ziplist


//创建并返回一个新的ziplist,其中entry为空
unsigned char *ziplistNew(void) {
    // ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小,固定长度4B+4B+2B
    // 1 字节是表末端 ZIP_END 的大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    // 为表头和表末端分配空间
    unsigned char *zl = zmalloc(bytes);
    // 初始化表属性
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);//整个ziplist所占字节数,intrev32ifbe大小端转换
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);//ziplist尾节点相对首节点的偏移量
    ZIPLIST_LENGTH(zl) = 0;//entry为空
    // 设置表末端
    zl[bytes-1] = ZIP_END;
    return zl;
}

给ziplist插入entry节点


//将长度为slen的字符串s,插入zl中,返回插入了entry元素的ziplist
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    // 根据 where 参数的值,决定将值推入到表头还是表尾
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    // 返回添加新值后的 ziplist
    // T = O(N^2)
    return __ziplistInsert(zl,p,s,slen);
}

其中,插入节点可能会涉及到连锁更新

初始化entry


//将p指向的压缩列表节点所对应的属性信息保存到zlentry结构体中,并返回zlentry结构体。(相当于给zlentry结构体的各个字段赋值)
static zlentry zipEntry(unsigned char *p) {
    zlentry e;
    // e.prevrawlensize 保存着编码前一个节点的长度所需的字节数
    // e.prevrawlen 保存着前一个节点的长度
    // T = O(1)
    ZIP_DECODE_PREVLEN(p, e.prevrawlensize, e.prevrawlen);
    // p + e.prevrawlensize 将指针移动到列表节点本身
    // e.encoding 保存着节点值的编码类型
    // e.lensize 保存着编码节点值长度所需的字节数
    // e.len 保存着节点值的长度
    // T = O(1)
    ZIP_DECODE_LENGTH(p + e.prevrawlensize, e.encoding, e.lensize, e.len);
    // 计算头结点的字节数
    e.headersize = e.prevrawlensize + e.lensize;
    // 记录指针
    e.p = p; 
    return e;
}