Lines 12-17
Link Here
|
12 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
* See the License for the specific language governing permissions and |
13 |
* See the License for the specific language governing permissions and |
14 |
* limitations under the License. |
14 |
* limitations under the License. |
15 |
*/ |
15 |
*/ |
16 |
|
16 |
|
17 |
#include "apr.h" |
17 |
#include "apr.h" |
Lines 48-56
Link Here
|
48 |
unsigned int bounds;/**< max size of queue */ |
52 |
unsigned int bounds;/**< max size of queue */ |
49 |
unsigned int full_waiters; |
53 |
unsigned int full_waiters; |
50 |
unsigned int empty_waiters; |
54 |
unsigned int empty_waiters; |
|
|
55 |
unsigned int is_empty_waiters; |
56 |
unsigned int blocked_push_waiters; |
57 |
unsigned int blocked_pop_waiters; |
58 |
unsigned int push_is_blocked; |
59 |
unsigned int pop_is_blocked; |
51 |
apr_thread_mutex_t *one_big_mutex; |
60 |
apr_thread_mutex_t *one_big_mutex; |
52 |
apr_thread_cond_t *not_empty; |
61 |
apr_thread_cond_t *not_empty; |
|
|
62 |
apr_thread_cond_t *is_empty; |
53 |
apr_thread_cond_t *not_full; |
63 |
apr_thread_cond_t *not_full; |
|
|
64 |
apr_thread_cond_t *not_blocked_push; |
65 |
apr_thread_cond_t *not_blocked_pop; |
54 |
int terminated; |
66 |
int terminated; |
55 |
}; |
67 |
}; |
56 |
|
68 |
|
Lines 90-95
Link Here
|
90 |
|
102 |
|
91 |
apr_thread_cond_destroy(queue->not_empty); |
103 |
apr_thread_cond_destroy(queue->not_empty); |
92 |
apr_thread_cond_destroy(queue->not_full); |
104 |
apr_thread_cond_destroy(queue->not_full); |
|
|
105 |
apr_thread_cond_destroy(queue->is_empty); |
106 |
apr_thread_cond_destroy(queue->not_blocked_push); |
107 |
apr_thread_cond_destroy(queue->not_blocked_pop); |
93 |
apr_thread_mutex_destroy(queue->one_big_mutex); |
108 |
apr_thread_mutex_destroy(queue->one_big_mutex); |
94 |
|
109 |
|
95 |
return APR_SUCCESS; |
110 |
return APR_SUCCESS; |
Lines 120-130
Link Here
|
120 |
return rv; |
135 |
return rv; |
121 |
} |
136 |
} |
122 |
|
137 |
|
|
|
138 |
rv = apr_thread_cond_create(&queue->is_empty, a); |
139 |
if (rv != APR_SUCCESS) { |
140 |
return rv; |
141 |
} |
142 |
|
123 |
rv = apr_thread_cond_create(&queue->not_full, a); |
143 |
rv = apr_thread_cond_create(&queue->not_full, a); |
124 |
if (rv != APR_SUCCESS) { |
144 |
if (rv != APR_SUCCESS) { |
125 |
return rv; |
145 |
return rv; |
126 |
} |
146 |
} |
127 |
|
147 |
|
|
|
148 |
/* I must say that I am not very pleased with the multiple returns in this file */ |
149 |
rv = apr_thread_cond_create(&queue->not_blocked_push, a); |
150 |
if (rv != APR_SUCCESS) { |
151 |
return rv; |
152 |
} |
153 |
|
154 |
rv = apr_thread_cond_create(&queue->not_blocked_pop, a); |
155 |
if (rv != APR_SUCCESS) { |
156 |
return rv; |
157 |
} |
158 |
|
128 |
/* Set all the data in the queue to NULL */ |
159 |
/* Set all the data in the queue to NULL */ |
129 |
queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); |
160 |
queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); |
130 |
queue->bounds = queue_capacity; |
161 |
queue->bounds = queue_capacity; |
Lines 134-139
Link Here
|
134 |
queue->terminated = 0; |
165 |
queue->terminated = 0; |
135 |
queue->full_waiters = 0; |
166 |
queue->full_waiters = 0; |
136 |
queue->empty_waiters = 0; |
167 |
queue->empty_waiters = 0; |
|
|
168 |
queue->is_empty_waiters = 0; |
169 |
queue->blocked_push_waiters = 0; |
170 |
queue->blocked_pop_waiters = 0; |
171 |
queue->push_is_blocked = 0; |
172 |
queue->pop_is_blocked = 0; |
137 |
|
173 |
|
138 |
apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); |
174 |
apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); |
139 |
|
175 |
|
Lines 184-189
Link Here
|
184 |
} |
220 |
} |
185 |
} |
221 |
} |
186 |
|
222 |
|
|
|
223 |
/* This added so that we can manually block the push if we like */ |
224 |
if(queue->push_is_blocked) { |
225 |
queue->blocked_push_waiters++; |
226 |
rv = apr_thread_cond_wait(queue->not_blocked_push, queue->one_big_mutex); |
227 |
queue->blocked_push_waiters--; |
228 |
if (rv != APR_SUCCESS) { |
229 |
apr_thread_mutex_unlock(queue->one_big_mutex); |
230 |
return rv; |
231 |
} |
232 |
} |
233 |
|
234 |
/* In case it was terminated while waiting on queue->not_blocked_push */ |
235 |
if (queue->terminated) { |
236 |
return APR_EOF; /* no more elements ever again */ |
237 |
} |
238 |
|
187 |
queue->data[queue->in] = data; |
239 |
queue->data[queue->in] = data; |
188 |
queue->in = (queue->in + 1) % queue->bounds; |
240 |
queue->in = (queue->in + 1) % queue->bounds; |
189 |
queue->nelts++; |
241 |
queue->nelts++; |
Lines 219-225
Link Here
|
219 |
return rv; |
271 |
return rv; |
220 |
} |
272 |
} |
221 |
|
273 |
|
222 |
if (apr_queue_full(queue)) { |
274 |
if (apr_queue_full(queue) || queue->push_is_blocked) { |
223 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
275 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
224 |
return APR_EAGAIN; |
276 |
return APR_EAGAIN; |
225 |
} |
277 |
} |
Lines 242-247
Link Here
|
242 |
} |
294 |
} |
243 |
|
295 |
|
244 |
/** |
296 |
/** |
|
|
297 |
* Manually block pushing onto the queue. |
298 |
*/ |
299 |
APU_DECLARE(apr_status_t) apr_queue_blockpush(apr_queue_t *queue) |
300 |
{ |
301 |
/* Is this mutex acquiring necessary for this? */ |
302 |
apr_status_t rv; |
303 |
if(!queue->terminated) { |
304 |
rv = apr_thread_mutex_lock(queue->one_big_mutex); |
305 |
if(rv == APR_SUCCESS) { |
306 |
queue->push_is_blocked = 1; |
307 |
} |
308 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
309 |
} |
310 |
return rv; |
311 |
} |
312 |
|
313 |
/** |
314 |
* Manually unblock pushing onto the queue. |
315 |
*/ |
316 |
APU_DECLARE(apr_status_t) apr_queue_unblockpush(apr_queue_t *queue) |
317 |
{ |
318 |
/* Is this mutex acquiring necessary for this? */ |
319 |
apr_status_t rv; |
320 |
if(!queue->terminated) { |
321 |
rv = apr_thread_mutex_lock(queue->one_big_mutex); |
322 |
if(rv == APR_SUCCESS) { |
323 |
queue->push_is_blocked = 0; |
324 |
if(queue->blocked_push_waiters) { |
325 |
rv = apr_thread_cond_signal(queue->not_blocked_push); |
326 |
} |
327 |
} |
328 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
329 |
} |
330 |
|
331 |
return rv; |
332 |
} |
333 |
|
334 |
/** |
245 |
* not thread safe |
335 |
* not thread safe |
246 |
*/ |
336 |
*/ |
247 |
APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { |
337 |
APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { |
Lines 294-299
Link Here
|
294 |
} |
384 |
} |
295 |
} |
385 |
} |
296 |
|
386 |
|
|
|
387 |
/* This added so that we can manually block the pop if we like */ |
388 |
if(queue->pop_is_blocked) { |
389 |
queue->blocked_pop_waiters++; |
390 |
rv = apr_thread_cond_wait(queue->not_blocked_pop, queue->one_big_mutex); |
391 |
queue->blocked_pop_waiters--; |
392 |
if (rv != APR_SUCCESS) { |
393 |
apr_thread_mutex_unlock(queue->one_big_mutex); |
394 |
return rv; |
395 |
} |
396 |
} |
397 |
|
398 |
/* In case it was terminated while waiting on queue->not_blocked_push */ |
399 |
if (queue->terminated) { |
400 |
return APR_EOF; /* no more elements ever again */ |
401 |
} |
402 |
|
297 |
*data = queue->data[queue->out]; |
403 |
*data = queue->data[queue->out]; |
298 |
queue->nelts--; |
404 |
queue->nelts--; |
299 |
|
405 |
|
Lines 307-312
Link Here
|
307 |
} |
413 |
} |
308 |
} |
414 |
} |
309 |
|
415 |
|
|
|
416 |
if (queue->nelts < 1 && queue->is_empty_waiters > 0) { |
417 |
Q_DBG("signal empty", queue); |
418 |
rv = apr_thread_cond_signal(queue->is_empty); |
419 |
if (rv != APR_SUCCESS) { |
420 |
apr_thread_mutex_unlock(queue->one_big_mutex); |
421 |
return rv; |
422 |
} |
423 |
} |
424 |
|
310 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
425 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
311 |
return rv; |
426 |
return rv; |
312 |
} |
427 |
} |
Lines 329-335
Link Here
|
329 |
return rv; |
444 |
return rv; |
330 |
} |
445 |
} |
331 |
|
446 |
|
332 |
if (apr_queue_empty(queue)) { |
447 |
if (apr_queue_empty(queue) || queue->pop_is_blocked) { |
333 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
448 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
334 |
return APR_EAGAIN; |
449 |
return APR_EAGAIN; |
335 |
} |
450 |
} |
Lines 351-356
Link Here
|
351 |
return rv; |
466 |
return rv; |
352 |
} |
467 |
} |
353 |
|
468 |
|
|
|
469 |
/** |
470 |
* Manually block popping from the queue. |
471 |
*/ |
472 |
APU_DECLARE(apr_status_t) apr_queue_blockpop(apr_queue_t *queue) |
473 |
{ |
474 |
/* Is this mutex acquiring necessary for this? */ |
475 |
apr_status_t rv; |
476 |
if(!queue->terminated) { |
477 |
rv = apr_thread_mutex_lock(queue->one_big_mutex); |
478 |
if(rv == APR_SUCCESS) { |
479 |
queue->pop_is_blocked = 1; |
480 |
} |
481 |
apr_thread_mutex_unlock(queue->one_big_mutex); |
482 |
} |
483 |
return rv; |
484 |
} |
485 |
|
486 |
/** |
487 |
* Manually unblock popping from the queue. |
488 |
*/ |
489 |
APU_DECLARE(apr_status_t) apr_queue_unblockpop(apr_queue_t *queue) |
490 |
{ |
491 |
/* Is this mutex acquiring necessary for this? */ |
492 |
apr_status_t rv; |
493 |
if(!queue->terminated) { |
494 |
rv = apr_thread_mutex_lock(queue->one_big_mutex); |
495 |
if(rv == APR_SUCCESS) { |
496 |
queue->pop_is_blocked = 0; |
497 |
if(queue->blocked_pop_waiters) { |
498 |
rv = apr_thread_cond_signal(queue->not_blocked_pop); |
499 |
} |
500 |
} |
501 |
apr_thread_mutex_unlock(queue->one_big_mutex); |
502 |
} |
503 |
return rv; |
504 |
} |
505 |
|
506 |
/** |
507 |
* Think of this how a toilet works - something "tells" the water |
508 |
* when the tank is empty. |
509 |
*/ |
510 |
APU_DECLARE(apr_status_t) apr_queue_isempty(apr_queue_t *queue) |
511 |
{ |
512 |
apr_status_t rv = APR_SUCCESS; |
513 |
if(!queue->terminated) { |
514 |
rv = apr_thread_mutex_lock(queue->one_big_mutex); |
515 |
if(rv == APR_SUCCESS) { |
516 |
queue->is_empty_waiters++; |
517 |
if(queue->nelts) { |
518 |
rv = apr_thread_cond_wait(queue->is_empty, queue->one_big_mutex); |
519 |
queue->is_empty_waiters--; |
520 |
} |
521 |
} |
522 |
rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
523 |
} |
524 |
return rv; |
525 |
} |
526 |
|
354 |
APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) |
527 |
APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) |
355 |
{ |
528 |
{ |
356 |
apr_status_t rv; |
529 |
apr_status_t rv; |
Lines 359-365
Link Here
|
359 |
return rv; |
532 |
return rv; |
360 |
} |
533 |
} |
361 |
apr_thread_cond_broadcast(queue->not_empty); |
534 |
apr_thread_cond_broadcast(queue->not_empty); |
|
|
535 |
apr_thread_cond_broadcast(queue->is_empty); |
362 |
apr_thread_cond_broadcast(queue->not_full); |
536 |
apr_thread_cond_broadcast(queue->not_full); |
|
|
537 |
apr_thread_cond_broadcast(queue->not_blocked_push); |
538 |
apr_thread_cond_broadcast(queue->not_blocked_pop); |
363 |
|
539 |
|
364 |
if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { |
540 |
if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { |
365 |
return rv; |
541 |
return rv; |