Add manual push/pop blocking and an "is empty" wait to apr_queue
(apr-util 1.2.7, misc/apr_queue.c).

New entry points: apr_queue_blockpush, apr_queue_unblockpush,
apr_queue_blockpop, apr_queue_unblockpop and apr_queue_isempty.
A blocked apr_queue_push/apr_queue_pop sleeps until the matching
unblock call; apr_queue_trypush/apr_queue_trypop return APR_EAGAIN.

NOTE(review): whitespace inside context lines may not match the pristine
source byte for byte; if a hunk is rejected, retry with "patch -l".

--- /usr/local/src/apr-util-1.2.7/misc/apr_queue.c 2005-02-04 14:45:35.000000000 -0600
+++ ./apr_queue-new.c 2006-12-03 12:35:08.000000000 -0600
@@ -12,6 +12,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 #include "apr.h"
@@ -48,9 +52,17 @@
     unsigned int        bounds;/**< max size of queue */
     unsigned int        full_waiters;
     unsigned int        empty_waiters;
+    unsigned int        is_empty_waiters;     /**< # waiting on is_empty */
+    unsigned int        blocked_push_waiters; /**< # waiting on not_blocked_push */
+    unsigned int        blocked_pop_waiters;  /**< # waiting on not_blocked_pop */
+    unsigned int        push_is_blocked;      /**< non-zero: pushing is blocked */
+    unsigned int        pop_is_blocked;       /**< non-zero: popping is blocked */
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t  *not_empty;
+    apr_thread_cond_t  *is_empty;
     apr_thread_cond_t  *not_full;
+    apr_thread_cond_t  *not_blocked_push;
+    apr_thread_cond_t  *not_blocked_pop;
     int                 terminated;
 };
 
@@ -90,6 +102,9 @@
 
     apr_thread_cond_destroy(queue->not_empty);
     apr_thread_cond_destroy(queue->not_full);
+    apr_thread_cond_destroy(queue->is_empty);
+    apr_thread_cond_destroy(queue->not_blocked_push);
+    apr_thread_cond_destroy(queue->not_blocked_pop);
     apr_thread_mutex_destroy(queue->one_big_mutex);
 
     return APR_SUCCESS;
@@ -120,11 +135,27 @@
         return rv;
     }
 
+    rv = apr_thread_cond_create(&queue->is_empty, a);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
     rv = apr_thread_cond_create(&queue->not_full, a);
     if (rv != APR_SUCCESS) {
         return rv;
     }
 
+    /* I must say that I am not very pleased with the multiple returns in this file */
+    rv = apr_thread_cond_create(&queue->not_blocked_push, a);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    rv = apr_thread_cond_create(&queue->not_blocked_pop, a);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
     /* Set all the data in the queue to NULL */
     queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
     queue->bounds = queue_capacity;
@@ -134,6 +165,11 @@
     queue->terminated = 0;
     queue->full_waiters = 0;
     queue->empty_waiters = 0;
+    queue->is_empty_waiters = 0;
+    queue->blocked_push_waiters = 0;
+    queue->blocked_pop_waiters = 0;
+    queue->push_is_blocked = 0;
+    queue->pop_is_blocked = 0;
 
     apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
 
@@ -184,6 +220,34 @@
         }
     }
 
+    /* This added so that we can manually block the push if we like */
+    if (queue->push_is_blocked) {
+        if (!queue->terminated) {
+            queue->blocked_push_waiters++;
+            rv = apr_thread_cond_wait(queue->not_blocked_push,
+                                      queue->one_big_mutex);
+            queue->blocked_push_waiters--;
+            if (rv != APR_SUCCESS) {
+                apr_thread_mutex_unlock(queue->one_big_mutex);
+                return rv;
+            }
+        }
+        /* The mutex was released while we slept.  Still blocked, or
+         * filled by someone else meanwhile?  Then we were interrupted */
+        if (!queue->terminated
+            && (queue->push_is_blocked || apr_queue_full(queue))) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return APR_EINTR;
+        }
+    }
+
+    /* In case it was terminated while waiting on queue->not_blocked_push */
+    if (queue->terminated) {
+        /* never return with one_big_mutex still held */
+        apr_thread_mutex_unlock(queue->one_big_mutex);
+        return APR_EOF; /* no more elements ever again */
+    }
+
     queue->data[queue->in] = data;
     queue->in = (queue->in + 1) % queue->bounds;
     queue->nelts++;
