Lines 52-57
Link Here
|
52 |
|
52 |
|
53 |
#include "mod_so.h" /* for ap_find_loaded_module_symbol */ |
53 |
#include "mod_so.h" /* for ap_find_loaded_module_symbol */ |
54 |
|
54 |
|
|
|
55 |
#ifdef AP_DEBUG_IO |
56 |
#define CORE_LOG_LEVEL_IO APLOG_NOTICE |
57 |
#else |
58 |
#define CORE_LOG_LEVEL_IO APLOG_TRACE6 |
59 |
#endif |
60 |
|
55 |
#define AP_MIN_SENDFILE_BYTES (256) |
61 |
#define AP_MIN_SENDFILE_BYTES (256) |
56 |
|
62 |
|
57 |
/** |
63 |
/** |
Lines 78-87
do { \
Link Here
|
78 |
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX |
84 |
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX |
79 |
|
85 |
|
80 |
struct core_output_filter_ctx { |
86 |
struct core_output_filter_ctx { |
|
|
87 |
core_server_config *conf; |
81 |
apr_bucket_brigade *buffered_bb; |
88 |
apr_bucket_brigade *buffered_bb; |
82 |
apr_bucket_brigade *tmp_flush_bb; |
89 |
apr_bucket_brigade *tmp_flush_bb; |
83 |
apr_pool_t *deferred_write_pool; |
90 |
apr_pool_t *deferred_write_pool; |
84 |
apr_size_t bytes_written; |
91 |
apr_size_t bytes_written; |
|
|
92 |
struct iovec *vec; |
93 |
apr_size_t nvec; |
85 |
}; |
94 |
}; |
86 |
|
95 |
|
87 |
struct core_filter_ctx { |
96 |
struct core_filter_ctx { |
Lines 335-341
static void setaside_remaining_output(ap_filter_t
Link Here
|
335 |
|
344 |
|
336 |
static apr_status_t send_brigade_nonblocking(apr_socket_t *s, |
345 |
static apr_status_t send_brigade_nonblocking(apr_socket_t *s, |
337 |
apr_bucket_brigade *bb, |
346 |
apr_bucket_brigade *bb, |
338 |
apr_size_t *bytes_written, |
347 |
core_output_filter_ctx_t *ctx, |
339 |
conn_rec *c); |
348 |
conn_rec *c); |
340 |
|
349 |
|
341 |
static void remove_empty_buckets(apr_bucket_brigade *bb); |
350 |
static void remove_empty_buckets(apr_bucket_brigade *bb); |
Lines 342-368
static void remove_empty_buckets(apr_bucket_brigad
Link Here
|
342 |
|
351 |
|
343 |
static apr_status_t send_brigade_blocking(apr_socket_t *s, |
352 |
static apr_status_t send_brigade_blocking(apr_socket_t *s, |
344 |
apr_bucket_brigade *bb, |
353 |
apr_bucket_brigade *bb, |
345 |
apr_size_t *bytes_written, |
354 |
core_output_filter_ctx_t *ctx, |
346 |
conn_rec *c); |
355 |
conn_rec *c); |
347 |
|
356 |
|
348 |
static apr_status_t writev_nonblocking(apr_socket_t *s, |
357 |
static apr_status_t writev_nonblocking(apr_socket_t *s, |
349 |
struct iovec *vec, apr_size_t nvec, |
|
|
350 |
apr_bucket_brigade *bb, |
358 |
apr_bucket_brigade *bb, |
351 |
apr_size_t *cumulative_bytes_written, |
359 |
core_output_filter_ctx_t *ctx, |
|
|
360 |
apr_size_t bytes_to_write, |
361 |
apr_size_t nvec, |
352 |
conn_rec *c); |
362 |
conn_rec *c); |
353 |
|
363 |
|
354 |
#if APR_HAS_SENDFILE |
364 |
#if APR_HAS_SENDFILE |
355 |
static apr_status_t sendfile_nonblocking(apr_socket_t *s, |
365 |
static apr_status_t sendfile_nonblocking(apr_socket_t *s, |
356 |
apr_bucket *bucket, |
366 |
apr_bucket *bucket, |
357 |
apr_size_t *cumulative_bytes_written, |
367 |
core_output_filter_ctx_t *ctx, |
358 |
conn_rec *c); |
368 |
conn_rec *c); |
359 |
#endif |
369 |
#endif |
360 |
|
370 |
|
361 |
/* XXX: Should these be configurable parameters? */ |
371 |
static apr_status_t should_flush(apr_bucket_brigade *bb, |
362 |
#define THRESHOLD_MIN_WRITE 4096 |
372 |
apr_bucket **flush_upto, |
363 |
#define THRESHOLD_MAX_BUFFER 65536 |
373 |
core_output_filter_ctx_t *ctx, |
364 |
#define MAX_REQUESTS_IN_PIPELINE 5 |
374 |
conn_rec *c, int first) |
|
|
375 |
{ |
376 |
core_server_config *conf = ctx->conf; |
377 |
apr_size_t total_bytes = 0, |
378 |
non_file_bytes = 0; |
379 |
apr_uint32_t eor_buckets = 0; |
380 |
apr_bucket *bucket, *next; |
365 |
|
381 |
|
|
|
382 |
*flush_upto = NULL; |
383 |
|
384 |
/* Scan through the brigade and decide whether to attempt a write, how |
385 |
* to write (blocking vs nonblocking), and how much to do blocking write |
386 |
* if needed, based on the following rules: |
387 |
* |
388 |
* 1) Determine if and up to which bucket we need to do a blocking |
389 |
* write (i.e. set *flush_upto): |
390 |
* |
391 |
* a) The brigade contains a flush bucket: Do a blocking write |
392 |
* of everything up to that point. |
393 |
* |
394 |
* b) The request is in CONN_STATE_HANDLER state, and the brigade |
395 |
* contains at least flush_max_threshold bytes in non-file |
396 |
* buckets: Do blocking writes until the amount of data in the |
397 |
* buffer is less than flush_max_threshold. (The point of this |
398 |
* rule is to provide flow control, in case a handler is |
399 |
* streaming out lots of data faster than the data can be |
400 |
* sent to the client.) |
401 |
* |
402 |
* c) The request is in CONN_STATE_HANDLER state, and the brigade |
403 |
* contains at least flush_max_pipelined EOR buckets: |
404 |
* Do blocking writes until less than flush_max_pipelined EOR |
405 |
* buckets are left. (The point of this rule is to prevent too many |
406 |
* FDs being kept open by pipelined requests, possibly allowing a |
407 |
* DoS). |
408 |
* |
409 |
* d) The brigade contains a morphing bucket: If there was no other |
410 |
* reason to do a blocking write yet, try reading the bucket. If its |
411 |
* contents fit into memory before flush_max_threshold is reached, |
412 |
* everything is fine. Otherwise we need to do a blocking write |
413 |
* up to and including the morphing bucket, because ap_save_brigade() |
414 |
* would read the whole bucket into memory later on. |
415 |
* |
416 |
* 2) Actually do the blocking write up to the last bucket determined |
417 |
* by rules 1a-d. The point of doing only one flush is to make as |
418 |
* few calls to writev() as possible. |
419 |
* |
420 |
* 3) If the brigade contains at least flush_min_threshold |
421 |
* bytes, or a file bucket (to avoid setting it aside): return |
422 |
* APR_INCOMPLETE to do a nonblocking write of as much data as |
423 |
* possible, then save the rest. |
424 |
*/ |
425 |
for (bucket = APR_BRIGADE_FIRST(bb); |
426 |
bucket != APR_BRIGADE_SENTINEL(bb); |
427 |
bucket = next) { |
428 |
next = APR_BUCKET_NEXT(bucket); |
429 |
|
430 |
if (!APR_BUCKET_IS_METADATA(bucket)) { |
431 |
if (bucket->length == (apr_size_t)-1) { |
432 |
/* |
433 |
* A setaside of morphing buckets would read everything into |
434 |
* memory. Instead, we will flush everything up to and |
435 |
* including this bucket. |
436 |
*/ |
437 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, |
438 |
"core_output_filter: flushing because of " |
439 |
"morphing bucket"); |
440 |
*flush_upto = next; |
441 |
} |
442 |
else { |
443 |
total_bytes += bucket->length; |
444 |
if (!APR_BUCKET_IS_FILE(bucket)) { |
445 |
non_file_bytes += bucket->length; |
446 |
if (non_file_bytes >= conf->flush_max_threshold) { |
447 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, |
448 |
"core_output_filter: flushing because of " |
449 |
"max threshold"); |
450 |
*flush_upto = next; |
451 |
} |
452 |
} |
453 |
} |
454 |
} |
455 |
else if (APR_BUCKET_IS_FLUSH(bucket)) { |
456 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, |
457 |
"core_output_filter: flushing because of " |
458 |
"FLUSH bucket"); |
459 |
*flush_upto = next; |
460 |
} |
461 |
else if (AP_BUCKET_IS_EOR(bucket)) { |
462 |
if (eor_buckets > conf->flush_max_pipelined) { |
463 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, |
464 |
"core_output_filter: flushing because of " |
465 |
"max pipelined"); |
466 |
*flush_upto = next; |
467 |
} |
468 |
} |
469 |
|
470 |
if (*flush_upto) { |
471 |
if (first) { |
472 |
return APR_SUCCESS; |
473 |
} |
474 |
total_bytes = 0; |
475 |
non_file_bytes = 0; |
476 |
eor_buckets = 0; |
477 |
} |
478 |
} |
479 |
if (total_bytes >= conf->flush_min_threshold |
480 |
|| total_bytes > non_file_bytes) { |
481 |
return APR_INCOMPLETE; |
482 |
} |
483 |
else { |
484 |
return APR_SUCCESS; |
485 |
} |
486 |
} |
487 |
|
366 |
/* Optional function coming from mod_logio, used for logging of output |
488 |
/* Optional function coming from mod_logio, used for logging of output |
367 |
* traffic |
489 |
* traffic |
368 |
*/ |
490 |
*/ |
Lines 373-382
apr_status_t ap_core_output_filter(ap_filter_t *f,
Link Here
|
373 |
conn_rec *c = f->c; |
495 |
conn_rec *c = f->c; |
374 |
core_net_rec *net = f->ctx; |
496 |
core_net_rec *net = f->ctx; |
375 |
core_output_filter_ctx_t *ctx = net->out_ctx; |
497 |
core_output_filter_ctx_t *ctx = net->out_ctx; |
|
|
498 |
apr_interval_time_t old_timeout = 0; |
376 |
apr_bucket_brigade *bb = NULL; |
499 |
apr_bucket_brigade *bb = NULL; |
377 |
apr_bucket *bucket, *next, *flush_upto = NULL; |
500 |
apr_bucket *flush_upto; |
378 |
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; |
501 |
apr_socket_t *s; |
379 |
int eor_buckets_in_brigade, morphing_bucket_in_brigade; |
|
|
380 |
apr_status_t rv; |
502 |
apr_status_t rv; |
381 |
|
503 |
|
382 |
/* Fail quickly if the connection has already been aborted. */ |
504 |
/* Fail quickly if the connection has already been aborted. */ |
Lines 390-395
apr_status_t ap_core_output_filter(ap_filter_t *f,
Link Here
|
390 |
if (ctx == NULL) { |
512 |
if (ctx == NULL) { |
391 |
ctx = apr_pcalloc(c->pool, sizeof(*ctx)); |
513 |
ctx = apr_pcalloc(c->pool, sizeof(*ctx)); |
392 |
net->out_ctx = (core_output_filter_ctx_t *)ctx; |
514 |
net->out_ctx = (core_output_filter_ctx_t *)ctx; |
|
|
515 |
ctx->conf = ap_get_core_module_config(c->base_server->module_config); |
393 |
/* |
516 |
/* |
394 |
* Need to create tmp brigade with correct lifetime. Passing |
517 |
* Need to create tmp brigade with correct lifetime. Passing |
395 |
* NULL to apr_brigade_split_ex would result in a brigade |
518 |
* NULL to apr_brigade_split_ex would result in a brigade |
Lines 417-570
apr_status_t ap_core_output_filter(ap_filter_t *f,
Link Here
|
417 |
return APR_SUCCESS; |
540 |
return APR_SUCCESS; |
418 |
} |
541 |
} |
419 |
|
542 |
|
420 |
/* Scan through the brigade and decide whether to attempt a write, |
543 |
s = net->client_socket; |
421 |
* and how much to write, based on the following rules: |
|
|
422 |
* |
423 |
* 1) The new_bb is null: Do a nonblocking write of as much as |
424 |
* possible: do a nonblocking write of as much data as possible, |
425 |
* then save the rest in ctx->buffered_bb. (If new_bb == NULL, |
426 |
* it probably means that the MPM is doing asynchronous write |
427 |
* completion and has just determined that this connection |
428 |
* is writable.) |
429 |
* |
430 |
* 2) Determine if and up to which bucket we need to do a blocking |
431 |
* write: |
432 |
* |
433 |
* a) The brigade contains a flush bucket: Do a blocking write |
434 |
* of everything up that point. |
435 |
* |
436 |
* b) The request is in CONN_STATE_HANDLER state, and the brigade |
437 |
* contains at least THRESHOLD_MAX_BUFFER bytes in non-file |
438 |
* buckets: Do blocking writes until the amount of data in the |
439 |
* buffer is less than THRESHOLD_MAX_BUFFER. (The point of this |
440 |
* rule is to provide flow control, in case a handler is |
441 |
* streaming out lots of data faster than the data can be |
442 |
* sent to the client.) |
443 |
* |
444 |
* c) The request is in CONN_STATE_HANDLER state, and the brigade |
445 |
* contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: |
446 |
* Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR |
447 |
* buckets are left. (The point of this rule is to prevent too many |
448 |
* FDs being kept open by pipelined requests, possibly allowing a |
449 |
* DoS). |
450 |
* |
451 |
* d) The brigade contains a morphing bucket: If there was no other |
452 |
* reason to do a blocking write yet, try reading the bucket. If its |
453 |
* contents fit into memory before THRESHOLD_MAX_BUFFER is reached, |
454 |
* everything is fine. Otherwise we need to do a blocking write the |
455 |
* up to and including the morphing bucket, because ap_save_brigade() |
456 |
* would read the whole bucket into memory later on. |
457 |
* |
458 |
* 3) Actually do the blocking write up to the last bucket determined |
459 |
* by rules 2a-d. The point of doing only one flush is to make as |
460 |
* few calls to writev() as possible. |
461 |
* |
462 |
* 4) If the brigade contains at least THRESHOLD_MIN_WRITE |
463 |
* bytes: Do a nonblocking write of as much data as possible, |
464 |
* then save the rest in ctx->buffered_bb. |
465 |
*/ |
466 |
|
544 |
|
467 |
if (new_bb == NULL) { |
545 |
/* Make the socket nonblocking for every write below; the original timeout is restored before leaving. */ |
468 |
rv = send_brigade_nonblocking(net->client_socket, bb, |
546 |
apr_socket_timeout_get(s, &old_timeout); |
469 |
&(ctx->bytes_written), c); |
547 |
apr_socket_timeout_set(s, 0); |
470 |
if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { |
|
|
471 |
/* The client has aborted the connection */ |
472 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, |
473 |
"core_output_filter: writing data to the network"); |
474 |
apr_brigade_cleanup(bb); |
475 |
c->aborted = 1; |
476 |
return rv; |
477 |
} |
478 |
setaside_remaining_output(f, ctx, bb, c); |
479 |
return APR_SUCCESS; |
480 |
} |
481 |
|
548 |
|
482 |
bytes_in_brigade = 0; |
549 |
/* If new_bb is NULL, the caller is doing asynchronous write completion |
483 |
non_file_bytes_in_brigade = 0; |
550 |
* and has just determined that this connection was writable: do an |
484 |
eor_buckets_in_brigade = 0; |
551 |
* immediate nonblocking write of as much data as possible; otherwise |
485 |
morphing_bucket_in_brigade = 0; |
552 |
* apply should_flush() rules against bb = ctx->buffered_bb + new_bb. |
486 |
|
553 |
*/ |
487 |
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); |
554 |
rv = APR_INCOMPLETE; |
488 |
bucket = next) { |
555 |
if (new_bb) { |
489 |
next = APR_BUCKET_NEXT(bucket); |
556 |
rv = should_flush(bb, &flush_upto, ctx, c, 0); |
490 |
|
557 |
if (flush_upto) { |
491 |
if (!APR_BUCKET_IS_METADATA(bucket)) { |
558 |
ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto, |
492 |
if (bucket->length == (apr_size_t)-1) { |
559 |
ctx->tmp_flush_bb); |
493 |
/* |
560 |
rv = send_brigade_blocking(s, bb, ctx, c); |
494 |
* A setaside of morphing buckets would read everything into |
561 |
APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb); |
495 |
* memory. Instead, we will flush everything up to and |
|
|
496 |
* including this bucket. |
497 |
*/ |
498 |
morphing_bucket_in_brigade = 1; |
499 |
} |
500 |
else { |
501 |
bytes_in_brigade += bucket->length; |
502 |
if (!APR_BUCKET_IS_FILE(bucket)) |
503 |
non_file_bytes_in_brigade += bucket->length; |
504 |
} |
505 |
} |
562 |
} |
506 |
else if (AP_BUCKET_IS_EOR(bucket)) { |
|
|
507 |
eor_buckets_in_brigade++; |
508 |
} |
509 |
|
510 |
if (APR_BUCKET_IS_FLUSH(bucket) |
511 |
|| non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER |
512 |
|| morphing_bucket_in_brigade |
513 |
|| eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { |
514 |
/* this segment of the brigade MUST be sent before returning. */ |
515 |
|
516 |
if (APLOGctrace6(c)) { |
517 |
char *reason = APR_BUCKET_IS_FLUSH(bucket) ? |
518 |
"FLUSH bucket" : |
519 |
(non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? |
520 |
"THRESHOLD_MAX_BUFFER" : |
521 |
morphing_bucket_in_brigade ? "morphing bucket" : |
522 |
"MAX_REQUESTS_IN_PIPELINE"; |
523 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c, |
524 |
"core_output_filter: flushing because of %s", |
525 |
reason); |
526 |
} |
527 |
/* |
528 |
* Defer the actual blocking write to avoid doing many writes. |
529 |
*/ |
530 |
flush_upto = next; |
531 |
|
532 |
bytes_in_brigade = 0; |
533 |
non_file_bytes_in_brigade = 0; |
534 |
eor_buckets_in_brigade = 0; |
535 |
morphing_bucket_in_brigade = 0; |
536 |
} |
537 |
} |
563 |
} |
538 |
|
564 |
if (rv == APR_INCOMPLETE) { |
539 |
if (flush_upto != NULL) { |
565 |
rv = send_brigade_nonblocking(s, bb, ctx, c); |
540 |
ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto, |
|
|
541 |
ctx->tmp_flush_bb); |
542 |
rv = send_brigade_blocking(net->client_socket, bb, |
543 |
&(ctx->bytes_written), c); |
544 |
if (rv != APR_SUCCESS) { |
545 |
/* The client has aborted the connection */ |
546 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, |
547 |
"core_output_filter: writing data to the network"); |
548 |
apr_brigade_cleanup(bb); |
549 |
c->aborted = 1; |
550 |
return rv; |
551 |
} |
552 |
APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb); |
553 |
} |
566 |
} |
554 |
|
567 |
apr_socket_timeout_set(s, old_timeout); |
555 |
if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) { |
568 |
if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { |
556 |
rv = send_brigade_nonblocking(net->client_socket, bb, |
569 |
/* The client has aborted the connection */ |
557 |
&(ctx->bytes_written), c); |
570 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, |
558 |
if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) { |
571 |
"core_output_filter: writing data to the network"); |
559 |
/* The client has aborted the connection */ |
572 |
apr_brigade_cleanup(bb); |
560 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, |
573 |
c->aborted = 1; |
561 |
"core_output_filter: writing data to the network"); |
574 |
return rv; |
562 |
apr_brigade_cleanup(bb); |
|
|
563 |
c->aborted = 1; |
564 |
return rv; |
565 |
} |
566 |
} |
575 |
} |
567 |
|
576 |
|
|
|
577 |
/* Save the rest in ctx->buffered_bb. */ |
568 |
setaside_remaining_output(f, ctx, bb, c); |
578 |
setaside_remaining_output(f, ctx, bb, c); |
569 |
return APR_SUCCESS; |
579 |
return APR_SUCCESS; |
570 |
} |
580 |
} |
Lines 578-586
static void setaside_remaining_output(ap_filter_t
Link Here
|
578 |
apr_bucket_brigade *bb, |
588 |
apr_bucket_brigade *bb, |
579 |
conn_rec *c) |
589 |
conn_rec *c) |
580 |
{ |
590 |
{ |
581 |
if (bb == NULL) { |
|
|
582 |
return; |
583 |
} |
584 |
remove_empty_buckets(bb); |
591 |
remove_empty_buckets(bb); |
585 |
if (!APR_BRIGADE_EMPTY(bb)) { |
592 |
if (!APR_BRIGADE_EMPTY(bb)) { |
586 |
c->data_in_output_filters = 1; |
593 |
c->data_in_output_filters = 1; |
Lines 603-626
static void setaside_remaining_output(ap_filter_t
Link Here
|
603 |
} |
610 |
} |
604 |
|
611 |
|
605 |
#ifndef APR_MAX_IOVEC_SIZE |
612 |
#ifndef APR_MAX_IOVEC_SIZE |
606 |
#define MAX_IOVEC_TO_WRITE 16 |
613 |
#define NVEC_MIN 16 |
|
|
614 |
#define NVEC_MAX NVEC_MIN |
607 |
#else |
615 |
#else |
608 |
#if APR_MAX_IOVEC_SIZE > 16 |
616 |
#if APR_MAX_IOVEC_SIZE > 16 |
609 |
#define MAX_IOVEC_TO_WRITE 16 |
617 |
#define NVEC_MIN 16 |
610 |
#else |
618 |
#else |
611 |
#define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE |
619 |
#define NVEC_MIN APR_MAX_IOVEC_SIZE |
612 |
#endif |
620 |
#endif |
|
|
621 |
#define NVEC_MAX APR_MAX_IOVEC_SIZE |
613 |
#endif |
622 |
#endif |
614 |
|
623 |
|
615 |
static apr_status_t send_brigade_nonblocking(apr_socket_t *s, |
624 |
static apr_status_t send_brigade_nonblocking(apr_socket_t *s, |
616 |
apr_bucket_brigade *bb, |
625 |
apr_bucket_brigade *bb, |
617 |
apr_size_t *bytes_written, |
626 |
core_output_filter_ctx_t *ctx, |
618 |
conn_rec *c) |
627 |
conn_rec *c) |
619 |
{ |
628 |
{ |
620 |
apr_bucket *bucket, *next; |
629 |
apr_bucket *bucket, *next; |
621 |
apr_status_t rv; |
630 |
apr_size_t nvec = 0, nbytes = 0; |
622 |
struct iovec vec[MAX_IOVEC_TO_WRITE]; |
631 |
apr_status_t rv = APR_SUCCESS; |
623 |
apr_size_t nvec = 0; |
|
|
624 |
|
632 |
|
625 |
remove_empty_buckets(bb); |
633 |
remove_empty_buckets(bb); |
626 |
|
634 |
|
Lines 639-662
static apr_status_t send_brigade_nonblocking(apr_s
Link Here
|
639 |
*/ |
647 |
*/ |
640 |
|
648 |
|
641 |
if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) && |
649 |
if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) && |
642 |
(bucket->length >= AP_MIN_SENDFILE_BYTES)) { |
650 |
(bucket->length >= AP_MIN_SENDFILE_BYTES)) { |
643 |
if (nvec > 0) { |
651 |
if (nvec > 0) { |
644 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); |
652 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); |
645 |
rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); |
653 |
rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); |
646 |
if (rv != APR_SUCCESS) { |
654 |
if (rv != APR_SUCCESS) { |
647 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); |
655 |
goto cleanup; |
648 |
return rv; |
|
|
649 |
} |
656 |
} |
|
|
657 |
nvec = nbytes = 0; |
650 |
} |
658 |
} |
651 |
rv = sendfile_nonblocking(s, bucket, bytes_written, c); |
659 |
rv = sendfile_nonblocking(s, bucket, ctx, c); |
652 |
if (nvec > 0) { |
|
|
653 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); |
654 |
nvec = 0; |
655 |
} |
656 |
if (rv != APR_SUCCESS) { |
660 |
if (rv != APR_SUCCESS) { |
657 |
return rv; |
661 |
goto cleanup; |
658 |
} |
662 |
} |
659 |
break; |
663 |
continue; |
660 |
} |
664 |
} |
661 |
} |
665 |
} |
662 |
#endif /* APR_HAS_SENDFILE */ |
666 |
#endif /* APR_HAS_SENDFILE */ |
Lines 671-715
static apr_status_t send_brigade_nonblocking(apr_s
Link Here
|
671 |
if (APR_STATUS_IS_EAGAIN(rv)) { |
675 |
if (APR_STATUS_IS_EAGAIN(rv)) { |
672 |
/* Read would block; flush any pending data and retry. */ |
676 |
/* Read would block; flush any pending data and retry. */ |
673 |
if (nvec) { |
677 |
if (nvec) { |
674 |
rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); |
678 |
rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); |
675 |
if (rv) { |
679 |
if (rv != APR_SUCCESS) { |
676 |
return rv; |
680 |
goto cleanup; |
677 |
} |
681 |
} |
678 |
nvec = 0; |
682 |
nvec = nbytes = 0; |
679 |
} |
683 |
} |
680 |
|
684 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); |
|
|
685 |
|
681 |
rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ); |
686 |
rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ); |
682 |
} |
687 |
} |
683 |
if (rv != APR_SUCCESS) { |
688 |
if (rv != APR_SUCCESS) { |
684 |
return rv; |
689 |
goto cleanup; |
685 |
} |
690 |
} |
686 |
|
691 |
|
687 |
/* reading may have split the bucket, so recompute next: */ |
692 |
/* reading may have split the bucket, so recompute next: */ |
688 |
next = APR_BUCKET_NEXT(bucket); |
693 |
next = APR_BUCKET_NEXT(bucket); |
689 |
vec[nvec].iov_base = (char *)data; |
694 |
|
690 |
vec[nvec].iov_len = length; |
695 |
if (nvec >= ctx->nvec) { |
|
|
696 |
if (nvec > 0) { |
697 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); |
698 |
rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); |
699 |
if (rv != APR_SUCCESS) { |
700 |
goto cleanup; |
701 |
} |
702 |
} |
703 |
if (nvec < NVEC_MAX) { |
704 |
nvec *= 2; |
705 |
if (nvec < NVEC_MIN) { |
706 |
nvec = NVEC_MIN; |
707 |
} |
708 |
else if (nvec > NVEC_MAX) { |
709 |
nvec = NVEC_MAX; |
710 |
} |
711 |
ctx->vec = apr_palloc(c->pool, nvec * sizeof(*ctx->vec)); |
712 |
ctx->nvec = nvec; |
713 |
} |
714 |
nvec = nbytes = 0; |
715 |
} |
716 |
ctx->vec[nvec].iov_base = (char *)data; |
717 |
ctx->vec[nvec].iov_len = length; |
691 |
nvec++; |
718 |
nvec++; |
692 |
if (nvec == MAX_IOVEC_TO_WRITE) { |
719 |
|
693 |
rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); |
720 |
/* Flush above max threshold, unless the brigade still contains |
694 |
nvec = 0; |
721 |
* non-morphing buckets (i.e. in memory) which we want to gather |
|
|
722 |
* in the same pass (if we are at the end of the brigade, the |
723 |
* write will happen outside the loop anyway). |
724 |
*/ |
725 |
nbytes += length; |
726 |
if (nbytes >= ctx->conf->flush_max_threshold |
727 |
&& next != APR_BRIGADE_SENTINEL(bb) |
728 |
&& (next->length == (apr_size_t)-1 |
729 |
|| APR_BUCKET_IS_FILE(next))) { |
730 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); |
731 |
rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); |
695 |
if (rv != APR_SUCCESS) { |
732 |
if (rv != APR_SUCCESS) { |
696 |
return rv; |
733 |
goto cleanup; |
697 |
} |
734 |
} |
698 |
break; |
735 |
nvec = nbytes = 0; |
699 |
} |
736 |
} |
700 |
} |
737 |
} |
701 |
} |
738 |
} |
702 |
|
739 |
|
703 |
if (nvec > 0) { |
740 |
if (nvec > 0) { |
704 |
rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); |
741 |
rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); |
705 |
if (rv != APR_SUCCESS) { |
742 |
if (rv != APR_SUCCESS) { |
706 |
return rv; |
743 |
goto cleanup; |
707 |
} |
744 |
} |
708 |
} |
745 |
} |
709 |
|
746 |
|
710 |
remove_empty_buckets(bb); |
747 |
remove_empty_buckets(bb); |
711 |
|
748 |
|
712 |
return APR_SUCCESS; |
749 |
cleanup: |
|
|
750 |
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); |
751 |
return rv; |
713 |
} |
752 |
} |
714 |
|
753 |
|
715 |
static void remove_empty_buckets(apr_bucket_brigade *bb) |
754 |
static void remove_empty_buckets(apr_bucket_brigade *bb) |
Lines 723-729
static void remove_empty_buckets(apr_bucket_brigad
Link Here
|
723 |
|
762 |
|
724 |
static apr_status_t send_brigade_blocking(apr_socket_t *s, |
763 |
static apr_status_t send_brigade_blocking(apr_socket_t *s, |
725 |
apr_bucket_brigade *bb, |
764 |
apr_bucket_brigade *bb, |
726 |
apr_size_t *bytes_written, |
765 |
core_output_filter_ctx_t *ctx, |
727 |
conn_rec *c) |
766 |
conn_rec *c) |
728 |
{ |
767 |
{ |
729 |
apr_status_t rv; |
768 |
apr_status_t rv; |
Lines 730-736
static apr_status_t send_brigade_blocking(apr_sock
Link Here
|
730 |
|
769 |
|
731 |
rv = APR_SUCCESS; |
770 |
rv = APR_SUCCESS; |
732 |
while (!APR_BRIGADE_EMPTY(bb)) { |
771 |
while (!APR_BRIGADE_EMPTY(bb)) { |
733 |
rv = send_brigade_nonblocking(s, bb, bytes_written, c); |
772 |
rv = send_brigade_nonblocking(s, bb, ctx, c); |
734 |
if (rv != APR_SUCCESS) { |
773 |
if (rv != APR_SUCCESS) { |
735 |
if (APR_STATUS_IS_EAGAIN(rv)) { |
774 |
if (APR_STATUS_IS_EAGAIN(rv)) { |
736 |
/* Wait until we can send more data */ |
775 |
/* Wait until we can send more data */ |
Lines 737-743
static apr_status_t send_brigade_blocking(apr_sock
Link Here
|
737 |
apr_int32_t nsds; |
776 |
apr_int32_t nsds; |
738 |
apr_interval_time_t timeout; |
777 |
apr_interval_time_t timeout; |
739 |
apr_pollfd_t pollset; |
778 |
apr_pollfd_t pollset; |
|
|
779 |
apr_bucket *flush_upto; |
740 |
|
780 |
|
|
|
781 |
/* The above nonblocking write might have unblocked the |
782 |
* situation, so quickly check if we still need to flush |
783 |
* so as to avoid blocking unnecessarily. |
784 |
*/ |
785 |
rv = should_flush(bb, &flush_upto, ctx, c, 1); |
786 |
if (!flush_upto) { |
787 |
break; |
788 |
} |
789 |
|
741 |
pollset.p = c->pool; |
790 |
pollset.p = c->pool; |
742 |
pollset.desc_type = APR_POLL_SOCKET; |
791 |
pollset.desc_type = APR_POLL_SOCKET; |
743 |
pollset.reqevents = APR_POLLOUT; |
792 |
pollset.reqevents = APR_POLLOUT; |
Lines 746-758
static apr_status_t send_brigade_blocking(apr_sock
Link Here
|
746 |
do { |
795 |
do { |
747 |
rv = apr_poll(&pollset, 1, &nsds, timeout); |
796 |
rv = apr_poll(&pollset, 1, &nsds, timeout); |
748 |
} while (APR_STATUS_IS_EINTR(rv)); |
797 |
} while (APR_STATUS_IS_EINTR(rv)); |
749 |
if (rv != APR_SUCCESS) { |
798 |
if (rv == APR_SUCCESS) { |
750 |
break; |
799 |
continue; |
751 |
} |
800 |
} |
752 |
} |
801 |
} |
753 |
else { |
802 |
break; |
754 |
break; |
|
|
755 |
} |
756 |
} |
803 |
} |
757 |
} |
804 |
} |
758 |
return rv; |
805 |
return rv; |
Lines 759-787
static apr_status_t send_brigade_blocking(apr_sock
Link Here
|
759 |
} |
806 |
} |
760 |
|
807 |
|
761 |
static apr_status_t writev_nonblocking(apr_socket_t *s, |
808 |
static apr_status_t writev_nonblocking(apr_socket_t *s, |
762 |
struct iovec *vec, apr_size_t nvec, |
|
|
763 |
apr_bucket_brigade *bb, |
809 |
apr_bucket_brigade *bb, |
764 |
apr_size_t *cumulative_bytes_written, |
810 |
core_output_filter_ctx_t *ctx, |
|
|
811 |
apr_size_t bytes_to_write, |
812 |
apr_size_t nvec, |
765 |
conn_rec *c) |
813 |
conn_rec *c) |
766 |
{ |
814 |
{ |
767 |
apr_status_t rv = APR_SUCCESS, arv; |
815 |
apr_status_t rv = APR_SUCCESS; |
768 |
apr_size_t bytes_written = 0, bytes_to_write = 0; |
816 |
apr_size_t bytes_written; |
769 |
apr_size_t i, offset; |
817 |
apr_size_t i, offset; |
770 |
apr_interval_time_t old_timeout; |
818 |
struct iovec *vec = ctx->vec; |
771 |
|
819 |
|
772 |
arv = apr_socket_timeout_get(s, &old_timeout); |
|
|
773 |
if (arv != APR_SUCCESS) { |
774 |
return arv; |
775 |
} |
776 |
arv = apr_socket_timeout_set(s, 0); |
777 |
if (arv != APR_SUCCESS) { |
778 |
return arv; |
779 |
} |
780 |
|
781 |
for (i = 0; i < nvec; i++) { |
782 |
bytes_to_write += vec[i].iov_len; |
783 |
} |
784 |
offset = 0; |
820 |
offset = 0; |
|
|
821 |
bytes_written = 0; |
785 |
while (bytes_written < bytes_to_write) { |
822 |
while (bytes_written < bytes_to_write) { |
786 |
apr_size_t n = 0; |
823 |
apr_size_t n = 0; |
787 |
rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n); |
824 |
rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n); |
Lines 813-827
static apr_status_t writev_nonblocking(apr_socket_
Link Here
|
813 |
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { |
850 |
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { |
814 |
ap__logio_add_bytes_out(c, bytes_written); |
851 |
ap__logio_add_bytes_out(c, bytes_written); |
815 |
} |
852 |
} |
816 |
*cumulative_bytes_written += bytes_written; |
853 |
ctx->bytes_written += bytes_written; |
817 |
|
854 |
|
818 |
arv = apr_socket_timeout_set(s, old_timeout); |
855 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, rv, c, |
819 |
if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) { |
856 |
"writev_nonblocking: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT" bytes", |
820 |
return arv; |
857 |
bytes_written, bytes_to_write); |
821 |
} |
858 |
return rv; |
822 |
else { |
|
|
823 |
return rv; |
824 |
} |
825 |
} |
859 |
} |
826 |
|
860 |
|
827 |
#if APR_HAS_SENDFILE |
861 |
#if APR_HAS_SENDFILE |
Lines 828-834
static apr_status_t writev_nonblocking(apr_socket_
Link Here
|
828 |
|
862 |
|
829 |
static apr_status_t sendfile_nonblocking(apr_socket_t *s, |
863 |
static apr_status_t sendfile_nonblocking(apr_socket_t *s, |
830 |
apr_bucket *bucket, |
864 |
apr_bucket *bucket, |
831 |
apr_size_t *cumulative_bytes_written, |
865 |
core_output_filter_ctx_t *ctx, |
832 |
conn_rec *c) |
866 |
conn_rec *c) |
833 |
{ |
867 |
{ |
834 |
apr_status_t rv = APR_SUCCESS; |
868 |
apr_status_t rv = APR_SUCCESS; |
Lines 851-887
static apr_status_t sendfile_nonblocking(apr_socke
Link Here
|
851 |
|
885 |
|
852 |
if (bytes_written < file_length) { |
886 |
if (bytes_written < file_length) { |
853 |
apr_size_t n = file_length - bytes_written; |
887 |
apr_size_t n = file_length - bytes_written; |
854 |
apr_status_t arv; |
|
|
855 |
apr_interval_time_t old_timeout; |
856 |
|
857 |
arv = apr_socket_timeout_get(s, &old_timeout); |
858 |
if (arv != APR_SUCCESS) { |
859 |
return arv; |
860 |
} |
861 |
arv = apr_socket_timeout_set(s, 0); |
862 |
if (arv != APR_SUCCESS) { |
863 |
return arv; |
864 |
} |
865 |
rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0); |
888 |
rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0); |
866 |
if (rv == APR_SUCCESS) { |
889 |
if (rv == APR_SUCCESS) { |
867 |
bytes_written += n; |
890 |
bytes_written += n; |
868 |
file_offset += n; |
891 |
file_offset += n; |
869 |
} |
892 |
} |
870 |
arv = apr_socket_timeout_set(s, old_timeout); |
|
|
871 |
if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) { |
872 |
rv = arv; |
873 |
} |
874 |
} |
893 |
} |
875 |
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { |
894 |
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { |
876 |
ap__logio_add_bytes_out(c, bytes_written); |
895 |
ap__logio_add_bytes_out(c, bytes_written); |
877 |
} |
896 |
} |
878 |
*cumulative_bytes_written += bytes_written; |
897 |
ctx->bytes_written += bytes_written; |
879 |
if ((bytes_written < file_length) && (bytes_written > 0)) { |
898 |
|
880 |
apr_bucket_split(bucket, bytes_written); |
899 |
ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, rv, c, |
|
|
900 |
"sendfile_nonblocking: %"APR_SIZE_T_FMT"/%"APR_OFF_T_FMT" bytes", |
901 |
bytes_written, file_length); |
902 |
|
903 |
if (bytes_written >= file_length) { |
881 |
apr_bucket_delete(bucket); |
904 |
apr_bucket_delete(bucket); |
882 |
} |
905 |
} |
883 |
else if (bytes_written == file_length) { |
906 |
else if (bytes_written > 0) { |
|
|
907 |
apr_bucket_split(bucket, bytes_written); |
884 |
apr_bucket_delete(bucket); |
908 |
apr_bucket_delete(bucket); |
|
|
909 |
if (rv == APR_SUCCESS) { |
910 |
rv = APR_EAGAIN; |
911 |
} |
885 |
} |
912 |
} |
886 |
return rv; |
913 |
return rv; |
887 |
} |
914 |
} |