
(-)server/mpm/event/event.c (-137 / +244 lines)
Lines 107-112
 #include "serf.h"
 #endif
 
+#define VOLATILE_READ(T, x) (*(volatile T *)&(x))
+
 /* Limit on the total --- clients will be locked out if more servers than
  * this are needed.  It is intended solely to keep the server from crashing
  * when things get out of hand.
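
The VOLATILE_READ() macro introduced above is the patch's lightweight way to share scalars such as the queues' next expiry between worker threads and the listener: going through a volatile-qualified lvalue forces the compiler to emit a real load or store on every access instead of caching the value in a register. It provides no atomicity or ordering guarantees beyond the platform's aligned word accesses. A minimal stand-alone sketch of the idiom (names are illustrative, not from the patch):

    #include <apr_time.h>

    #define VOLATILE_READ(T, x) (*(volatile T *)&(x))

    static apr_time_t next_expiry; /* written by workers, read by listener */

    static apr_time_t read_next_expiry(void)
    {
        /* The volatile access cannot be cached in a register or hoisted
         * out of a loop, so each call re-reads the current value. */
        return VOLATILE_READ(apr_time_t, next_expiry);
    }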
Lines 182-187 static int dying = 0;
 static int workers_may_exit = 0;
 static int start_thread_may_exit = 0;
 static int listener_may_exit = 0;
+static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
                                                in listener thread */
Lines 204-209 module AP_MODULE_DECLARE_DATA mpm_event_module;
 struct event_srv_cfg_s;
 typedef struct event_srv_cfg_s event_srv_cfg;
 
+/*
+ * The pollset for sockets that are in any of the timeout queues. Currently
+ * we use the timeout_mutex to make sure that connections are added/removed
+ * atomically to/from both event_pollset and a timeout queue. Otherwise
+ * some confusion can happen under high load if timeout queues and pollset
+ * get out of sync.
+ * XXX: It should be possible to make the lock unnecessary in many or even all
+ * XXX: cases.
+ */
+static apr_pollset_t *event_pollset;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
Lines 249-272 static struct timeout_queue *write_completion_q,
                             *keepalive_q,
                             *linger_q,
                             *short_linger_q;
+static apr_time_t queues_next_expiry;
 
 static apr_pollfd_t *listener_pollfd;
 
+/* Prevent extra poll/wakeup calls for timeouts close in the future (queues
+ * have the granularity of a second anyway).
+ * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"?
+ */
+#define TIMEOUT_FUDGE_FACTOR APR_TIME_C(100000) /* 100 ms */
+
+/* Same goal as for TIMEOUT_FUDGE_FACTOR (avoid extra poll calls), but applied
+ * to timers. Since their timeouts are custom (user defined), we can't be too
+ * approximative here (hence using 0.01s).
+ */
+#define EVENT_FUDGE_FACTOR APR_TIME_C(10000) /* 10 ms */
+
 /*
  * Macros for accessing struct timeout_queue.
  * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
  */
-#define TO_QUEUE_APPEND(q, el)                                                \
-    do {                                                                      \
-        APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t,              \
-                             timeout_list);                                   \
-        ++*(q)->total;                                                        \
-        ++(q)->count;                                                         \
-    } while (0)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+{
+    apr_time_t q_expiry;
 
+    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
+    ++*q->total;
+    ++q->count;
+
+    /* Cheaply update the overall queues' next expiry according to the
+     * first entry of this queue (oldest), if necessary.
+     */
+    el = APR_RING_FIRST(&q->head);
+    q_expiry = el->queue_timestamp + q->timeout;
+    if (!queues_next_expiry
+            || queues_next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
+        VOLATILE_READ(apr_time_t, queues_next_expiry) = q_expiry;
+        /* Unblock the poll()ing listener for it to update its timeout. */
+        if (listener_is_wakeable) {
+            apr_pollset_wakeup(event_pollset);
+        }
+    }
+}
+
 #define TO_QUEUE_REMOVE(q, el)                                                \
     do {                                                                      \
-        APR_RING_REMOVE(el, timeout_list);                                    \
+        APR_RING_REMOVE((el), timeout_list);                                  \
         --*(q)->total;                                                        \
         --(q)->count;                                                         \
     } while (0)
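
The macro-turned-function TO_QUEUE_APPEND() above carries the core pattern of the patch: after queuing an entry it publishes the queue's earliest expiry into the shared queues_next_expiry, and wakes the poll()ing listener only when the new expiry undercuts the published one by more than TIMEOUT_FUDGE_FACTOR, so deadlines that land close together do not each pay for a wakeup. A condensed sketch of just that rule (helper and variable names are illustrative):

    #include <apr_time.h>

    #define TIMEOUT_FUDGE_FACTOR APR_TIME_C(100000) /* 100 ms, as above */

    static apr_time_t next_expiry; /* 0 means "no expiry known" */

    /* Returns 1 when the caller should wake the listener so it can shorten
     * its poll() timeout, 0 when the published expiry is close enough. */
    static int publish_expiry(apr_time_t q_expiry)
    {
        if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
            next_expiry = q_expiry;
            return 1;
        }
        return 0;
    }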
Lines 282-300 static apr_pollfd_t *listener_pollfd;
         (q)->next = NULL;                                                     \
     } while (0)
 
-#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
+#define TO_QUEUE_ELEM_INIT(el) \
+    APR_RING_ELEM_INIT((el), timeout_list)
 
-/*
- * The pollset for sockets that are in any of the timeout queues. Currently
- * we use the timeout_mutex to make sure that connections are added/removed
- * atomically to/from both event_pollset and a timeout queue. Otherwise
- * some confusion can happen under high load if timeout queues and pollset
- * get out of sync.
- * XXX: It should be possible to make the lock unnecessary in many or even all
- * XXX: cases.
- */
-static apr_pollset_t *event_pollset;
-
 #if HAVE_SERF
 typedef struct {
     apr_pollset_t *pollset;
493
        return;
525
        return;
494
    }
526
    }
495
527
528
    /* Unblock the listener if it's poll()ing */
529
    if (listener_is_wakeable) {
530
        apr_pollset_wakeup(event_pollset);
531
    }
532
496
    /* unblock the listener if it's waiting for a worker */
533
    /* unblock the listener if it's waiting for a worker */
497
    ap_queue_info_term(worker_queue_info);
534
    ap_queue_info_term(worker_queue_info);
498
535
Lines 675-681 static apr_status_t decrement_connection_count(voi Link Here
675
        default:
712
        default:
676
            break;
713
            break;
677
    }
714
    }
678
    apr_atomic_dec32(&connection_count);
715
    /* Unblock the listener if it's waiting for connection_count = 0 */
716
    if (!apr_atomic_dec32(&connection_count)
717
             && listener_is_wakeable && listener_may_exit) {
718
        apr_pollset_wakeup(event_pollset);
719
    }
679
    return APR_SUCCESS;
720
    return APR_SUCCESS;
680
}
721
}
681
722
Lines 838-843 static void notify_resume(event_conn_state_t *cs, Link Here
838
879
839
static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
880
static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
840
{
881
{
882
    int done = 0;
841
    apr_status_t rv;
883
    apr_status_t rv;
842
    struct timeout_queue *q;
884
    struct timeout_queue *q;
843
    apr_socket_t *csd = cs->pfd.desc.s;
885
    apr_socket_t *csd = cs->pfd.desc.s;
Lines 849-855 static int start_lingering_close_common(event_conn Link Here
849
#else
891
#else
850
    apr_socket_timeout_set(csd, 0);
892
    apr_socket_timeout_set(csd, 0);
851
#endif
893
#endif
852
    cs->queue_timestamp = apr_time_now();
853
    /*
894
    /*
854
     * If some module requested a shortened waiting period, only wait for
895
     * If some module requested a shortened waiting period, only wait for
855
     * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
896
     * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
Lines 870-894 static int start_lingering_close_common(event_conn Link Here
870
    else {
911
    else {
871
        cs->c->sbh = NULL;
912
        cs->c->sbh = NULL;
872
    }
913
    }
873
    apr_thread_mutex_lock(timeout_mutex);
874
    TO_QUEUE_APPEND(q, cs);
875
    cs->pfd.reqevents = (
914
    cs->pfd.reqevents = (
876
            cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
915
            cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
877
                    APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
916
                    APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
878
    cs->pub.sense = CONN_SENSE_DEFAULT;
917
    cs->pub.sense = CONN_SENSE_DEFAULT;
918
    cs->queue_timestamp = apr_time_now();
919
    apr_thread_mutex_lock(timeout_mutex);
879
    rv = apr_pollset_add(event_pollset, &cs->pfd);
920
    rv = apr_pollset_add(event_pollset, &cs->pfd);
921
    if (rv == APR_SUCCESS || APR_STATUS_IS_EEXIST(rv)) {
922
        TO_QUEUE_APPEND(q, cs);
923
        done = 1;
924
    }
880
    apr_thread_mutex_unlock(timeout_mutex);
925
    apr_thread_mutex_unlock(timeout_mutex);
881
    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
926
    if (!done) {
882
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
927
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
883
                     "start_lingering_close: apr_pollset_add failure");
928
                     "start_lingering_close: apr_pollset_add failure");
884
        apr_thread_mutex_lock(timeout_mutex);
885
        TO_QUEUE_REMOVE(q, cs);
886
        apr_thread_mutex_unlock(timeout_mutex);
887
        apr_socket_close(cs->pfd.desc.s);
929
        apr_socket_close(cs->pfd.desc.s);
888
        ap_push_pool(worker_queue_info, cs->p);
930
        ap_push_pool(worker_queue_info, cs->p);
889
        return 0;
890
    }
931
    }
891
    return 1;
932
    return done;
892
}
933
}
893
934
894
/*
935
/*
Lines 1149-1163 read_request: Link Here
1149
             * Set a write timeout for this connection, and let the
1190
             * Set a write timeout for this connection, and let the
1150
             * event thread poll for writeability.
1191
             * event thread poll for writeability.
1151
             */
1192
             */
1152
            cs->queue_timestamp = apr_time_now();
1153
            notify_suspend(cs);
1193
            notify_suspend(cs);
1154
            apr_thread_mutex_lock(timeout_mutex);
1155
            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
1156
            cs->pfd.reqevents = (
1194
            cs->pfd.reqevents = (
1157
                    cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
1195
                    cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
1158
                            APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
1196
                            APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
1159
            cs->pub.sense = CONN_SENSE_DEFAULT;
1197
            cs->pub.sense = CONN_SENSE_DEFAULT;
1160
            rc = apr_pollset_add(event_pollset, &cs->pfd);
1198
            cs->queue_timestamp = apr_time_now();
1199
            apr_thread_mutex_lock(timeout_mutex);
1200
            apr_pollset_add(event_pollset, &cs->pfd);
1201
            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
1161
            apr_thread_mutex_unlock(timeout_mutex);
1202
            apr_thread_mutex_unlock(timeout_mutex);
1162
            return;
1203
            return;
1163
        }
1204
        }
Lines 1188-1201 read_request: Link Here
1188
         * timeout today.  With a normal client, the socket will be readable in
1229
         * timeout today.  With a normal client, the socket will be readable in
1189
         * a few milliseconds anyway.
1230
         * a few milliseconds anyway.
1190
         */
1231
         */
1191
        cs->queue_timestamp = apr_time_now();
1192
        notify_suspend(cs);
1232
        notify_suspend(cs);
1193
        apr_thread_mutex_lock(timeout_mutex);
1194
        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
1195
1196
        /* Add work to pollset. */
1233
        /* Add work to pollset. */
1197
        cs->pfd.reqevents = APR_POLLIN;
1234
        cs->pfd.reqevents = APR_POLLIN;
1235
        cs->queue_timestamp = apr_time_now();
1236
        apr_thread_mutex_lock(timeout_mutex);
1198
        rc = apr_pollset_add(event_pollset, &cs->pfd);
1237
        rc = apr_pollset_add(event_pollset, &cs->pfd);
1238
        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
1199
        apr_thread_mutex_unlock(timeout_mutex);
1239
        apr_thread_mutex_unlock(timeout_mutex);
1200
1240
1201
        if (rc != APR_SUCCESS) {
1241
        if (rc != APR_SUCCESS) {
Lines 1226-1238 static apr_status_t event_resume_suspended (conn_r Link Here
1226
    apr_atomic_dec32(&suspended_count);
1266
    apr_atomic_dec32(&suspended_count);
1227
    c->suspended_baton = NULL;
1267
    c->suspended_baton = NULL;
1228
1268
1229
    apr_thread_mutex_lock(timeout_mutex);
1230
    TO_QUEUE_APPEND(cs->sc->wc_q, cs);
1231
    cs->pfd.reqevents = (
1269
    cs->pfd.reqevents = (
1232
            cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
1270
            cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
1233
                    APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
1271
                    APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
1234
    cs->pub.sense = CONN_SENSE_DEFAULT;
1272
    cs->pub.sense = CONN_SENSE_DEFAULT;
1273
    cs->queue_timestamp = apr_time_now();
1274
    apr_thread_mutex_lock(timeout_mutex);
1235
    apr_pollset_add(event_pollset, &cs->pfd);
1275
    apr_pollset_add(event_pollset, &cs->pfd);
1276
    TO_QUEUE_APPEND(cs->sc->wc_q, cs);
1236
    apr_thread_mutex_unlock(timeout_mutex);
1277
    apr_thread_mutex_unlock(timeout_mutex);
1237
1278
1238
    return OK;
1279
    return OK;
Lines 1443-1448 static void get_worker(int *have_idle_worker_p, in Link Here
1443
static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
1484
static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
1444
1485
1445
static apr_skiplist *timer_skiplist;
1486
static apr_skiplist *timer_skiplist;
1487
static apr_time_t timers_next_expiry;
1446
1488
1447
/* The following compare function is used by apr_skiplist_insert() to keep the
1489
/* The following compare function is used by apr_skiplist_insert() to keep the
1448
 * elements (timers) sorted and provide O(log n) complexity (this is also true
1490
 * elements (timers) sorted and provide O(log n) complexity (this is also true
Lines 1474-1481 static timer_event_t * event_get_timer_event(apr_t Link Here
1474
                                             apr_array_header_t *remove)
1516
                                             apr_array_header_t *remove)
1475
{
1517
{
1476
    timer_event_t *te;
1518
    timer_event_t *te;
1519
    apr_time_t now = (t < 0) ? 0 : apr_time_now();
1520
1477
    /* oh yeah, and make locking smarter/fine grained. */
1521
    /* oh yeah, and make locking smarter/fine grained. */
1478
1479
    apr_thread_mutex_lock(g_timer_skiplist_mtx);
1522
    apr_thread_mutex_lock(g_timer_skiplist_mtx);
1480
1523
1481
    if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
1524
    if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
Lines 1490-1501 static timer_event_t * event_get_timer_event(apr_t Link Here
1490
    te->cbfunc = cbfn;
1533
    te->cbfunc = cbfn;
1491
    te->baton = baton;
1534
    te->baton = baton;
1492
    te->canceled = 0;
1535
    te->canceled = 0;
1493
    te->when = t;
1536
    te->when = now + t;
1494
    te->remove = remove;
1537
    te->remove = remove;
1495
1538
1496
    if (insert) { 
1539
    if (insert) { 
1497
        /* Okay, add sorted by when.. */
1540
        /* Okay, add sorted by when.. */
1498
        apr_skiplist_insert(timer_skiplist, te);
1541
        apr_skiplist_insert(timer_skiplist, te);
1542
1543
        /* Cheaply update the overall timers' next expiry according to
1544
         * this event, if necessary.
1545
         */
1546
        if (!timers_next_expiry
1547
                || timers_next_expiry > te->when + EVENT_FUDGE_FACTOR) {
1548
            VOLATILE_READ(apr_time_t, timers_next_expiry) = te->when;
1549
            /* Unblock the poll()ing listener for it to update its timeout. */
1550
            if (listener_is_wakeable) {
1551
                apr_pollset_wakeup(event_pollset);
1552
            }
1553
        }
1499
    }
1554
    }
1500
    apr_thread_mutex_unlock(g_timer_skiplist_mtx);
1555
    apr_thread_mutex_unlock(g_timer_skiplist_mtx);
1501
1556
Lines 1507-1513 static apr_status_t event_register_timed_callback_ Link Here
1507
                                                  void *baton, 
1562
                                                  void *baton, 
1508
                                                  apr_array_header_t *remove)
1563
                                                  apr_array_header_t *remove)
1509
{
1564
{
1510
    event_get_timer_event(t + apr_time_now(), cbfn, baton, 1, remove);
1565
    event_get_timer_event(t, cbfn, baton, 1, remove);
1511
    return APR_SUCCESS;
1566
    return APR_SUCCESS;
1512
}
1567
}
1513
1568
Lines 1567-1573 static apr_status_t event_register_poll_callback_e Link Here
1567
1622
1568
    if (timeout > 0) { 
1623
    if (timeout > 0) { 
1569
        /* XXX:  This cancel timer event count fire before the pollset is updated */
1624
        /* XXX:  This cancel timer event count fire before the pollset is updated */
1570
        scb->cancel_event = event_get_timer_event(timeout + apr_time_now(), tofn, baton, 1, pfds);
1625
        scb->cancel_event = event_get_timer_event(timeout, tofn, baton, 1, pfds);
1571
    }
1626
    }
1572
    for (i = 0; i < pfds->nelts; i++) {
1627
    for (i = 0; i < pfds->nelts; i++) {
1573
        apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
1628
        apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
Lines 1658-1677 static void process_timeout_queue(struct timeout_q Link Here
1658
        count = 0;
1713
        count = 0;
1659
        cs = first = last = APR_RING_FIRST(&qp->head);
1714
        cs = first = last = APR_RING_FIRST(&qp->head);
1660
        while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
1715
        while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
1661
                                       timeout_list)
1716
                                       timeout_list)) {
1662
               /* Trash the entry if:
1717
            /* Trash the entry if:
1663
                * - no timeout_time was given (asked for all), or
1718
             * - no timeout_time was given (asked for all), or
1664
                * - it expired (according to the queue timeout), or
1719
             * - it expired (according to the queue timeout), or
1665
                * - the system clock skewed in the past: no entry should be
1720
             * - the system clock skewed in the past: no entry should be
1666
                *   registered above the given timeout_time (~now) + the queue
1721
             *   registered above the given timeout_time (~now) + the queue
1667
                *   timeout, we won't keep any here (eg. for centuries).
1722
             *   timeout, we won't keep any here (eg. for centuries).
1668
                * Stop otherwise, no following entry will match thanks to the
1723
             *
1669
                * single timeout per queue (entries are added to the end!).
1724
             * Otherwise stop, no following entry will match thanks to the
1670
                * This allows maintenance in O(1).
1725
             * single timeout per queue (entries are added to the end!).
1671
                */
1726
             * This allows maintenance in O(1).
1672
               && (!timeout_time
1727
             */
1673
                   || cs->queue_timestamp + qp->timeout < timeout_time
1728
            if (timeout_time
1674
                   || cs->queue_timestamp > timeout_time + qp->timeout)) {
1729
                    && cs->queue_timestamp + qp->timeout > timeout_time
1730
                    && cs->queue_timestamp < timeout_time + qp->timeout) {
1731
                /* Since this is the next expiring of this queue, update the
1732
                 * overall queues' next expiry if it's later than this one.
1733
                 */
1734
                apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
1735
                if (!queues_next_expiry || queues_next_expiry > q_expiry) {
1736
                    VOLATILE_READ(apr_time_t, queues_next_expiry) = q_expiry;
1737
                }
1738
                break;
1739
            }
1740
1675
            last = cs;
1741
            last = cs;
1676
            rv = apr_pollset_remove(event_pollset, &cs->pfd);
1742
            rv = apr_pollset_remove(event_pollset, &cs->pfd);
1677
            if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
1743
            if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
Lines 1711-1732 static void * APR_THREAD_FUNC listener_thread(apr_ Link Here
1711
    apr_status_t rc;
1777
    apr_status_t rc;
1712
    proc_info *ti = dummy;
1778
    proc_info *ti = dummy;
1713
    int process_slot = ti->pslot;
1779
    int process_slot = ti->pslot;
1780
    struct process_score *ps = ap_get_scoreboard_process(process_slot);
1714
    apr_pool_t *tpool = apr_thread_pool_get(thd);
1781
    apr_pool_t *tpool = apr_thread_pool_get(thd);
1715
    apr_time_t timeout_time = 0, last_log;
1716
    int closed = 0, listeners_disabled = 0;
1782
    int closed = 0, listeners_disabled = 0;
1717
    int have_idle_worker = 0;
1783
    int have_idle_worker = 0;
1784
    apr_time_t last_log;
1718
1785
1719
    last_log = apr_time_now();
1786
    last_log = apr_time_now();
1720
    free(ti);
1787
    free(ti);
1721
1788
1722
    /* the following times out events that are really close in the future
1723
     *   to prevent extra poll calls
1724
     *
1725
     * current value is .1 second
1726
     */
1727
#define TIMEOUT_FUDGE_FACTOR 100000
1728
#define EVENT_FUDGE_FACTOR 10000
1729
1730
    rc = init_pollset(tpool);
1789
    rc = init_pollset(tpool);
1731
    if (rc != APR_SUCCESS) {
1790
    if (rc != APR_SUCCESS) {
1732
        ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
1791
        ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
Lines 1749-1756 static void * APR_THREAD_FUNC listener_thread(apr_
         apr_int32_t num = 0;
         apr_uint32_t c_count, l_count, i_count;
         apr_interval_time_t timeout_interval;
-        apr_time_t now;
+        apr_time_t now, timeout_time;
         int workers_were_busy = 0;
+        int keepalives;
+
         if (listener_may_exit) {
             close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
Lines 1793-1841 static void * APR_THREAD_FUNC listener_thread(apr_
         }
 #endif
 
+        /* Update poll() timeout below according to the next expiring
+         * timer or queue entry, if any.
+         */
+        timeout_interval = -1;
         now = apr_time_now();
-        apr_thread_mutex_lock(g_timer_skiplist_mtx);
-        te = apr_skiplist_peek(timer_skiplist);
-        if (te) {
-            if (te->when > now) {
-                timeout_interval = te->when - now;
-            }
-            else {
-                timeout_interval = 1;
-            }
-        }
-        else {
-            timeout_interval = apr_time_from_msec(100);
-        }
-        while (te) {
-            if (te->when < now + EVENT_FUDGE_FACTOR) {
-                apr_skiplist_pop(timer_skiplist, NULL);
-                if (!te->canceled) { 
-                    if (te->remove) {
-                        int i;
-                        for (i = 0; i < te->remove->nelts; i++) {
-                            apr_pollfd_t *pfd = (apr_pollfd_t *)te->remove->elts + i;
-                            apr_pollset_remove(event_pollset, pfd);
-                        }
-                    }
-                    push_timer2worker(te);
-                }
-                else {
-                    APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t,
-                                         link);
-                }
-            }
-            else {
-                break;
-            }
-            te = apr_skiplist_peek(timer_skiplist);
-        }
-        apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+
+        /* Avoid locking if there's no expiring timer in the list,
+         * poll() will be woken up anyway if a new timer comes in.
+         */
+        timeout_time = VOLATILE_READ(apr_time_t, timers_next_expiry);
+        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+            /* Push expired timers to a worker, the first one remaining
+             * determines the maximum time to poll() below.
+             */
+            apr_thread_mutex_lock(g_timer_skiplist_mtx);
+            while ((te = apr_skiplist_peek(timer_skiplist))) {
+                if (te->when < now + EVENT_FUDGE_FACTOR) {
+                    apr_skiplist_pop(timer_skiplist, NULL);
+                    if (!te->canceled) { 
+                        if (te->remove) {
+                            int i;
+                            for (i = 0; i < te->remove->nelts; i++) {
+                                apr_pollfd_t *pfd;
+                                pfd = (apr_pollfd_t *)te->remove->elts + i;
+                                apr_pollset_remove(event_pollset, pfd);
+                            }
+                        }
+                        push_timer2worker(te);
+                    }
+                    else {
+                        APR_RING_INSERT_TAIL(&timer_free_ring, te,
+                                             timer_event_t, link);
+                    }
+                }
+                else {
+                    timeout_interval = te->when - now;
+                    timers_next_expiry = te->when;
+                    break;
+                }
+            }
+            /* If there are no timers in the list, either the listener is
+             * wakeable and it can poll() indefinitely until a wake up occurs,
+             * or periodic checks must be performed.
+             */
+            if (!te) {
+                if (!listener_is_wakeable) {
+                    timeout_interval = apr_time_from_msec(100);
+                }
+                timers_next_expiry = 0;
+            }
+            apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+        }
 
+        /* Same for queues, if the listener is wakeable use the current expiry
+         * time and expect to be woken up for an earlier one, otherwise use the
+         * maintenance timeout (max).
+         */
+        timeout_time = VOLATILE_READ(apr_time_t, queues_next_expiry);
+        if (timeout_time
+                && (timeout_interval < 0
+                    || timeout_time <= now
+                    || timeout_interval > timeout_time - now)) {
+            timeout_interval = timeout_time > now ? timeout_time - now : 1;
+        }
+        if (!listener_is_wakeable
+                && (timeout_interval < 0
+                    || timeout_interval > apr_time_from_msec(100))) {
+            timeout_interval = apr_time_from_msec(100);
+        }
+
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                continue;
+                /* Woken up, either update timeouts or shutdown,
+                 * both logics are above.
+                 */
+                 continue;
             }
             if (!APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
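
The queue half of the new timeout computation can be read in isolation: never sleep past the queues' published next expiry, and without wakeup support never sleep longer than the 100 ms maintenance tick. A condensed sketch of that decision (the timer branch above may already have bounded the interval; the helper name is illustrative):

    #include <apr_time.h>

    static apr_interval_time_t cap_poll_timeout(apr_interval_time_t interval,
                                                apr_time_t now,
                                                apr_time_t queues_next_expiry,
                                                int wakeable)
    {
        /* interval is -1 (infinite) unless the timer logic bounded it. */
        if (queues_next_expiry
                && (interval < 0
                    || queues_next_expiry <= now
                    || interval > queues_next_expiry - now)) {
            /* Wake for the next queue expiry; 1 usec if already overdue. */
            interval = queues_next_expiry > now ? queues_next_expiry - now
                                                : 1;
        }
        if (!wakeable
                && (interval < 0 || interval > apr_time_from_msec(100))) {
            /* No wakeup support: fall back to periodic maintenance. */
            interval = apr_time_from_msec(100);
        }
        return interval;
    }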
Lines 1844-1849 static void * APR_THREAD_FUNC listener_thread(apr_
                              "shutdown process gracefully");
                 signal_threads(ST_GRACEFUL);
             }
+            num = 0;
         }
 
         if (listener_may_exit) {
2056
        /* XXX possible optimization: stash the current time for use as
2152
        /* XXX possible optimization: stash the current time for use as
2057
         * r->request_time for new requests
2153
         * r->request_time for new requests
2058
         */
2154
         */
2059
        now = apr_time_now();
2155
        /* We process the timeout queues here only when their overall next
2060
        /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a clock
2156
         * expiry (read once above) is over. This happens accurately since
2061
         * skew (if the system time is set back in the meantime, timeout_time
2157
         * adding to the queues (in workers) can only decrease this expiry,
2062
         * will exceed now + TIMEOUT_FUDGE_FACTOR, can't happen otherwise).
2158
         * while latest ones are only taken into account here (in listener)
2159
         * during queues' processing, with the lock held. This works both
2160
         * with and without wake-ability.
2063
         */
2161
         */
2064
        if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time ) {
2162
        if (timeout_time && timeout_time < (now = apr_time_now())) {
2065
            struct process_score *ps;
2066
            timeout_time = now + TIMEOUT_FUDGE_FACTOR;
2163
            timeout_time = now + TIMEOUT_FUDGE_FACTOR;
2067
2164
2068
            /* handle timed out sockets */
2165
            /* handle timed out sockets */
2069
            apr_thread_mutex_lock(timeout_mutex);
2166
            apr_thread_mutex_lock(timeout_mutex);
2070
2167
2168
            /* Processing all the queues below will recompute this. */
2169
            queues_next_expiry = 0;
2170
2071
            /* Step 1: keepalive timeouts */
2171
            /* Step 1: keepalive timeouts */
2072
            /* If all workers are busy, we kill older keep-alive connections so that they
2172
            process_timeout_queue(keepalive_q, timeout_time,
2073
             * may connect to another process.
2173
                                  start_lingering_close_nonblocking);
2074
             */
2075
            if ((workers_were_busy || dying) && *keepalive_q->total) {
2076
                if (!dying)
2077
                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
2078
                                 "All workers are busy, will close %d keep-alive "
2079
                                 "connections",
2080
                                 *keepalive_q->total);
2081
                process_timeout_queue(keepalive_q, 0,
2082
                                      start_lingering_close_nonblocking);
2083
            }
2084
            else {
2085
                process_timeout_queue(keepalive_q, timeout_time,
2086
                                      start_lingering_close_nonblocking);
2087
            }
2088
            /* Step 2: write completion timeouts */
2174
            /* Step 2: write completion timeouts */
2089
            process_timeout_queue(write_completion_q, timeout_time,
2175
            process_timeout_queue(write_completion_q, timeout_time,
2090
                                  start_lingering_close_nonblocking);
2176
                                  start_lingering_close_nonblocking);
2091
            /* Step 3: (normal) lingering close completion timeouts */
2177
            /* Step 3: (normal) lingering close completion timeouts */
2092
            process_timeout_queue(linger_q, timeout_time, stop_lingering_close);
2178
            process_timeout_queue(linger_q, timeout_time,
2179
                                  stop_lingering_close);
2093
            /* Step 4: (short) lingering close completion timeouts */
2180
            /* Step 4: (short) lingering close completion timeouts */
2094
            process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close);
2181
            process_timeout_queue(short_linger_q, timeout_time,
2182
                                  stop_lingering_close);
2095
2183
2096
            ps = ap_get_scoreboard_process(process_slot);
2097
            ps->write_completion = *write_completion_q->total;
2184
            ps->write_completion = *write_completion_q->total;
2098
            ps->keep_alive = *keepalive_q->total;
2185
            ps->keep_alive = *keepalive_q->total;
2099
            apr_thread_mutex_unlock(timeout_mutex);
2186
            apr_thread_mutex_unlock(timeout_mutex);
Lines 2102-2107 static void * APR_THREAD_FUNC listener_thread(apr_ Link Here
2102
            ps->suspended = apr_atomic_read32(&suspended_count);
2189
            ps->suspended = apr_atomic_read32(&suspended_count);
2103
            ps->lingering_close = apr_atomic_read32(&lingering_count);
2190
            ps->lingering_close = apr_atomic_read32(&lingering_count);
2104
        }
2191
        }
2192
        else if ((workers_were_busy || dying)
2193
                 && (keepalives = VOLATILE_READ(int, *keepalive_q->total))) {
2194
            /* If all workers are busy, we kill older keep-alive connections so
2195
             * that they may connect to another process.
2196
             */
2197
            ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
2198
                         "All workers are %s, will close %d keep-alive "
2199
                         "connections", dying ? "dying" : "busy",
2200
                         keepalives);
2201
            apr_thread_mutex_lock(timeout_mutex);
2202
            process_timeout_queue(keepalive_q, 0,
2203
                                  start_lingering_close_nonblocking);
2204
            ps->keep_alive = 0;
2205
            apr_thread_mutex_unlock(timeout_mutex);
2206
        }
2207
2105
        if (listeners_disabled && !workers_were_busy
2208
        if (listeners_disabled && !workers_were_busy
2106
            && ((c_count = apr_atomic_read32(&connection_count))
2209
            && ((c_count = apr_atomic_read32(&connection_count))
2107
                    >= (l_count = apr_atomic_read32(&lingering_count))
2210
                    >= (l_count = apr_atomic_read32(&lingering_count))
Lines 2314-2319 static void *APR_THREAD_FUNC start_threads(apr_thr Link Here
2314
    int prev_threads_created;
2417
    int prev_threads_created;
2315
    int max_recycled_pools = -1;
2418
    int max_recycled_pools = -1;
2316
    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
2419
    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
2420
    /* XXX don't we need more to handle K-A or lingering close? */
2421
    const apr_uint32_t pollset_size = threads_per_child * 2;
2317
2422
2318
    /* We must create the fd queues before we start up the listener
2423
    /* We must create the fd queues before we start up the listener
2319
     * and worker threads. */
2424
     * and worker threads. */
Lines 2353-2376 static void *APR_THREAD_FUNC start_threads(apr_thr Link Here
2353
2458
2354
    /* Create the main pollset */
2459
    /* Create the main pollset */
2355
    for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) {
2460
    for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) {
2356
        rv = apr_pollset_create_ex(&event_pollset,
2461
        apr_uint32_t flags = APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY |
2357
                            threads_per_child*2, /* XXX don't we need more, to handle
2462
                             APR_POLLSET_NODEFAULT | APR_POLLSET_WAKEABLE;
2358
                                                * connections in K-A or lingering
2463
        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
2359
                                                * close?
2464
                                   good_methods[i]);
2360
                                                */
2361
                            pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
2362
                            good_methods[i]);
2363
        if (rv == APR_SUCCESS) {
2465
        if (rv == APR_SUCCESS) {
2466
            listener_is_wakeable = 1;
2364
            break;
2467
            break;
2365
        }
2468
        }
2469
        flags &= ~APR_POLLSET_WAKEABLE;
2470
        rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags,
2471
                                   good_methods[i]);
2472
        if (rv == APR_SUCCESS) {
2473
            break;
2474
        }
2366
    }
2475
    }
2367
    if (rv != APR_SUCCESS) {
2476
    if (rv != APR_SUCCESS) {
2368
        rv = apr_pollset_create(&event_pollset,
2477
        rv = apr_pollset_create(&event_pollset, pollset_size, pchild,
2369
                               threads_per_child*2, /* XXX don't we need more, to handle
2478
                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
2370
                                                     * connections in K-A or lingering
2371
                                                     * close?
2372
                                                     */
2373
                               pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
2374
    }
2479
    }
2375
    if (rv != APR_SUCCESS) {
2480
    if (rv != APR_SUCCESS) {
2376
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103)
2481
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103)
Lines 2379-2385 static void *APR_THREAD_FUNC start_threads(apr_thr Link Here
2379
    }
2484
    }
2380
2485
2381
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471)
2486
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471)
2382
                 "start_threads: Using %s", apr_pollset_method_name(event_pollset));
2487
                 "start_threads: Using %s (%swakeable)",
2488
                 apr_pollset_method_name(event_pollset),
2489
                 listener_is_wakeable ? "" : "not ");
2383
    worker_sockets = apr_pcalloc(pchild, threads_per_child
2490
    worker_sockets = apr_pcalloc(pchild, threads_per_child
2384
                                 * sizeof(apr_socket_t *));
2491
                                 * sizeof(apr_socket_t *));
2385
2492