@@ -219,7 +283,7 @@
         return rv;
     }
 
-    if (apr_queue_full(queue)) {
+    if (apr_queue_full(queue) || queue->push_is_blocked) {
         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
         return APR_EAGAIN;
     }
@@ -242,6 +306,67 @@
 }
 
 /**
+ * Manually block pushing onto the queue.
+ *
+ * While blocked, apr_queue_push() waits on not_blocked_push and
+ * apr_queue_trypush() returns APR_EAGAIN.
+ * @param queue the queue
+ * @returns APR_SUCCESS when the flag was set or the queue is already
+ *          terminated; otherwise the failing mutex status
+ */
+APU_DECLARE(apr_status_t) apr_queue_blockpush(apr_queue_t *queue)
+{
+    apr_status_t rv;
+
+    if (queue->terminated) {
+        return APR_SUCCESS; /* nothing left to block */
+    }
+
+    /* The flag is read under one_big_mutex by push/trypush, so it must
+     * be written under it as well */
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv; /* lock not held: do not unlock */
+    }
+
+    queue->push_is_blocked = 1;
+
+    return apr_thread_mutex_unlock(queue->one_big_mutex);
+}
+
+/**
+ * Manually unblock pushing onto the queue.
+ *
+ * Clears the flag and wakes every thread waiting on not_blocked_push.
+ * @param queue the queue
+ * @returns APR_SUCCESS on success or when the queue is already
+ *          terminated; otherwise the first failing status
+ */
+APU_DECLARE(apr_status_t) apr_queue_unblockpush(apr_queue_t *queue)
+{
+    apr_status_t rv;
+    apr_status_t unlock_rv;
+
+    if (queue->terminated) {
+        return APR_SUCCESS; /* nothing left to unblock */
+    }
+
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv; /* lock not held: do not unlock */
+    }
+
+    queue->push_is_blocked = 0;
+    if (queue->blocked_push_waiters) {
+        /* the flag is cleared for everyone, so wake all waiters */
+        rv = apr_thread_cond_broadcast(queue->not_blocked_push);
+    }
+
+    unlock_rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+    return (rv != APR_SUCCESS) ? rv : unlock_rv;
+}
+
+/**
  * not thread safe
  */
 APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
@@ -294,6 +419,34 @@
         }
     }
 
+    /* This added so that we can manually block the pop if we like */
+    if (queue->pop_is_blocked) {
+        if (!queue->terminated) {
+            queue->blocked_pop_waiters++;
+            rv = apr_thread_cond_wait(queue->not_blocked_pop,
+                                      queue->one_big_mutex);
+            queue->blocked_pop_waiters--;
+            if (rv != APR_SUCCESS) {
+                apr_thread_mutex_unlock(queue->one_big_mutex);
+                return rv;
+            }
+        }
+        /* The mutex was released while we slept.  Still blocked, or
+         * drained by someone else meanwhile?  Then we were interrupted */
+        if (!queue->terminated
+            && (queue->pop_is_blocked || apr_queue_empty(queue))) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return APR_EINTR;
+        }
+    }
+
+    /* In case it was terminated while waiting on queue->not_blocked_pop */
+    if (queue->terminated) {
+        /* never return with one_big_mutex still held */
+        apr_thread_mutex_unlock(queue->one_big_mutex);
+        return APR_EOF; /* no more elements ever again */
+    }
+
     *data = queue->data[queue->out];
     queue->nelts--;
 
@@ -307,6 +460,15 @@
         }
     }
 
+    if (queue->nelts < 1 && queue->is_empty_waiters > 0) {
+        Q_DBG("signal empty", queue);
+        rv = apr_thread_cond_signal(queue->is_empty);
+        if (rv != APR_SUCCESS) {
+            apr_thread_mutex_unlock(queue->one_big_mutex);
+            return rv;
+        }
+    }
+
     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
     return rv;
 }
