
早期服务端程序处理用户请求时,会为每一个新连接创建线程处理请求,随着并发量越来越高,“thread-per-connection” 这种模式的弊端开始显现。一来线程创建本身有开销,二来上下文切换也会带来额外开销。引入线程池可以一定程度上缓解,但更深层次的问题开始浮现,连接建立(accept)后通常不能马上读取到client的请求数据(recv),也就是说线程池内线程处于挂起状态。
一个直观的想法就是,连接建立后直到有数据可读时才开启线程进行处理,避免线程挂起。
单线程 Reactor

Synchronous Event Demultiplexer: 指代各种多路复用机制。Handle: 文件描述符或句柄。Event Handler: 处理 Handle 的上的各种事件,比如 READ, WRITE。Initiation Dispatcher: 核心类,维护一个基于 Synchronous Event Demultiplexer 的事件循环监听 Handle 各种事件并分发给 Event Handler 进行处理。

Logging Acceptor 是 Event Handler 一个实现类,专门处理 socket 上的 READ 事件,即客户端连接建立,大致流程:
- 将
Logging Acceptor 注册至 Initiation Dispatcher。 Initiation Dispatcher 启动事件循环。Initiation Dispatcher 通过 select 等待客户端发起 connect。- 客户端发起
connect。 Initiation Dispatcher 从 select 返回并将连接建立事件交由 Logging Acceptor 处理。Logging Acceptor 调用 accept 获取连接。Logging Acceptor 为了连接创建一个 Logging HandlerLogging Acceptor 将新创建的 Logging Handler 注册到 Initiation Dispatcher 的事件循环中,开始新一轮的事件等待直到客户端发送数据(send)。

当客户端开始发送数据,大致流程:
- 客户端发送数据。
Initiation Dispatcher 从 select 返回并将发生 READ 事件的连接交给对应 Logging Handler 处理。Logging Handler 调用 recv 读取客户端数据。Logging Handler 处理数据(注意这里的 write 在论文里是用作把客户端数据写入存储用的)。- 回到
Initiation Dispatcher 处理下一个事件或进入下一轮事件循环。
上面这两个过程基本就是 Reactor pattern 的核心流程了,回消息给客户端的流程类似,Logging Handler 处理完后再注册一个 WRITE 事件的监听,然后再写入数据。整个流程都在一个线程内工作,没有上下文切换。
Reactor pattern 除了使用多路复用外,还用到了非阻塞IO。设想 Logging Handler 一次事件响应中没能读到一个完整请求的数据,那么就需要立即返回等待下一次事件,而不是阻塞在 recv 上。
下面是示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| #include <sys/epoll.h> #include <sys/socket.h> #include <cstdio> #include <cstdlib> #include <unistd.h> #include <fcntl.h>
enum EventType { CONNECT, READ, WRITE, CLOSE };
class EventHandler { protected: const int fd;
public: EventHandler(const int fd) : fd(fd) {} virtual ~EventHandler() {} virtual void accept(EventType event) {} };
class InitiationDispatcher { private: static const int MAX_EVENTS = 16; EventHandler *get_handler(epoll_event *ev) const {} public: InitiationDispatcher() {} ~InitiationDispatcher() {}
void register_handler(const EventHandler* handler, EventType et) {} void remove_handler(const EventHandler* handler) {} void handle_events() { const int epfd = epoll_create(1); epoll_event events[MAX_EVENTS]; while (true) { epoll_ctl(epfd, EPOLL_CTL_ADD, , );
const int nevents = epoll_wait(epfd, events, MAX_EVENTS, -1); for (size_t i = 0; i < nevents; i++) { auto ev = events[i]; if () { auto logging_acceptor = get_handler(&ev); logging_acceptor->accept(EventType::CONNECT); } else if () { auto logging_handler = get_handler(&ev); logging_handler->accept(EventType::READ); } else if () { auto logging_handler = get_handler(&ev); logging_handler->accept(EventType::WRITE); } else { } } } }
};
class LoggingHandler : public EventHandler { private: InitiationDispatcher* dispatcher; public: LoggingHandler(const int fd, InitiationDispatcher* dispatcher) : EventHandler(fd), dispatcher(dispatcher) {} ~LoggingHandler() {} void accept(EventType event) override { if (event == EventType::READ) { int size = read(this->fd, , ) if (size == EAGAIN) { return; } int written = write(this->fd, , ); if (written == EAGAIN) { this->dispatcher->register_handler(this, EventType::WRITE); return } } else if (event == EventType::WRITE) { } } };
class LoggingAcceptor : public EventHandler { private: InitiationDispatcher* dispatcher; public: LoggingAcceptor(const int fd, InitiationDispatcher* dispatcher): EventHandler(fd), dispatcher(dispatcher) {} ~LoggingAcceptor() {} void accept(EventType event) override { int conn_fd = accept4(this->fd, , , 0); auto handler = new LoggingHandler(conn_fd, this->dispatcher); int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); this->dispatcher->register_handler(handler, EventType::READ); } };
int main(void) { int socket_fd = socket(); listen(socket_fd, 0); auto dispatcher = new InitiationDispatcher(); auto acceptor = new LoggingAcceptor(socket_fd, dispatcher); dispatcher->register_handler(acceptor, EventType::CONNECT); dispatcher->handle_events(); return EXIT_SUCCESS; }
|
多线程 Reactor
单线程 Reactor 中所有的操作在单个线程中完成,如果某个请求处理非常耗时,将导致其它请求无法处理,无法建立新连接。所以就可以将请求处理的逻辑交由线程池进行处理。
多线程 Multi-Reactor
不管单线程还是多线程 Reactor,所有的文件描述符/句柄都在同一个事件循环里处理,也就无法同一时间处理大量新连接建立和读写事件。所以可以将两种不同的文件描述符分别在不同的事件循环里处理,交由不同线程处理,提高吞吐量。
1 2 3 4 5 6
| class InitiationDispatcher { private: std::vector<InitiationDispatcher*> sub_dispathcers; public: }
|
root InitiationDispatcher 只负责建立新连接建立,然后已建立的连接则交给 sub_dispatchers(RR)处理,每个 sub_dispacther 都会创建线程开启事件循环。
Key takeaways
The C10K problem
reactor
nio netty