epoll 切入口 __poller_wait
Linux平台下,epoll只有三个api: epoll_create, epoll_ctl, epoll_wait。
找到epoll_wait,就能找到事件处理的核心(主事件循环)。
而workflow中,epoll_wait只在一个位置调用:
1 2 3 4 5 6 7 8 9 10 11
| /src/kernel/poller.c
static inline int __poller_wait(__poller_event_t *events, int maxevents, const poller_t *poller) {
return epoll_wait(poller->pfd, events, maxevents, -1); }
|
先不看poller_t是个什么。只看__poller_wait这一个函数看不出来整个事件处理的逻辑。找找__poller_wait被谁调用了:
也只有一个地方:poller_thread_routine.
poller_thread_routine
完整的poller_thread_routine是这样的,很长,不好分析具体逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| static void *poller_thread_routine(void *arg) { poller_t *poller = (poller_t *)arg; __poller_event_t events[POLLER_EVENTS_MAX]; struct __poller_node time_node; struct __poller_node *node; int has_pipe_event = 0; int nevents = 0; int i = 0;
while (1) { __poller_set_timer(poller); nevents = __poller_wait(events, POLLER_EVENTS_MAX, poller); clock_gettime(CLOCK_MONOTONIC, &time_node.timeout); has_pipe_event = 0; for (i = 0; i < nevents; i++) { node = (struct __poller_node *)__poller_event_data(&events[i]); switch (node->data.operation) { case PD_OP_READ: __poller_handle_read(node, poller); break; case PD_OP_WRITE: __poller_handle_write(node, poller); break; case PD_OP_LISTEN: __poller_handle_listen(node, poller); break; case PD_OP_CONNECT: __poller_handle_connect(node, poller); break; case PD_OP_RECVFROM: __poller_handle_recvfrom(node, poller); break; case PD_OP_SSL_ACCEPT: __poller_handle_ssl_accept(node, poller); break; case PD_OP_SSL_CONNECT: __poller_handle_ssl_connect(node, poller); break; case PD_OP_SSL_SHUTDOWN: __poller_handle_ssl_shutdown(node, poller); break; case PD_OP_EVENT: __poller_handle_event(node, poller); break; case PD_OP_NOTIFY: __poller_handle_notify(node, poller); break; case -1: has_pipe_event = 1; break; default: ; } }
if (has_pipe_event) { if (__poller_handle_pipe(poller)) { break; } } __poller_handle_timeout(&time_node, poller); }
return NULL; }
|
简化一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| static void *poller_thread_routine(void *arg) { while (1) { nevents = __poller_wait(events, POLLER_EVENTS_MAX, poller); for (i = 0; i < nevents; i++) { node = (struct __poller_node *)__poller_event_data(&events[i]); if (node > (struct __poller_node *)1) { switch (node->data.operation) { case PD_OP_READ: __poller_handle_read(node, poller); break; case PD_OP_WRITE: __poller_handle_write(node, poller); break; } } } }
|
可以看到,这里是将epoll触发的事件数组events,挨个根据他们的operation分发给不同的行为函数(read/write…)
poller_start
再看看poller_thread_routine在哪里被调用了:
poller_thread_routine的唯一一个引用就是在poller_start中,但并不是直接调用。而是传给了pthread_create函数,这是pthread库用来创建线程的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| int poller_start(poller_t *poller) { pthread_t tid; int ret = 0;
pthread_mutex_lock(&poller->mutex); if (__poller_open_pipe(poller) >= 0) { ret = pthread_create(&tid, NULL, poller_thread_routine, poller); if (ret == 0) { poller->tid = tid; poller->stopped = 0; } else { errno = ret; close(poller->pipe_wr); close(poller->pipe_rd); } }
pthread_mutex_unlock(&poller->mutex); return -poller->stopped; }
|
也就是说,poller_thread_routine是某个线程的执行函数。而执行该函数的线程,就是事件处理线程。
那调用poller_start的又是谁呢?
毕竟只有先调用poller_start才能通过其内部的pthread_create创建事件线程。
mpoller_start
poller_start在mpoller_start中被调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| int mpoller_start(const mpoller_t *mpoller) { unsigned int i = 0; for (i = 0; i < mpoller->nthreads; ++i) { if (poller_start(mpoller->poller[i]) < 0) { break; } } if (i == mpoller->nthreads) { return 0; } while (i > 0) { poller_stop(mpoller->poller[--i]); } return -1; }
|
注意,上述代码中是循环调用poller_start,也就是说:创建了不止一个事件处理线程。
由此可见:mpoller的职责,是start我们设置的epoll线程数的epoll线程
create_poller
而mpoller_start 在 Communicator::create_poller 的时候启动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| int Communicator::create_poller(const size_t poller_threads) { const poller_params params = { .max_open_file = static_cast<size_t>(sysconf(_SC_OPEN_MAX)), .call_back = Communicator::callback, .context = this };
if (static_cast<ssize_t>(params.max_open_file) < 0) { return -1; }
this->msgqueue = msgqueue_create(16 * 1024, sizeof(poller_result)); if (this->msgqueue) { this->mpoller = mpoller_create(¶ms, poller_threads); if (this->mpoller) { if (mpoller_start(this->mpoller) >= 0) { return 0; } mpoller_destroy(this->mpoller); } msgqueue_destroy(this->msgqueue); }
return -1; }
|
Communicator::init
上面的Communicator::create_poller又在Communicator::init中被调用:
1 2 3 4 5 6 7
| int Communicator::init(size_t poller_threads, size_t handler_threads) { .... create_poller(poller_threads); create_handler_threads(handler_threads); .... }
|
CommScheduler
继续向上追溯,发现Communicator::init在CommScheduler::init中被调用。
1 2 3 4 5 6 7 8 9 10 11 12
| class CommScheduler { public: int init(size_t poller_threads, size_t handler_threads) { return this->comm.init(poller_threads, handler_threads); }
... private: Communicator comm; };
|
而CommScheduler仅有一个成员变量Communicator, 对于Communicator来说就是对外封装了一层, 加入了一些逻辑操作,本质上都是this->comm的操作。
如果要说设计模式的话,这应该属于外观模式。
__CommManager
CommScheduler::init的唯一一次调用在__CommManager的构造函数里:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private: __CommManager() : fio_service_(nullptr), fio_flag_(false) { const auto *settings = WFGlobal::get_global_settings(); if (scheduler_.init( settings->poller_threads, settings->handler_threads) < 0) { abort(); } signal(SIGPIPE, SIG_IGN); }
|
注意到,这个构造函数是私有的,那么很容易就想到单例模式了。
1 2 3 4 5
| static __CommManager *get_instance() { static __CommManager kInstance; __CommManager::created_ = true; return &kInstance; }
|
可以看到,此处使用的是:
C++ 11 中的静态局部变量实现的懒汉式单例模式。
优点是:
只有在第一次调用get_instance()方法时才创建实例,实现延迟加载。
static __CommManager kInstance;表示kInstance是静态局部变量,只在第一次调用get_instance()时初始化
C++11标准保证了静态局部变量的初始化是线程安全的
由此,只要__CommManager::get_instance被首次调用,那么就一定有一个_CommManager对象被实例化,进而__CommManager的构造函数被调用,然后scheduler_.init被执行,进而创建poller线程。
到这里其实已经到了最外层了,但__CommManager实际上属于框架内部类,使用者一般不能直接使用这个类。因此还有一层封装。
WFGlobal
所有__CommManager的使用,都被封装在WFGlobal中:
1 2 3 4 5 6 7 8 9 10 11
| bool WFGlobal::is_scheduler_created() { return __CommManager::is_created(); }
CommScheduler *WFGlobal::get_scheduler() { return __CommManager::get_instance()->get_scheduler(); }
IOService *WFGlobal::get_io_service() { return __CommManager::get_instance()->get_io_service(); }
|
以此,我们实现了__CommManager的全局单例,也间接实现了CommScheduler的全局单例。(因为CommScheduler的创建只在__CommManager中有)。这样就可以避免执行多次CommScheduler::init了