@@ -329,7 +491,7 @@
         return rv;
     }
 
-    if (apr_queue_empty(queue)) {
+    if (apr_queue_empty(queue) || queue->pop_is_blocked) {
         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
         return APR_EAGAIN;
     }
@@ -351,6 +513,103 @@
     return rv;
 }
 
+/**
+ * Manually block popping from the queue.
+ *
+ * While blocked, apr_queue_pop() waits on not_blocked_pop and
+ * apr_queue_trypop() returns APR_EAGAIN.
+ * @param queue the queue
+ * @returns APR_SUCCESS when the flag was set or the queue is already
+ *          terminated; otherwise the failing mutex status
+ */
+APU_DECLARE(apr_status_t) apr_queue_blockpop(apr_queue_t *queue)
+{
+    apr_status_t rv;
+
+    if (queue->terminated) {
+        return APR_SUCCESS; /* nothing left to block */
+    }
+
+    /* The flag is read under one_big_mutex by pop/trypop, so it must
+     * be written under it as well */
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv; /* lock not held: do not unlock */
+    }
+
+    queue->pop_is_blocked = 1;
+
+    return apr_thread_mutex_unlock(queue->one_big_mutex);
+}
+
+/**
+ * Manually unblock popping from the queue.
+ *
+ * Clears the flag and wakes every thread waiting on not_blocked_pop.
+ * @param queue the queue
+ * @returns APR_SUCCESS on success or when the queue is already
+ *          terminated; otherwise the first failing status
+ */
+APU_DECLARE(apr_status_t) apr_queue_unblockpop(apr_queue_t *queue)
+{
+    apr_status_t rv;
+    apr_status_t unlock_rv;
+
+    if (queue->terminated) {
+        return APR_SUCCESS; /* nothing left to unblock */
+    }
+
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv; /* lock not held: do not unlock */
+    }
+
+    queue->pop_is_blocked = 0;
+    if (queue->blocked_pop_waiters) {
+        /* the flag is cleared for everyone, so wake all waiters */
+        rv = apr_thread_cond_broadcast(queue->not_blocked_pop);
+    }
+
+    unlock_rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+    return (rv != APR_SUCCESS) ? rv : unlock_rv;
+}
+
+/**
+ * Think of this how a toilet works - something "tells" the water
+ * when the tank is empty.
+ *
+ * Returns at once if the queue holds no elements; otherwise waits on
+ * is_empty until a pop drains the queue or the wait is interrupted.
+ * @param queue the queue
+ * @returns APR_SUCCESS after the wait or when the queue is already
+ *          terminated; otherwise the first failing status
+ */
+APU_DECLARE(apr_status_t) apr_queue_isempty(apr_queue_t *queue)
+{
+    apr_status_t rv;
+    apr_status_t unlock_rv;
+
+    if (queue->terminated) {
+        return APR_SUCCESS;
+    }
+
+    rv = apr_thread_mutex_lock(queue->one_big_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv; /* lock not held: do not unlock */
+    }
+
+    if (queue->nelts) {
+        /* count ourselves only while actually waiting, so the
+         * counter always drops back to zero */
+        queue->is_empty_waiters++;
+        rv = apr_thread_cond_wait(queue->is_empty, queue->one_big_mutex);
+        queue->is_empty_waiters--;
+    }
+
+    unlock_rv = apr_thread_mutex_unlock(queue->one_big_mutex);
+    return (rv != APR_SUCCESS) ? rv : unlock_rv;
+}
+
 APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
 {
     apr_status_t rv;
@@ -359,7 +618,10 @@
         return rv;
     }
     apr_thread_cond_broadcast(queue->not_empty);
+    apr_thread_cond_broadcast(queue->is_empty);
     apr_thread_cond_broadcast(queue->not_full);
+    apr_thread_cond_broadcast(queue->not_blocked_push);
+    apr_thread_cond_broadcast(queue->not_blocked_pop);
 
     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;