/* Walks the run queue <rqueue> and calls the processing function of each
 * task found there. The walk starts at the node returned by
 * eb32_lookup_ge() for the key (rqueue_ticks - TIMER_LOOK_BACK), and the
 * visible loop is bounded by <max_processed>.
 *
 * NOTE(review): lines shown as "..." are parts of the function that are
 * elided in this excerpt (local declarations, loop-exit conditions, handling
 * of the value returned by the task, use of <next>). Nothing is stated here
 * about them.
 */
void process_runnable_tasks(int *next)
{
...
/* starting point of the walk; presumably the first node whose key is
 * greater than or equal to the requested one -- confirm against the
 * eb32_lookup_ge() documentation.
 */
eb = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
while (max_processed--) {
...
/* get the task which embeds this tree node as its <rq> member */
t = eb32_entry(eb, struct task, rq);
/* move <eb> to the following node before <t> is unlinked from the run
 * queue (presumably because <t>'s node can no longer be used to iterate
 * once it has been unlinked -- to be confirmed).
 */
eb = eb32_next(eb);
__task_unlink_rq(t);
t->state |= TASK_RUNNING;
/* count one more call of this task's processing function */
t->calls++;
/* This is an optimisation to help the processor's branch
 * predictor take this most common call: process_session() is
 * called directly when it is the task's handler, and any other
 * handler goes through the <process> function pointer. In both
 * cases the value returned by the handler replaces <t>.
 */
if (likely(t->process == process_session))
t = process_session(t);
else
t = t->process(t);
...
}
}
/* Main event loop. After one call to tv_update_date(0, 1), it repeats the
 * same sequence forever: process queued signals, wake expired tasks, run
 * some runnable tasks, then call the poller. It returns as soon as <jobs>
 * is found to be zero after the tasks have been run.
 */
void run_poll_loop()
{
	int next;

	tv_update_date(0, 1);

	for (;;) {
		/* handle any signal that was caught since the last turn */
		signal_process_queue();

		/* wake the tasks whose timer has expired; <next> is passed
		 * by address to both task functions below.
		 */
		wake_expired_tasks(&next);

		/* run a batch of runnable tasks */
		process_runnable_tasks(&next);

		/* nothing left to do: leave the loop for good */
		if (!jobs)
			return;

		/* call the poller, giving it <next> as its second argument,
		 * then process the speculative fd events.
		 */
		cur_poller.poll(&cur_poller, next);
		fd_process_spec_events();
	}
}
/* Add the various callbacks. Right now the transport layer is present
 * but not initialized. Also note we need to be careful as the stream
 * int is not initialized yet.
 */
conn_prepare(s->si[0].conn, &sess_conn_cb, l->proto, l->xprt, s);
fdtab[cfd].owner = s->si[0].conn; /* the owner of this fd is the connection structure */
fdtab[cfd].iocb = conn_fd_handler; /* conn_fd_handler() is registered as this fd's I/O callback */
/* presumably asks to be notified of incoming data on this connection --
 * confirm against conn_data_want_recv().
 */
conn_data_want_recv(s->si[0].conn);
/* a negative return from conn_xprt_init() aborts the setup through the
 * out_free_task label (not visible in this excerpt).
 */
if (conn_xprt_init(s->si[0].conn) < 0)
goto out_free_task;
...
si_takeover_conn(&s->si[0], l->proto, l->xprt);
...
/* the task's processing function is the handler taken from <l> */
t->process = l->handler;
...
if (p->accept && (ret = p->accept(s)) <= 0) {
/* Either we had an unrecoverable error (<0) or work is
 * finished (=0, eg: monitoring), in both situations,
 * we can release everything and close.
 */
goto out_free_rep_buf;
}
...
/* wake the task up with the TASK_WOKEN_INIT reason */
task_wakeup(t, TASK_WOKEN_INIT);
/* I/O callback taking a file descriptor <fd>. The connection it works on is
 * read from the <owner> field of fdtab[fd].
 *
 * NOTE(review): only the receive path is visible in this excerpt. Lines
 * shown as "..." are elided, including the declaration of <flags> and the
 * statement producing the return value, so neither is documented here.
 */
int conn_fd_handler(int fd)
{
struct connection *conn = fdtab[fd].owner;
...
/* conn->data->recv() is called only when all of the following hold:
 *   - the fd reports at least one of FD_POLL_IN, FD_POLL_HUP, FD_POLL_ERR;
 *   - the connection has a transport layer (conn->xprt is set);
 *   - none of CO_FL_WAIT_RD, CO_FL_WAIT_ROOM, CO_FL_ERROR or
 *     CO_FL_HANDSHAKE is set in conn->flags.
 */
if ((fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) &&
conn->xprt &&
!(conn->flags & (CO_FL_WAIT_RD|CO_FL_WAIT_ROOM|CO_FL_ERROR|CO_FL_HANDSHAKE))) {
/* force detection of a flag change : it's impossible to have both
 * CONNECTED and WAIT_CONN so we're certain to trigger a change.
 */
flags = CO_FL_WAIT_L4_CONN | CO_FL_CONNECTED;
conn->data->recv(conn);
}
...
}