
(-)server/mpm/event/event.c (-125 / +218 lines)
Lines 107-112
 #include "serf.h"
 #endif
 
+#define VOLATILE_READ(T, x) (*(volatile T *)&(x))
+
 /* Limit on the total --- clients will be locked out if more servers than
  * this are needed.  It is intended solely to keep the server from crashing
  * when things get out of hand.
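The VOLATILE_READ macro added above lets the listener read a worker-updated timestamp without taking a mutex: the cast through a volatile pointer forces the compiler to emit a fresh load on every use. A minimal standalone sketch of the idiom (illustrative names, not the patch's code). Note that it guarantees neither atomicity nor memory ordering, which is why the patch treats the value only as a hint and re-checks under the proper lock:

#include <stdio.h>

/* Same shape as the macro above: read through a volatile lvalue so the
 * value is re-loaded from memory instead of being cached in a register. */
#define VOLATILE_READ(T, x) (*(volatile T *)&(x))

static long next_expiry;   /* written by worker threads, read by a poller */

int main(void)
{
    next_expiry = 42;
    long snapshot = VOLATILE_READ(long, next_expiry);
    printf("snapshot = %ld\n", snapshot);
    return 0;
}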
Lines 182-187 static int dying = 0;
 static int workers_may_exit = 0;
 static int start_thread_may_exit = 0;
 static int listener_may_exit = 0;
+static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
                                                in listener thread */
Lines 204-209 module AP_MODULE_DECLARE_DATA mpm_event_module;
 struct event_srv_cfg_s;
 typedef struct event_srv_cfg_s event_srv_cfg;
 
+/*
+ * The pollset for sockets that are in any of the timeout queues. Currently
+ * we use the timeout_mutex to make sure that connections are added/removed
+ * atomically to/from both event_pollset and a timeout queue. Otherwise
+ * some confusion can happen under high load if timeout queues and pollset
+ * get out of sync.
+ * XXX: It should be possible to make the lock unnecessary in many or even all
+ * XXX: cases.
+ */
+static apr_pollset_t *event_pollset;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
Lines 249-254 static struct timeout_queue *write_completion_q,
                             *keepalive_q,
                             *linger_q,
                             *short_linger_q;
+static apr_time_t queues_next_expiry;
 
 static apr_pollfd_t *listener_pollfd;
 
Lines 256-269 static apr_pollfd_t *listener_pollfd;
  * Macros for accessing struct timeout_queue.
  * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
  */
-#define TO_QUEUE_APPEND(q, el)                                                \
-    do {                                                                      \
-        APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t,              \
-                             timeout_list);                                   \
-        ++*(q)->total;                                                        \
-        ++(q)->count;                                                         \
-    } while (0)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+{
+    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
+    ++*q->total;
+    ++q->count;
+
+    /* Cheaply update the overall queues' next expiry according to the
+     * first entry of this queue (oldest), if necessary.
+     */
+    el = APR_RING_FIRST(&q->head);
+    if (!queues_next_expiry
+            || queues_next_expiry > el->queue_timestamp + q->timeout) {
+        queues_next_expiry = el->queue_timestamp + q->timeout;
+        /* Unblock the listener if it's waiting on a longer timeout. */
+        if (listener_is_wakeable) {
+            apr_pollset_wakeup(event_pollset);
+        }
+    }
+}
 
 #define TO_QUEUE_REMOVE(q, el)                                                \
     do {                                                                      \
         APR_RING_REMOVE(el, timeout_list);                                    \
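The function version of TO_QUEUE_APPEND carries the new bookkeeping: after linking the connection in, it lowers the shared queues_next_expiry if the queue's oldest entry now expires sooner, and wakes the listener so it can shorten its poll() timeout. A condensed standalone sketch of that wake-on-earlier-deadline pattern, using a self-pipe in place of APR's wakeable pollset (all names here are illustrative):

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

static int wake_pipe[2];        /* read end lives in the poller's fd set */
static long queues_next_expiry; /* 0 means "no deadline known" */

/* Append-side logic, mirroring TO_QUEUE_APPEND: if the new entry expires
 * sooner than whatever the poller is sleeping on, poke the poller. */
static void note_new_deadline(long expiry)
{
    if (!queues_next_expiry || queues_next_expiry > expiry) {
        queues_next_expiry = expiry;
        (void)write(wake_pipe[1], "x", 1); /* like apr_pollset_wakeup() */
    }
}

int main(void)
{
    struct pollfd pfd;
    char buf[1];

    if (pipe(wake_pipe) != 0)
        return 1;

    note_new_deadline(123); /* a worker queues an earlier timeout */

    pfd.fd = wake_pipe[0];
    pfd.events = POLLIN;
    /* The poller returns immediately instead of sleeping out its old timeout. */
    if (poll(&pfd, 1, 5000) > 0 && read(wake_pipe[0], buf, 1) == 1)
        printf("woken, next expiry now %ld\n", queues_next_expiry);
    return 0;
}

APR's wakeable pollsets use a comparable internal mechanism; this sketch only illustrates the idea, not the library's implementation.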
Lines 284-300 static apr_pollfd_t *listener_pollfd;
 
 #define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
 
-/*
- * The pollset for sockets that are in any of the timeout queues. Currently
- * we use the timeout_mutex to make sure that connections are added/removed
- * atomically to/from both event_pollset and a timeout queue. Otherwise
- * some confusion can happen under high load if timeout queues and pollset
- * get out of sync.
- * XXX: It should be possible to make the lock unnecessary in many or even all
- * XXX: cases.
- */
-static apr_pollset_t *event_pollset;
-
 #if HAVE_SERF
 typedef struct {
     apr_pollset_t *pollset;
Lines 493-498 static void wakeup_listener(void)
         return;
     }
 
+    /* unblock the listener if it's poll()ing */
+    if (listener_is_wakeable) {
+        apr_pollset_wakeup(event_pollset);
+    }
+
     /* unblock the listener if it's waiting for a worker */
     ap_queue_info_term(worker_queue_info);
 
Lines 675-681 static apr_status_t decrement_connection_count(voi
         default:
             break;
     }
-    apr_atomic_dec32(&connection_count);
+    /* Unblock the listener if it's waiting for connection_count = 0 */
+    if (!apr_atomic_dec32(&connection_count)
+             && listener_is_wakeable && listener_may_exit) {
+        apr_pollset_wakeup(event_pollset);
+    }
     return APR_SUCCESS;
 }
 
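The rewritten decrement relies on apr_atomic_dec32() returning zero only when the counter reaches zero, so exactly one closer observes the transition and wakes the exiting listener. A minimal C11 sketch of the same dec-and-test idiom (not the APR call itself):

#include <stdatomic.h>
#include <stdio.h>

static atomic_uint connection_count = 3;

/* atomic_fetch_sub returns the previous value, so "previous == 1" means
 * this caller is the one that drove the count to zero, exactly once. */
static int last_one_out(void)
{
    return atomic_fetch_sub(&connection_count, 1) == 1;
}

int main(void)
{
    for (int i = 0; i < 3; i++)
        printf("connection closed, last=%d\n", last_one_out());
    return 0;
}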
Lines 838-843 static void notify_resume(event_conn_state_t *cs,
 
 static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 {
+    int done = 0;
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
Lines 849-855 static int start_lingering_close_common(event_conn
 #else
     apr_socket_timeout_set(csd, 0);
 #endif
-    cs->queue_timestamp = apr_time_now();
     /*
      * If some module requested a shortened waiting period, only wait for
      * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
Lines 870-894 static int start_lingering_close_common(event_conn
     else {
         cs->c->sbh = NULL;
     }
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
     cs->pub.sense = CONN_SENSE_DEFAULT;
+    cs->queue_timestamp = apr_time_now();
+    apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_add(event_pollset, &cs->pfd);
+    if (rv == APR_SUCCESS || APR_STATUS_IS_EEXIST(rv)) {
+        TO_QUEUE_APPEND(q, cs);
+        done = 1;
+    }
     apr_thread_mutex_unlock(timeout_mutex);
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+    if (!done) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                      "start_lingering_close: apr_pollset_add failure");
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_REMOVE(q, cs);
-        apr_thread_mutex_unlock(timeout_mutex);
         apr_socket_close(cs->pfd.desc.s);
         ap_push_pool(worker_queue_info, cs->p);
-        return 0;
     }
-    return 1;
+    return done;
 }
 
 /*
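Note the reordering above: the socket is added to the pollset first, and only queued (TO_QUEUE_APPEND) once apr_pollset_add() succeeds, which removes the old unwind path that had to re-lock and TO_QUEUE_REMOVE on failure. A toy sketch of this register-then-track shape; register_fd() is a hypothetical stand-in for apr_pollset_add(), and the locking is shown only as comments:

#include <stdio.h>

/* Hypothetical stand-in for apr_pollset_add(): fails for invalid fds. */
static int register_fd(int fd)
{
    return fd >= 0;
}

/* Track the resource only after registration succeeds, so the failure
 * path has nothing to undo; it just reports and cleans up. */
static int start_close(int fd)
{
    int done = 0;
    /* lock(timeout_mutex); */
    if (register_fd(fd)) {
        /* enqueue(fd); -- reached only when the fd is really polled */
        done = 1;
    }
    /* unlock(timeout_mutex); */
    if (!done) {
        printf("fd %d: add failed, closing directly\n", fd);
    }
    return done;
}

int main(void)
{
    printf("ok=%d\n", start_close(3));
    printf("ok=%d\n", start_close(-1));
    return 0;
}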
Lines 1149-1163 read_request:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
              */
-            cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
-            apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             cs->pfd.reqevents = (
                     cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
                             APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
             cs->pub.sense = CONN_SENSE_DEFAULT;
-            rc = apr_pollset_add(event_pollset, &cs->pfd);
+            cs->queue_timestamp = apr_time_now();
+            apr_thread_mutex_lock(timeout_mutex);
+            apr_pollset_add(event_pollset, &cs->pfd);
+            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             apr_thread_mutex_unlock(timeout_mutex);
             return;
         }
Lines 1188-1201 read_request:
          * timeout today.  With a normal client, the socket will be readable in
          * a few milliseconds anyway.
          */
-        cs->queue_timestamp = apr_time_now();
         notify_suspend(cs);
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
-
         /* Add work to pollset. */
         cs->pfd.reqevents = APR_POLLIN;
+        cs->queue_timestamp = apr_time_now();
+        apr_thread_mutex_lock(timeout_mutex);
         rc = apr_pollset_add(event_pollset, &cs->pfd);
+        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
         apr_thread_mutex_unlock(timeout_mutex);
 
         if (rc != APR_SUCCESS) {
Lines 1226-1238 static apr_status_t event_resume_suspended (conn_r
     apr_atomic_dec32(&suspended_count);
     c->suspended_baton = NULL;
 
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(cs->sc->wc_q, cs);
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
                     APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
     cs->pub.sense = CONN_SENSE_DEFAULT;
+    cs->queue_timestamp = apr_time_now();
+    apr_thread_mutex_lock(timeout_mutex);
     apr_pollset_add(event_pollset, &cs->pfd);
+    TO_QUEUE_APPEND(cs->sc->wc_q, cs);
     apr_thread_mutex_unlock(timeout_mutex);
 
     return OK;
Lines 1443-1448 static void get_worker(int *have_idle_worker_p, in
 static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
 
 static apr_skiplist *timer_skiplist;
+static apr_time_t timers_next_expiry;
 
 /* The following compare function is used by apr_skiplist_insert() to keep the
  * elements (timers) sorted and provide O(log n) complexity (this is also true
Lines 1474-1481 static timer_event_t * event_get_timer_event(apr_t
                                              apr_array_header_t *remove)
 {
     timer_event_t *te;
+    apr_time_t now = (t < 0) ? 0 : apr_time_now();
+
     /* oh yeah, and make locking smarter/fine grained. */
-
     apr_thread_mutex_lock(g_timer_skiplist_mtx);
 
     if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
Lines 1490-1501 static timer_event_t * event_get_timer_event(apr_t
     te->cbfunc = cbfn;
     te->baton = baton;
     te->canceled = 0;
-    te->when = t;
+    te->when = now + t;
     te->remove = remove;
 
     if (insert) {
         /* Okay, add sorted by when.. */
         apr_skiplist_insert(timer_skiplist, te);
+
+        /* Cheaply update the overall timers' next expiry according to
+         * this event, if necessary.
+         */
+        if (!timers_next_expiry
+                || timers_next_expiry > te->when) {
+            timers_next_expiry = te->when;
+            /* Unblock the listener if it's waiting on a longer timer. */
+            if (listener_is_wakeable) {
+                apr_pollset_wakeup(event_pollset);
+            }
+        }
     }
     apr_thread_mutex_unlock(g_timer_skiplist_mtx);
 
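Two changes interact here: timers now store an absolute deadline (te->when = now + t, with callers passing relative timeouts, as the next hunk shows), and inserting an earlier deadline lowers timers_next_expiry and wakes the listener. A small sketch of that arm-and-track-minimum logic, with illustrative types and names:

#include <stdint.h>
#include <stdio.h>

static int64_t timers_next_expiry;  /* 0 means "no timer armed" */

/* Convert a relative timeout to an absolute deadline and keep the
 * earliest deadline cached, as event_get_timer_event() now does. */
static int64_t arm_timer(int64_t now, int64_t relative)
{
    int64_t when = now + relative;
    if (!timers_next_expiry || timers_next_expiry > when) {
        timers_next_expiry = when;  /* the patch also wakes the listener here */
    }
    return when;
}

int main(void)
{
    int64_t now = 1000;
    arm_timer(now, 500);            /* deadline 1500 */
    arm_timer(now, 200);            /* deadline 1200, the new minimum */
    printf("next expiry: %lld\n", (long long)timers_next_expiry);
    return 0;
}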
Lines 1507-1513 static apr_status_t event_register_timed_callback_
                                                   void *baton,
                                                   apr_array_header_t *remove)
 {
-    event_get_timer_event(t + apr_time_now(), cbfn, baton, 1, remove);
+    event_get_timer_event(t, cbfn, baton, 1, remove);
     return APR_SUCCESS;
 }
 
Lines 1567-1573 static apr_status_t event_register_poll_callback_e
 
     if (timeout > 0) {
         /* XXX:  This cancel timer event count fire before the pollset is updated */
-        scb->cancel_event = event_get_timer_event(timeout + apr_time_now(), tofn, baton, 1, pfds);
+        scb->cancel_event = event_get_timer_event(timeout, tofn, baton, 1, pfds);
     }
     for (i = 0; i < pfds->nelts; i++) {
         apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
Lines 1658-1677 static void process_timeout_queue(struct timeout_q
         count = 0;
         cs = first = last = APR_RING_FIRST(&qp->head);
         while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
-                                       timeout_list)
-               /* Trash the entry if:
-                * - no timeout_time was given (asked for all), or
-                * - it expired (according to the queue timeout), or
-                * - the system clock skewed in the past: no entry should be
-                *   registered above the given timeout_time (~now) + the queue
-                *   timeout, we won't keep any here (eg. for centuries).
-                * Stop otherwise, no following entry will match thanks to the
-                * single timeout per queue (entries are added to the end!).
-                * This allows maintenance in O(1).
-                */
-               && (!timeout_time
-                   || cs->queue_timestamp + qp->timeout < timeout_time
-                   || cs->queue_timestamp > timeout_time + qp->timeout)) {
+                                       timeout_list)) {
+            /* Trash the entry if:
+             * - no timeout_time was given (asked for all), or
+             * - it expired (according to the queue timeout), or
+             * - the system clock skewed in the past: no entry should be
+             *   registered above the given timeout_time (~now) + the queue
+             *   timeout, we won't keep any here (eg. for centuries).
+             *
+             * Otherwise stop, no following entry will match thanks to the
+             * single timeout per queue (entries are added to the end!).
+             * This allows maintenance in O(1).
+             */
+            if (timeout_time
+                    && cs->queue_timestamp + qp->timeout > timeout_time
+                    && cs->queue_timestamp < timeout_time + qp->timeout) {
+                /* Since this is the next expiring of this queue, update the
+                 * overall queues' next expiry if it's later than this one.
+                 */
+                apr_time_t cs_expiry = cs->queue_timestamp + qp->timeout;
+                if (!queues_next_expiry
+                        || queues_next_expiry > cs_expiry) {
+                    queues_next_expiry = cs_expiry;
+                }
+                break;
+            }
+
             last = cs;
             rv = apr_pollset_remove(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
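The loop above stays O(1) amortized because each queue has a single timeout and entries are appended in arrival order, so deadlines are non-decreasing and the scan can stop at the first live entry, whose deadline becomes the queues' next expiry. A tiny sketch of that expire-from-the-front scan, with illustrative data:

#include <stdio.h>

/* Entries share one queue-wide timeout and are appended in arrival order,
 * so their deadlines are non-decreasing: stop at the first live entry. */
int main(void)
{
    long timestamps[] = {10, 20, 30, 40};  /* queue_timestamp per entry */
    const long timeout = 15, now = 38;     /* queue timeout, current time */
    int i, n = sizeof(timestamps) / sizeof(timestamps[0]);

    for (i = 0; i < n; i++) {
        long expiry = timestamps[i] + timeout;
        if (expiry > now) {
            /* first live entry: its deadline is the queue's next expiry */
            printf("stop; next expiry %ld\n", expiry);
            break;
        }
        printf("expire entry %d (deadline %ld)\n", i, expiry);
    }
    return 0;
}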
Lines 1711-1720 static void * APR_THREAD_FUNC listener_thread(apr_
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
+    struct process_score *ps = ap_get_scoreboard_process(process_slot);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    apr_time_t timeout_time = 0, last_log;
     int closed = 0, listeners_disabled = 0;
     int have_idle_worker = 0;
+    apr_time_t last_log;
 
     last_log = apr_time_now();
     free(ti);
Lines 1749-1756 static void * APR_THREAD_FUNC listener_thread(apr_
         apr_int32_t num = 0;
         apr_uint32_t c_count, l_count, i_count;
         apr_interval_time_t timeout_interval;
-        apr_time_t now;
+        apr_time_t now, timeout_time;
         int workers_were_busy = 0;
+
         if (listener_may_exit) {
             close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
Lines 1793-1841 static void * APR_THREAD_FUNC listener_thread(apr_
         }
 #endif
 
+        /* Update poll() timeout below according to the next expiring
+         * timer or queue entry, if any.
+         */
+        timeout_interval = -1;
         now = apr_time_now();
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        te = apr_skiplist_peek(timer_skiplist);
-        if (te) {
-            if (te->when > now) {
-                timeout_interval = te->when - now;
-            }
-            else {
-                timeout_interval = 1;
-            }
-        }
-        else {
-            timeout_interval = apr_time_from_msec(100);
-        }
-        while (te) {
-            if (te->when < now + EVENT_FUDGE_FACTOR) {
-                apr_skiplist_pop(timer_skiplist, NULL);
-                if (!te->canceled) {
-                    if (te->remove) {
-                        int i;
-                        for (i = 0; i < te->remove->nelts; i++) {
-                            apr_pollfd_t *pfd = (apr_pollfd_t *)te->remove->elts + i;
-                            apr_pollset_remove(event_pollset, pfd);
-                        }
-                    }
-                    push_timer2worker(te);
-                }
-                else {
-                    APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t,
-                                         link);
-                }
-            }
-            else {
-                break;
-            }
-            te = apr_skiplist_peek(timer_skiplist);
-        }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+
+        /* Avoid locking if there's no expiring timer in the list,
+         * poll() will be woken up anyway if a new timer comes in.
+         */
+        timeout_time = VOLATILE_READ(apr_time_t, timers_next_expiry);
+        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+            /* Push expired timers to a worker, the first one remaining
+             * determines the maximum time to poll() below.
+             */
+            apr_thread_mutex_lock(g_timer_skiplist_mtx);
+            while ((te = apr_skiplist_peek(timer_skiplist))) {
+                if (te->when < now + EVENT_FUDGE_FACTOR) {
+                    apr_skiplist_pop(timer_skiplist, NULL);
+                    if (!te->canceled) {
+                        if (te->remove) {
+                            int i;
+                            for (i = 0; i < te->remove->nelts; i++) {
+                                apr_pollfd_t *pfd;
+                                pfd = (apr_pollfd_t *)te->remove->elts + i;
+                                apr_pollset_remove(event_pollset, pfd);
+                            }
+                        }
+                        push_timer2worker(te);
+                    }
+                    else {
+                        APR_RING_INSERT_TAIL(&timer_free_ring, te,
+                                             timer_event_t, link);
+                    }
+                }
+                else {
+                    timeout_interval = te->when - now;
+                    timers_next_expiry = te->when;
+                    break;
+                }
+            }
+            /* If there are no timers in the list, either the listener is
+             * wakeable and it can poll() indefinitely until a wake up occurs,
+             * or periodic checks must be performed.
+             */
+            if (!te) {
+                if (!listener_is_wakeable) {
+                    timeout_interval = apr_time_from_msec(100);
+                }
+                timers_next_expiry = 0;
+            }
+            apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+        }
 
+        /* Same for queues, if the listener is wakeable use the current expiry
+         * time and expect to be woken up for an earlier one, otherwise use the
+         * maintenance timeout (max).
+         */
+        timeout_time = VOLATILE_READ(apr_time_t, queues_next_expiry);
+        if (timeout_time
+                && (timeout_interval < 0
+                    || timeout_time <= now
+                    || timeout_interval > timeout_time - now)) {
+            timeout_interval = timeout_time > now ? timeout_time - now : 1;
+        }
+        if (!listener_is_wakeable
+                && timeout_interval > apr_time_from_msec(100)) {
+            timeout_interval = apr_time_from_msec(100);
+        }
+
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                continue;
+                /* Woken up, either update timeouts or shutdown,
+                 * both logics are above.
+                 */
+                continue;
             }
             if (!APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
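The net effect of the rewritten block is a poll() timeout derived from the two cached expiries: infinite when nothing is pending and the pollset is wakeable, otherwise the earliest deadline, capped at the 100 ms maintenance tick when wake-ups aren't available. A condensed, standalone sketch of that computation; it ignores the expired-timer dispatch the real code performs, folds the non-wakeable cap into one test, and all names are illustrative:

#include <stdint.h>
#include <stdio.h>

#define MAINTENANCE_INTERVAL 100000 /* 100 ms, like apr_time_from_msec(100) */

/* Derive the poll() timeout from the two "next expiry" hints: -1 means
 * sleep until woken, otherwise sleep until the earliest deadline. */
static int64_t poll_timeout(int64_t now, int64_t timer_exp, int64_t queue_exp,
                            int wakeable)
{
    int64_t interval = -1;

    if (timer_exp)
        interval = timer_exp > now ? timer_exp - now : 1;
    if (queue_exp && (interval < 0 || queue_exp <= now
                      || interval > queue_exp - now))
        interval = queue_exp > now ? queue_exp - now : 1;
    if (!wakeable && (interval < 0 || interval > MAINTENANCE_INTERVAL))
        interval = MAINTENANCE_INTERVAL;
    return interval;
}

int main(void)
{
    printf("%lld\n", (long long)poll_timeout(0, 0, 0, 1));    /* -1: wait for wakeup */
    printf("%lld\n", (long long)poll_timeout(0, 0, 0, 0));    /* 100000: maintenance */
    printf("%lld\n", (long long)poll_timeout(0, 250, 50, 1)); /* 50: earliest wins */
    return 0;
}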
Lines 1844-1849 static void * APR_THREAD_FUNC listener_thread(apr_
                              "shutdown process gracefully");
                 signal_threads(ST_GRACEFUL);
             }
+            num = 0;
         }
 
         if (listener_may_exit) {
Lines 2056-2090 static void * APR_THREAD_FUNC listener_thread(apr_
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
-        now = apr_time_now();
-        /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a clock
-         * skew (if the system time is set back in the meantime, timeout_time
-         * will exceed now + TIMEOUT_FUDGE_FACTOR, can't happen otherwise).
+        /* We process the timeout queues here only when their overall next
+         * expiry (read once above) is over. This happens accurately since
+         * adding to the queues (in workers) can only decrease this expiry,
+         * while latest ones are only taken into account here (in listener)
+         * during queues' processing, with the lock held. This works both
+         * with and without wake-ability.
          */
-        if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time ) {
-            struct process_score *ps;
+        if (timeout_time && timeout_time < (now = apr_time_now())) {
             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
             /* handle timed out sockets */
             apr_thread_mutex_lock(timeout_mutex);
 
+            /* Processing all the queues below will recompute this. */
+            queues_next_expiry = 0;
+
             /* Step 1: keepalive timeouts */
-            /* If all workers are busy, we kill older keep-alive connections so that they
-             * may connect to another process.
-             */
-            if ((workers_were_busy || dying) && *keepalive_q->total) {
-                if (!dying)
-                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
-                                 "All workers are busy, will close %d keep-alive "
-                                 "connections",
-                                 *keepalive_q->total);
-                process_timeout_queue(keepalive_q, 0,
-                                      start_lingering_close_nonblocking);
-            }
-            else {
-                process_timeout_queue(keepalive_q, timeout_time,
-                                      start_lingering_close_nonblocking);
-            }
+            process_timeout_queue(keepalive_q, timeout_time,
+                                  start_lingering_close_nonblocking);
             /* Step 2: write completion timeouts */
             process_timeout_queue(write_completion_q, timeout_time,
                                   start_lingering_close_nonblocking);
Lines 2093-2099 static void * APR_THREAD_FUNC listener_thread(apr_
             /* Step 4: (short) lingering close completion timeouts */
             process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close);
 
-            ps = ap_get_scoreboard_process(process_slot);
             ps->write_completion = *write_completion_q->total;
             ps->keep_alive = *keepalive_q->total;
             apr_thread_mutex_unlock(timeout_mutex);
Lines 2102-2107 static void * APR_THREAD_FUNC listener_thread(apr_
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
+        else if ((workers_were_busy || dying) && *keepalive_q->total) {
+            if (!dying) {
+                ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                             "All workers are busy, will close %d keep-alive "
+                             "connections", *keepalive_q->total);
+            }
+            apr_thread_mutex_lock(timeout_mutex);
+            process_timeout_queue(keepalive_q, 0,
+                                  start_lingering_close_nonblocking);
+            ps->keep_alive = 0;
+            apr_thread_mutex_unlock(timeout_mutex);
+        }
+
         if (listeners_disabled && !workers_were_busy
             && ((c_count = apr_atomic_read32(&connection_count))
                     >= (l_count = apr_atomic_read32(&lingering_count))
Lines 2314-2319 static void *APR_THREAD_FUNC start_threads(apr_thr
     int prev_threads_created;
     int max_recycled_pools = -1;
     int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
+    /* XXX don't we need more to handle K-A or lingering close? */
+    const apr_uint32_t pollset_size = threads_per_child * 2;
 
     /* We must create the fd queues before we start up the listener
      * and worker threads. */
Lines 2353-2376 static void *APR_THREAD_FUNC start_threads(apr_thr
 
     /* Create the main pollset */
     for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) {
-        rv = apr_pollset_create_ex(&event_pollset,
-                            threads_per_child*2, /* XXX don't we need more, to handle
-                                                * connections in K-A or lingering
-                                                * close?
-                                                */
-                            pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
-                            good_methods[i]);
+        apr_uint32_t flags = APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY |
+                             APR_POLLSET_NODEFAULT | APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
         if (rv == APR_SUCCESS) {
+            listener_is_wakeable = 1;
             break;
         }
+        flags &= ~APR_POLLSET_WAKEABLE;
+        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
+                                   good_methods[i]);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
     }
     if (rv != APR_SUCCESS) {
-        rv = apr_pollset_create(&event_pollset,
-                               threads_per_child*2, /* XXX don't we need more, to handle
-                                                     * connections in K-A or lingering
-                                                     * close?
-                                                     */
-                               pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
+        rv = apr_pollset_create(&event_pollset, pollset_size, pchild,
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     }
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103)
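The creation loop above first asks each preferred backend for a wakeable pollset and retries the same method without APR_POLLSET_WAKEABLE, recording the capability in listener_is_wakeable. A standalone probe of the same try-then-degrade strategy against whatever pollset method APR provides by default (minimal error handling; not the MPM's code):

#include <stdio.h>
#include <apr_general.h>
#include <apr_pools.h>
#include <apr_pollset.h>

int main(void)
{
    apr_pool_t *pool;
    apr_pollset_t *pollset;
    int wakeable = 1;
    apr_status_t rv;

    apr_initialize();
    apr_pool_create(&pool, NULL);

    /* Ask for a wakeable pollset first, then degrade gracefully. */
    rv = apr_pollset_create(&pollset, 32, pool,
                            APR_POLLSET_THREADSAFE | APR_POLLSET_WAKEABLE);
    if (rv != APR_SUCCESS) {
        wakeable = 0;
        rv = apr_pollset_create(&pollset, 32, pool, APR_POLLSET_THREADSAFE);
    }
    if (rv == APR_SUCCESS) {
        printf("pollset created (%swakeable)\n", wakeable ? "" : "not ");
        if (wakeable)
            apr_pollset_wakeup(pollset); /* would interrupt a concurrent poll() */
    }

    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}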
Lines 2379-2385 static void *APR_THREAD_FUNC start_threads(apr_thr
     }
 
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471)
-                 "start_threads: Using %s", apr_pollset_method_name(event_pollset));
+                 "start_threads: Using %s (%swakeable)",
+                 apr_pollset_method_name(event_pollset),
+                 listener_is_wakeable ? "" : "not ");
     worker_sockets = apr_pcalloc(pchild, threads_per_child
                                  * sizeof(apr_socket_t *));
 
