📌wolai制作,原文:https://www.wolai.com/ravenxrz/d4CYcWtC6M86RQyVAuKT2d
花了1~2 week的时间过完了经典的C网络库muduo实现,和学习LevelDB的时候不同,那个时候主要学习C 的编程知识,而muduo主要学习的是设计,当然还有一些编程技巧。本篇总结下通过muduo学习到了哪些内容。
1 IO多路复用·
通过redis和muduo的学习,以及公司内部的一些网络库实现。IO多路复用是必备的,且基本都用epoll实现。回顾下什么是io多路复用,有哪些io多路复用方式,优劣势。
IO多路复用指的是一个线程可以同时监听多个fd ,一旦这些fd中任一一个有响应,系统即可通知该线程来处理,这样每个线程可以同时处理多个链接,提高系统性能(iops和带宽)。
当前io多路复用的方式 主要有三种:
select:select维护一个fd_set(底层实现就是一个bitmap),通过FD_SET
绑定要监听的fd,通过select
系统调用等待事件ready,再通过FD_ISSET
判定哪个fd ready,最后处理。
缺点:
由于fd_set底层是一个bitmap,这个bitmap最多接受1024个fd,所以有长度限制
fd在用户态和内核态各维护了一份,每次select需要将fd拷贝一份到内核,内核就绪后,又会拷贝一份到用户态,这里存在拷贝开销
处理时,通过循环判定fd是否ready,这里有多余开销。
poll: poll和select基本一致,但是底层不使用bitmap,而是链表,从而打破1024的上限限制。其余缺点和select一样。
epoll:
优点: 1. epoll监控fd的数据结构是红黑树,相比select和poll,每次修改要监听的fd时的开销更小,因为不用拷贝fd_set, 内核自己维护这个红黑树即可。2. 事件就绪时,不用循环遍历fd_set来找到是哪个fd就绪,因为epoll在内核中维护了一个 就绪链表,只有有事件发生的fd会加入到这个链表,用户拿到的fd一定是有事件的fd。 另外,epoll通常搭配非阻塞io使用。
缺点:linux特有,无法跨平台。
muduo自身优先使用epoll实现io多路复用。
一些额外思考:
io多路复用本质上还是同步调用,可能涉及多次系统调用(比如epoll_wait, epoll_cntl),最近几年io_uring 比较火,各大公司的存储系统也基本切到io_uring, 不知道有系统把io_uring用到网络库没,io_uring介入后,可以做异步(觉得异步编程难的,借助协程就可实现同步),且能明显降低系统调用次数。
2 Reactor设计模式·
关于Reactor
的详细解释参考: 这里 。这里简单解释下,本质上reactor就是一个事件监听器(reactor),监听到事件后给予以分发处理(handler)。这里有两个变量:1. 监听器有多少个? 2.处理器有多个(多少个线程来处理)? 所以会有多种设计方式, 单reactor/单线程,单reactor/多线程,多reactor/多线程等。一个典型的单reactor/单线程设计模式图如下:
单reactor通过io多路复用的接口等待有fd事件发生,针对每个fd,如果该fd有事件,则调用对应的handler。redis是这种)。
在muduo中,reactor对应的实现类是EventLoop
,handler对应的实现类是Channel
,而muduo是有多个EventLoop
,每个loop
有多个channel
,对应关系如下图(详细分析见 muduo源码分析2-事件循环(下) )
明显muduo是典型的多reactor多线程模型,并遵从主从reactor设计,在主reactor中,只负责建立连接(Acceptor跑在这个reactor上),建立连接后,就从多个子reactor(也就是EventloopThreadPool)中选一个出来处理后续io,之后的处理都在这个线程中。换句话说:建立连接的线程和处理io的线程不在同一个线程。
3 可以借鉴的一些技巧·
3.1 保证生命周期的Channel
tie
函数·
channel
的tie
函数, 用于保证回调函数引用的对象,在真正执行回调时,一定没有析构,具体实现中,用到了weak_ptr
。 个人的开发生活中,用到weak_ptr
的机会是很少的,这里算是一个。详细如何使用,见此处 。
3.2 incomplete type 使用unique_ptr作为成员 ,其类析构函数要使用outline定义·
标题有点长,用例子来解释:
1 2 3 4 5 6 7 8 9 10 11 12 13 class TcpServer : noncopyable{ public : ~TcpServer (); private : std::unique_ptr<Acceptor> acceptor_; };
muduo源码中TcpServer有一个unique_ptr成员,其Acceptor类型在TcpServer.h文件中并没有实现,所以必须将TcpServer的析构函数用outline的方式实现(也就是定义在.cpp文件中)。这是因为std::unique_ptr在析构时会调用delete acceptor
, 此时必须要知道acceptor
的类型定义,换成shared_ptr
则不存在这个问题。详细见分析: 这里 。
3.3 检测running的线程是否符合预期·
muduo保证每个Eventloop
只能运行在一个线程中,为了强保证这点,EventLoop中有个函数assertInLoopThread
,底层实现如下
1 2 bool isInLoopThread () const { return threadId_ == CurrentThread::tid (); }
threadId_
是构造EventLoop
时初始化的。
笔者在公司内项目就有函数要求调用上下文一定要在同一个线程中,不然就可能出现并发bug,笔者本身也踩过多次坑,如果有这个防护,至少能提前发现问题。
另一个使用场景为要求收发socket fd一定要在同一个线程,但是muduo的上层应用很可能在不同线程调用send, receive接口,所以可以做一个检测,如果检测到当前处于非预期线程,就主动切线程,如下函数:
1 2 3 4 5 6 7 8 9 10 11 void EventLoop::runInLoop (Functor cb) { if (isInLoopThread ()) { cb (); } else { queueInLoop (std::move (cb)); } }
3.4 事件wakeup机制·
通常要做生产者-消费者模型,一端要通知另一端时,笔者通常会使用条件变量或者信号量的方式,但是在muduo中,使用了eventfd的方式来通知, 这部分分析见:wakeup。
通过eventfd方式,可以统一Eventloop
被唤醒的方式。
3.5 定时器实现·
muduo的定时器借助了timefd_xxx 系统api,在内核的定时器之上构建自己的定时器,虽然精确不够,但是借助了fd + epoll实现了编程统一。这样整个系统中,有新链接、有io读写、有超时事件、有唤醒事件都可以完全统一到EventLoop中。
4 附录·
4.1 select usage·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 #include <iostream> #include <sys/select.h> #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> #include <string.h> #include <vector> #define PORT 8080 #define MAX_CLIENTS 100 #define BUFFER_SIZE 1024 int main () { int server_fd, new_socket; struct sockaddr_in address; int opt = 1 ; int addrlen = sizeof (address); char buffer[BUFFER_SIZE] = {0 }; if ((server_fd = socket (AF_INET, SOCK_STREAM, 0 )) == 0 ) { perror ("socket failed" ); exit (EXIT_FAILURE); } if (setsockopt (server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof (opt))) { perror ("setsockopt" ); exit (EXIT_FAILURE); } address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons (PORT); if (bind (server_fd, (struct sockaddr *)&address, sizeof (address)) < 0 ) { perror ("bind failed" ); exit (EXIT_FAILURE); } if (listen (server_fd, 3 ) < 0 ) { perror ("listen" ); exit (EXIT_FAILURE); } std::vector<int > client_sockets; fd_set readfds; while (true ) { FD_ZERO (&readfds); FD_SET (server_fd, &readfds); int max_sd = server_fd; for (int sd : client_sockets) { FD_SET (sd, &readfds); if (sd > max_sd) max_sd = sd; } int activity = select (max_sd + 1 , &readfds, NULL , NULL , NULL ); if ((activity < 0 ) && (errno != EINTR)) { perror ("select error" ); } if (FD_ISSET (server_fd, &readfds)) { if ((new_socket = accept (server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0 ) { perror ("accept" ); exit (EXIT_FAILURE); } std::cout << "New connection , socket fd is " << new_socket << std::endl; client_sockets.push_back (new_socket); } for (int i = 0 ; i < client_sockets.size (); i++) { int sd = client_sockets[i]; if (FD_ISSET (sd, &readfds)) { if (read (sd, buffer, BUFFER_SIZE) == 0 ) { std::cout << "Client disconnected, socket fd is " << sd << std::endl; close (sd); client_sockets.erase (client_sockets.begin () + i); } else { std::cout << "Client " << sd << " sent: " << buffer << std::endl; send (sd, buffer, strlen (buffer), 0 ); memset (buffer, 0 , BUFFER_SIZE); } } } } return 0 ; }
4.2 epoll usage·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 #include <iostream> #include <sys/epoll.h> #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> #include <string.h> #include <vector> #include <errno.h> #define PORT 8080 #define MAX_EVENTS 1024 #define BUFFER_SIZE 1024 int main () { int server_fd, new_socket; struct sockaddr_in address; int opt = 1 ; int addrlen = sizeof (address); char buffer[BUFFER_SIZE] = {0 }; if ((server_fd = socket (AF_INET, SOCK_STREAM, 0 )) == 0 ) { perror ("socket failed" ); exit (EXIT_FAILURE); } if (setsockopt (server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof (opt))) { perror ("setsockopt" ); exit (EXIT_FAILURE); } address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons (PORT); if (bind (server_fd, (struct sockaddr *)&address, sizeof (address)) < 0 ) { perror ("bind failed" ); exit (EXIT_FAILURE); } if (listen (server_fd, 3 ) < 0 ) { perror ("listen" ); exit (EXIT_FAILURE); } int epfd = epoll_create1 (0 ); if (epfd == -1 ) { perror ("epoll_create1" ); exit (EXIT_FAILURE); } struct epoll_event event; event.events = EPOLLIN | EPOLLET; event.data.fd = server_fd; if (epoll_ctl (epfd, EPOLL_CTL_ADD, server_fd, &event) == -1 ) { perror ("epoll_ctl" ); exit (EXIT_FAILURE); } struct epoll_event events[MAX_EVENTS]; while (true ) { int n = epoll_wait (epfd, events, MAX_EVENTS, -1 ); for (int i = 0 ; i < n; i++) { int fd = events[i].data.fd; if (fd == server_fd) { if ((new_socket = accept (server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0 ) { perror ("accept" ); continue ; } std::cout << "New connection, socket fd is " << new_socket << std::endl; event.events = EPOLLIN | EPOLLET; event.data.fd = new_socket; if (epoll_ctl (epfd, EPOLL_CTL_ADD, new_socket, &event) == -1 ) { perror ("epoll_ctl" ); close (new_socket); continue ; } } else { int done = 0 ; while (!done) { int count = read (fd, buffer, BUFFER_SIZE); if (count == -1 ) { if (errno != EAGAIN) { perror ("read" ); done = 1 ; } break ; } else if (count == 0 ) { done = 1 ; break ; } std::cout << "Client " << fd << " sent: " << buffer << std::endl; send (fd, buffer, count, 0 ); memset (buffer, 0 , BUFFER_SIZE); } if (done) { std::cout << "Client disconnected, socket fd is " << fd << std::endl; epoll_ctl (epfd, EPOLL_CTL_DEL, fd, NULL ); close (fd); } } } } close (server_fd); close (epfd); return 0 ; }
4.3 epoll LT & ET的区别·
LT模式(水平触发):
LT模式下,只要文件描述符(fd)还有数据可读,每次调用epoll_wait都会返回该fd的事件,提醒用户程序去操作。
ET模式(边缘触发):
ET模式下,只有当数据第一次到达时才会触发事件,之后直到下次有新的数据流入之前,即使fd中还有剩余数据,也不会再触发事件。所以在ET模式下,read一个fd时,必须将它的buffer读光,即一直读到read的返回值小于请求值。
epoll工作在ET模式时,必须使用非阻塞套接字 ,以避免由于一个文件的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。