ASF Bugzilla – Attachment 35427 Details for
Bug 61616
mod_proxy_connect: stall and connection loss on bi-directional traffic
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
File read buffer and flush threshold tuning
httpd-2.4.x-flush_threshold-v8.patch (text/plain), 35.86 KB, created by
Yann Ylavic
on 2017-10-15 12:46:47 UTC
(
hide
)
Description:
File read buffer and flush threshold tuning
Filename:
MIME Type:
Creator:
Yann Ylavic
Created:
2017-10-15 12:46:47 UTC
Size:
35.86 KB
patch
obsolete
>Index: include/http_core.h >=================================================================== >--- include/http_core.h (revision 1792652) >+++ include/http_core.h (working copy) >@@ -672,6 +672,8 @@ typedef struct { > > /** Table of rules for building CGI variables, NULL if none configured */ > apr_hash_t *cgi_var_rules; >+ >+ apr_size_t read_file_buf_size; > } core_dir_config; > > /* macro to implement off by default behaviour */ >@@ -741,6 +743,8 @@ typedef struct { > #define AP_HTTP_METHODS_REGISTERED 2 > char http_methods; > >+ apr_size_t flush_max_threshold; >+ apr_uint32_t flush_max_pipelined; > } core_server_config; > > /* for AddOutputFiltersByType in core.c */ >Index: modules/http/http_request.c >=================================================================== >--- modules/http/http_request.c (revision 1792652) >+++ modules/http/http_request.c (working copy) >@@ -379,9 +379,22 @@ AP_DECLARE(void) ap_process_request_after_handler( > c->data_in_input_filters = (rv == APR_SUCCESS); > apr_brigade_destroy(bb); > >- if (c->cs) >- c->cs->state = (c->aborted) ? CONN_STATE_LINGER >- : CONN_STATE_WRITE_COMPLETION; >+ if (c->cs) { >+ if (c->aborted) { >+ c->cs->state = CONN_STATE_LINGER; >+ } >+ else { >+ /* If we have still data in the output filters here it means that >+ * the last (recent) nonblocking write was EAGAIN, so tell the MPM >+ * to not try another useless/stressful one but to go straight to >+ * POLLOUT. 
>+ */ >+ if (c->data_in_output_filters) { >+ c->cs->sense = CONN_SENSE_WANT_WRITE; >+ } >+ c->cs->state = CONN_STATE_WRITE_COMPLETION; >+ } >+ } > AP_PROCESS_REQUEST_RETURN((uintptr_t)r, r->uri, r->status); > if (ap_extended_status) { > ap_time_process_request(c->sbh, STOP_PREQUEST); >Index: modules/ssl/ssl_engine_io.c >=================================================================== >--- modules/ssl/ssl_engine_io.c (revision 1792652) >+++ modules/ssl/ssl_engine_io.c (working copy) >@@ -36,6 +36,12 @@ APR_IMPLEMENT_OPTIONAL_HOOK_RUN_ALL(ssl, SSL, int, > (conn_rec *c,SSL *ssl), > (c,ssl),OK,DECLINED); > >+#ifdef AP_DEBUG_IO >+#define MODSSL_LOG_LEVEL_IO APLOG_NOTICE >+#else >+#define MODSSL_LOG_LEVEL_IO APLOG_TRACE6 >+#endif >+ > /* _________________________________________________________________ > ** > ** I/O Hooks >@@ -152,6 +158,9 @@ static int bio_filter_out_flush(BIO *bio) > bio_filter_out_ctx_t *outctx = (bio_filter_out_ctx_t *)BIO_get_data(bio); > apr_bucket *e; > >+ ap_log_cerror(APLOG_MARK, MODSSL_LOG_LEVEL_IO, 0, outctx->c, >+ "bio_filter_out_write: flush"); >+ > AP_DEBUG_ASSERT(APR_BRIGADE_EMPTY(outctx->bb)); > > e = apr_bucket_flush_create(outctx->bb->bucket_alloc); >@@ -206,6 +215,9 @@ static int bio_filter_out_write(BIO *bio, const ch > return -1; > } > >+ ap_log_cerror(APLOG_MARK, MODSSL_LOG_LEVEL_IO, 0, outctx->c, >+ "bio_filter_out_write: %i bytes", inl); >+ > /* when handshaking we'll have a small number of bytes. > * max size SSL will pass us here is about 16k. > * (16413 bytes to be exact) >@@ -843,6 +855,9 @@ static apr_status_t ssl_filter_write(ap_filter_t * > return APR_EGENERAL; > } > >+ ap_log_cerror(APLOG_MARK, MODSSL_LOG_LEVEL_IO, 0, f->c, >+ "ssl_filter_write: %"APR_SIZE_T_FMT" bytes", len); >+ > /* We rely on SSL_get_error() after the write, which requires an empty error > * queue before the write in order to work properly. 
> */ >@@ -1629,8 +1644,11 @@ static apr_status_t ssl_io_filter_coalesce(ap_filt > && (ctx == NULL > || bytes + ctx->bytes + e->length < COALESCE_BYTES); > e = APR_BUCKET_NEXT(e)) { >- if (e->length) count++; /* don't count zero-length buckets */ >- bytes += e->length; >+ /* don't count zero-length buckets */ >+ if (e->length) { >+ bytes += e->length; >+ count++; >+ } > } > upto = e; > >Index: server/core.c >=================================================================== >--- server/core.c (revision 1792652) >+++ server/core.c (working copy) >@@ -21,6 +21,10 @@ > #include "apr_hash.h" > #include "apr_thread_proc.h" /* for RLIMIT stuff */ > #include "apr_random.h" >+#include "apr_version.h" >+#if APR_MAJOR_VERSION < 2 >+#include "apu_version.h" >+#endif > > #define APR_WANT_IOVEC > #define APR_WANT_STRFUNC >@@ -80,6 +84,9 @@ > #define AP_CONTENT_MD5_ON 1 > #define AP_CONTENT_MD5_UNSET 2 > >+#define AP_FLUSH_MAX_THRESHOLD 65536 >+#define AP_FLUSH_MAX_PIPELINED 5 >+ > APR_HOOK_STRUCT( > APR_HOOK_LINK(get_mgmt_items) > APR_HOOK_LINK(insert_network_bucket) >@@ -390,6 +397,15 @@ static void *merge_core_dir_configs(apr_pool_t *a, > conf->enable_sendfile = new->enable_sendfile; > } > >+#if APR_MAJOR_VERSION > 1 || (APU_MAJOR_VERSION == 1 && APU_MINOR_VERSION >= 6) >+ if (new->read_file_buf_size) { >+ conf->read_file_buf_size = new->read_file_buf_size; >+ } >+ else { >+ conf->read_file_buf_size = base->read_file_buf_size; >+ } >+#endif >+ > conf->allow_encoded_slashes = new->allow_encoded_slashes; > conf->decode_encoded_slashes = new->decode_encoded_slashes; > >@@ -461,6 +477,9 @@ static void *create_core_server_config(apr_pool_t > apr_table_setn(conf->accf_map, "http", "data"); > apr_table_setn(conf->accf_map, "https", "data"); > #endif >+ >+ conf->flush_max_threshold = AP_FLUSH_MAX_THRESHOLD; >+ conf->flush_max_pipelined = AP_FLUSH_MAX_PIPELINED; > } > /* pcalloc'ed - we have NULL's/0's > else ** is_virtual ** { >@@ -554,6 +573,13 @@ static void 
*merge_core_server_configs(apr_pool_t > conf->protocols_honor_order = ((virt->protocols_honor_order < 0)? > base->protocols_honor_order : > virt->protocols_honor_order); >+ >+ conf->flush_max_threshold = (virt->flush_max_threshold) >+ ? virt->flush_max_threshold >+ : base->flush_max_threshold; >+ conf->flush_max_pipelined = (virt->flush_max_pipelined) >+ ? virt->flush_max_pipelined >+ : base->flush_max_pipelined; > > return conf; > } >@@ -2208,7 +2234,71 @@ static const char *set_enable_sendfile(cmd_parms * > return NULL; > } > >+static const char *set_read_file_buf_size(cmd_parms *cmd, void *d_, >+ const char *arg) >+{ >+#if APR_MAJOR_VERSION > 1 || (APU_MAJOR_VERSION == 1 && APU_MINOR_VERSION >= 6) >+ core_dir_config *d = d_; >+ apr_off_t size; >+ char *end; > >+ if (apr_strtoff(&size, arg, &end, 10) >+ || size < 0 || size > APR_SIZE_MAX || *end) >+ return apr_pstrcat(cmd->pool, >+ "parameter must be a number between 0 and " >+ APR_STRINGIFY(APR_SIZE_MAX) "): ", >+ arg, NULL); >+ >+ d->read_file_buf_size = (apr_size_t)size; >+ >+ return NULL; >+#else >+ return "parameter requires APR version at least 1.6.0"; >+#endif >+} >+ >+ >+static const char *set_flush_max_threshold(cmd_parms *cmd, void *d_, >+ const char *arg) >+{ >+ core_server_config *conf = >+ ap_get_core_module_config(cmd->server->module_config); >+ apr_off_t size; >+ char *end; >+ >+ if (apr_strtoff(&size, arg, &end, 10) >+ || size <= 0 || size > APR_SIZE_MAX || *end) >+ return apr_pstrcat(cmd->pool, >+ "parameter must be a number between 1 and " >+ APR_STRINGIFY(APR_SIZE_MAX) "): ", >+ arg, NULL); >+ >+ conf->flush_max_threshold = (apr_size_t)size; >+ >+ return NULL; >+} >+ >+static const char *set_flush_max_pipelined(cmd_parms *cmd, void *d_, >+ const char *arg) >+{ >+ core_server_config *conf = >+ ap_get_core_module_config(cmd->server->module_config); >+ apr_off_t num; >+ char *end; >+ >+ if (apr_strtoff(&num, arg, &end, 10) >+ || num < 0 || num > APR_UINT32_MAX || *end) >+ return 
apr_pstrcat(cmd->pool, >+ "parameter must be a number between 0 and " >+ APR_STRINGIFY(APR_UINT32_MAX) ": ", >+ arg, NULL); >+ >+ conf->flush_max_pipelined = (apr_uint32_t)num; >+ >+ return NULL; >+} >+ >+ > /* > * Report a missing-'>' syntax error. > */ >@@ -4300,6 +4390,12 @@ AP_INIT_TAKE1("EnableMMAP", set_enable_mmap, NULL, > "Controls whether memory-mapping may be used to read files"), > AP_INIT_TAKE1("EnableSendfile", set_enable_sendfile, NULL, OR_FILEINFO, > "Controls whether sendfile may be used to transmit files"), >+AP_INIT_TAKE1("ReadFileBufferSize", set_read_file_buf_size, NULL, OR_FILEINFO, >+ "Size (in bytes) of the memory buffers used to read files"), >+AP_INIT_TAKE1("FlushMaxThreshold", set_flush_max_threshold, NULL, RSRC_CONF, >+ "Maximum size (in bytes) above which pending data are flushed (blocking) to the network"), >+AP_INIT_TAKE1("FlushMaxPipelined", set_flush_max_pipelined, NULL, RSRC_CONF, >+ "Number of pipelined/pending responses above which they are flushed to the network"), > > /* Old server config file commands */ > >@@ -4738,6 +4834,11 @@ static int default_handler(request_rec *r) > (void)apr_bucket_file_enable_mmap(e, 0); > } > #endif >+#if APR_MAJOR_VERSION > 1 || (APU_MAJOR_VERSION == 1 && APU_MINOR_VERSION >= 6) >+ if (d->read_file_buf_size != 0) { >+ apr_bucket_file_set_buf_size(e, d->read_file_buf_size); >+ } >+#endif > } > > e = apr_bucket_eos_create(c->bucket_alloc); >Index: server/core_filters.c >=================================================================== >--- server/core_filters.c (revision 1792652) >+++ server/core_filters.c (working copy) >@@ -52,6 +52,12 @@ > > #include "mod_so.h" /* for ap_find_loaded_module_symbol */ > >+#ifdef AP_DEBUG_IO >+#define CORE_LOG_LEVEL_IO APLOG_NOTICE >+#else >+#define CORE_LOG_LEVEL_IO APLOG_TRACE6 >+#endif >+ > #define AP_MIN_SENDFILE_BYTES (256) > > /** >@@ -78,10 +84,13 @@ do { \ > #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX > > struct core_output_filter_ctx { >+ 
core_server_config *conf; > apr_bucket_brigade *buffered_bb; > apr_bucket_brigade *tmp_flush_bb; > apr_pool_t *deferred_write_pool; > apr_size_t bytes_written; >+ struct iovec *vec; >+ apr_size_t nvec; > }; > > struct core_filter_ctx { >@@ -335,7 +344,7 @@ static void setaside_remaining_output(ap_filter_t > > static apr_status_t send_brigade_nonblocking(apr_socket_t *s, > apr_bucket_brigade *bb, >- apr_size_t *bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c); > > static void remove_empty_buckets(apr_bucket_brigade *bb); >@@ -342,27 +351,92 @@ static void remove_empty_buckets(apr_bucket_brigad > > static apr_status_t send_brigade_blocking(apr_socket_t *s, > apr_bucket_brigade *bb, >- apr_size_t *bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c); > > static apr_status_t writev_nonblocking(apr_socket_t *s, >- struct iovec *vec, apr_size_t nvec, > apr_bucket_brigade *bb, >- apr_size_t *cumulative_bytes_written, >+ core_output_filter_ctx_t *ctx, >+ apr_size_t bytes_to_write, >+ apr_size_t nvec, > conn_rec *c); > > #if APR_HAS_SENDFILE > static apr_status_t sendfile_nonblocking(apr_socket_t *s, > apr_bucket *bucket, >- apr_size_t *cumulative_bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c); > #endif > >-/* XXX: Should these be configurable parameters? 
*/ >-#define THRESHOLD_MIN_WRITE 4096 >-#define THRESHOLD_MAX_BUFFER 65536 >-#define MAX_REQUESTS_IN_PIPELINE 5 >+static apr_status_t should_flush(apr_bucket_brigade *bb, >+ apr_bucket **flush_upto, >+ core_output_filter_ctx_t *ctx, >+ conn_rec *c, int first) >+{ >+ apr_status_t rv = APR_SUCCESS; >+ core_server_config *conf = ctx->conf; >+ apr_size_t non_file_bytes_in_brigade = 0; >+ apr_uint32_t eor_buckets_in_brigade = 0; >+ apr_bucket *bucket, *next; > >+ *flush_upto = NULL; >+ >+ for (bucket = APR_BRIGADE_FIRST(bb); >+ bucket != APR_BRIGADE_SENTINEL(bb); >+ bucket = next) { >+ next = APR_BUCKET_NEXT(bucket); >+ >+ if (!APR_BUCKET_IS_METADATA(bucket)) { >+ if (bucket->length == (apr_size_t)-1) { >+ /* >+ * A setaside of morphing buckets would read everything into >+ * memory. Instead, we will flush everything up to and >+ * including this bucket. >+ */ >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, >+ "core_output_filter: flushing because of " >+ "morphing bucket"); >+ *flush_upto = next; >+ } >+ else if (!APR_BUCKET_IS_FILE(bucket)) { >+ non_file_bytes_in_brigade += bucket->length; >+ if (non_file_bytes_in_brigade >= conf->flush_max_threshold) { >+ non_file_bytes_in_brigade = 0; >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, >+ "core_output_filter: flushing because of " >+ "max threshold"); >+ *flush_upto = next; >+ } >+ } >+ else { >+ rv = APR_INCOMPLETE; >+ } >+ } >+ else if (APR_BUCKET_IS_FLUSH(bucket)) { >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, >+ "core_output_filter: flushing because of " >+ "FLUSH bucket"); >+ *flush_upto = next; >+ } >+ else if (AP_BUCKET_IS_EOR(bucket)) { >+ eor_buckets_in_brigade++; >+ if (eor_buckets_in_brigade > conf->flush_max_pipelined) { >+ eor_buckets_in_brigade = 0; >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, 0, c, >+ "core_output_filter: flushing because of " >+ "max pipelined"); >+ *flush_upto = next; >+ } >+ } >+ >+ if (first && *flush_upto) { >+ break; >+ } >+ } >+ >+ return rv; >+} >+ > /* 
Optional function coming from mod_logio, used for logging of output > * traffic > */ >@@ -374,9 +448,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > core_net_rec *net = f->ctx; > core_output_filter_ctx_t *ctx = net->out_ctx; > apr_bucket_brigade *bb = NULL; >- apr_bucket *bucket, *next, *flush_upto = NULL; >- apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; >- int eor_buckets_in_brigade, morphing_bucket_in_brigade; >+ apr_bucket *flush_upto; > apr_status_t rv; > > /* Fail quickly if the connection has already been aborted. */ >@@ -390,6 +462,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > if (ctx == NULL) { > ctx = apr_pcalloc(c->pool, sizeof(*ctx)); > net->out_ctx = (core_output_filter_ctx_t *)ctx; >+ ctx->conf = ap_get_core_module_config(c->base_server->module_config); > /* > * Need to create tmp brigade with correct lifetime. Passing > * NULL to apr_brigade_split_ex would result in a brigade >@@ -426,6 +499,23 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > * it probably means that the MPM is doing asynchronous write > * completion and has just determined that this connection > * is writable.) >+ */ >+ >+ if (new_bb == NULL) { >+ rv = send_brigade_nonblocking(net->client_socket, bb, ctx, c); >+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { >+ /* The client has aborted the connection */ >+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, >+ "core_output_filter: writing data to the network"); >+ apr_brigade_cleanup(bb); >+ c->aborted = 1; >+ return rv; >+ } >+ setaside_remaining_output(f, ctx, bb, c); >+ return APR_SUCCESS; >+ } >+ >+ /* Below should_flush() will: > * > * 2) Determine if and up to which bucket we need to do a blocking > * write: >@@ -434,16 +524,16 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > * of everything up that point. 
> * > * b) The request is in CONN_STATE_HANDLER state, and the brigade >- * contains at least THRESHOLD_MAX_BUFFER bytes in non-file >+ * contains at least flush_max_threshold bytes in non-file > * buckets: Do blocking writes until the amount of data in the >- * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this >+ * buffer is less than flush_max_threshold. (The point of this > * rule is to provide flow control, in case a handler is > * streaming out lots of data faster than the data can be > * sent to the client.) > * > * c) The request is in CONN_STATE_HANDLER state, and the brigade >- * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: >- * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR >+ * contains at least flush_max_pipelined EOR buckets: >+ * Do blocking writes until less than flush_max_pipelined EOR > * buckets are left. (The point of this rule is to prevent too many > * FDs being kept open by pipelined requests, possibly allowing a > * DoS). >@@ -450,7 +540,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > * > * d) The brigade contains a morphing bucket: If there was no other > * reason to do a blocking write yet, try reading the bucket. If its >- * contents fit into memory before THRESHOLD_MAX_BUFFER is reached, >+ * contents fit into memory before flush_max_threshold is reached, > * everything is fine. Otherwise we need to do a blocking write the > * up to and including the morphing bucket, because ap_save_brigade() > * would read the whole bucket into memory later on. >@@ -458,90 +548,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > * 3) Actually do the blocking write up to the last bucket determined > * by rules 2a-d. The point of doing only one flush is to make as > * few calls to writev() as possible. >- * >- * 4) If the brigade contains at least THRESHOLD_MIN_WRITE >- * bytes: Do a nonblocking write of as much data as possible, >- * then save the rest in ctx->buffered_bb. 
> */ >- >- if (new_bb == NULL) { >- rv = send_brigade_nonblocking(net->client_socket, bb, >- &(ctx->bytes_written), c); >- if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { >- /* The client has aborted the connection */ >- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, >- "core_output_filter: writing data to the network"); >- apr_brigade_cleanup(bb); >- c->aborted = 1; >- return rv; >- } >- setaside_remaining_output(f, ctx, bb, c); >- return APR_SUCCESS; >- } >- >- bytes_in_brigade = 0; >- non_file_bytes_in_brigade = 0; >- eor_buckets_in_brigade = 0; >- morphing_bucket_in_brigade = 0; >- >- for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); >- bucket = next) { >- next = APR_BUCKET_NEXT(bucket); >- >- if (!APR_BUCKET_IS_METADATA(bucket)) { >- if (bucket->length == (apr_size_t)-1) { >- /* >- * A setaside of morphing buckets would read everything into >- * memory. Instead, we will flush everything up to and >- * including this bucket. >- */ >- morphing_bucket_in_brigade = 1; >- } >- else { >- bytes_in_brigade += bucket->length; >- if (!APR_BUCKET_IS_FILE(bucket)) >- non_file_bytes_in_brigade += bucket->length; >- } >- } >- else if (AP_BUCKET_IS_EOR(bucket)) { >- eor_buckets_in_brigade++; >- } >- >- if (APR_BUCKET_IS_FLUSH(bucket) >- || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER >- || morphing_bucket_in_brigade >- || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { >- /* this segment of the brigade MUST be sent before returning. */ >- >- if (APLOGctrace6(c)) { >- char *reason = APR_BUCKET_IS_FLUSH(bucket) ? >- "FLUSH bucket" : >- (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? >- "THRESHOLD_MAX_BUFFER" : >- morphing_bucket_in_brigade ? "morphing bucket" : >- "MAX_REQUESTS_IN_PIPELINE"; >- ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c, >- "core_output_filter: flushing because of %s", >- reason); >- } >- /* >- * Defer the actual blocking write to avoid doing many writes. 
>- */ >- flush_upto = next; >- >- bytes_in_brigade = 0; >- non_file_bytes_in_brigade = 0; >- eor_buckets_in_brigade = 0; >- morphing_bucket_in_brigade = 0; >- } >- } >- >- if (flush_upto != NULL) { >+ rv = should_flush(bb, &flush_upto, ctx, c, 0); >+ if (flush_upto) { > ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto, > ctx->tmp_flush_bb); >- rv = send_brigade_blocking(net->client_socket, bb, >- &(ctx->bytes_written), c); >- if (rv != APR_SUCCESS) { >+ rv = send_brigade_blocking(net->client_socket, bb, ctx, c); >+ if (rv != APR_SUCCESS && rv != APR_INCOMPLETE) { > /* The client has aborted the connection */ > ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, > "core_output_filter: writing data to the network"); >@@ -552,10 +565,15 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, > APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb); > } > >- if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) { >- rv = send_brigade_nonblocking(net->client_socket, bb, >- &(ctx->bytes_written), c); >- if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) { >+ /* >+ * 4) If the brigade contains at least a file bucket, do nonblocking >+ * write(s) of as much data as possible, then save the rest in >+ * ctx->buffered_bb (send_brigade_nonblocking() will take care of >+ * the flush threshold). 
>+ */ >+ if (rv == APR_INCOMPLETE) { >+ rv = send_brigade_nonblocking(net->client_socket, bb, ctx, c); >+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { > /* The client has aborted the connection */ > ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, > "core_output_filter: writing data to the network"); >@@ -603,24 +621,25 @@ static void setaside_remaining_output(ap_filter_t > } > > #ifndef APR_MAX_IOVEC_SIZE >-#define MAX_IOVEC_TO_WRITE 16 >+#define NVEC_MIN 16 >+#define NVEC_MAX NVEC_MIN > #else > #if APR_MAX_IOVEC_SIZE > 16 >-#define MAX_IOVEC_TO_WRITE 16 >+#define NVEC_MIN 16 > #else >-#define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE >+#define NVEC_MIN APR_MAX_IOVEC_SIZE > #endif >+#define NVEC_MAX APR_MAX_IOVEC_SIZE > #endif > > static apr_status_t send_brigade_nonblocking(apr_socket_t *s, > apr_bucket_brigade *bb, >- apr_size_t *bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c) > { > apr_bucket *bucket, *next; >- apr_status_t rv; >- struct iovec vec[MAX_IOVEC_TO_WRITE]; >- apr_size_t nvec = 0; >+ apr_size_t nvec = 0, nbytes = 0; >+ apr_status_t rv = APR_SUCCESS; > > remove_empty_buckets(bb); > >@@ -639,24 +658,20 @@ static apr_status_t send_brigade_nonblocking(apr_s > */ > > if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) && >- (bucket->length >= AP_MIN_SENDFILE_BYTES)) { >+ (bucket->length >= AP_MIN_SENDFILE_BYTES)) { > if (nvec > 0) { > (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); >- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); >+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); > if (rv != APR_SUCCESS) { >- (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); >- return rv; >+ goto cleanup; > } >+ nvec = nbytes = 0; > } >- rv = sendfile_nonblocking(s, bucket, bytes_written, c); >- if (nvec > 0) { >- (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); >- nvec = 0; >- } >+ rv = sendfile_nonblocking(s, bucket, ctx, c); > if (rv != APR_SUCCESS) { >- return rv; >+ goto cleanup; > } >- break; >+ continue; > } > } > #endif /* 
APR_HAS_SENDFILE */ >@@ -671,45 +686,80 @@ static apr_status_t send_brigade_nonblocking(apr_s > if (APR_STATUS_IS_EAGAIN(rv)) { > /* Read would block; flush any pending data and retry. */ > if (nvec) { >- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); >- if (rv) { >- return rv; >+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); >+ if (rv != APR_SUCCESS) { >+ goto cleanup; > } >- nvec = 0; >+ nvec = nbytes = 0; > } >- >+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); >+ > rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ); > } > if (rv != APR_SUCCESS) { >- return rv; >+ goto cleanup; > } > > /* reading may have split the bucket, so recompute next: */ > next = APR_BUCKET_NEXT(bucket); >- vec[nvec].iov_base = (char *)data; >- vec[nvec].iov_len = length; >+ >+ if (nvec >= ctx->nvec) { >+ if (nvec > 0) { >+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); >+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); >+ if (rv != APR_SUCCESS) { >+ goto cleanup; >+ } >+ } >+ if (nvec < NVEC_MAX) { >+ nvec *= 2; >+ if (nvec < NVEC_MIN) { >+ nvec = NVEC_MIN; >+ } >+ else if (nvec > NVEC_MAX) { >+ nvec = NVEC_MAX; >+ } >+ ctx->vec = apr_palloc(c->pool, nvec * sizeof(*ctx->vec)); >+ ctx->nvec = nvec; >+ } >+ nvec = nbytes = 0; >+ } >+ ctx->vec[nvec].iov_base = (char *)data; >+ ctx->vec[nvec].iov_len = length; > nvec++; >- if (nvec == MAX_IOVEC_TO_WRITE) { >- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); >- nvec = 0; >+ >+ /* Flush above max threshold, unless the brigade still contains >+ * non-morphing buckets (i.e. in memory) which we want to gather >+ * in the same pass (if we are at the end of the brigade, the >+ * write will happen outside the loop anyway). 
>+ */ >+ nbytes += length; >+ if (nbytes >= ctx->conf->flush_max_threshold >+ && next != APR_BRIGADE_SENTINEL(bb) >+ && (next->length == (apr_size_t)-1 >+ || APR_BUCKET_IS_FILE(next))) { >+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1); >+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); > if (rv != APR_SUCCESS) { >- return rv; >+ goto cleanup; > } >- break; >+ nvec = nbytes = 0; > } > } > } > > if (nvec > 0) { >- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c); >+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c); > if (rv != APR_SUCCESS) { >- return rv; >+ goto cleanup; > } > } > > remove_empty_buckets(bb); > >- return APR_SUCCESS; >+cleanup: >+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0); >+ return rv; > } > > static void remove_empty_buckets(apr_bucket_brigade *bb) >@@ -723,7 +773,7 @@ static void remove_empty_buckets(apr_bucket_brigad > > static apr_status_t send_brigade_blocking(apr_socket_t *s, > apr_bucket_brigade *bb, >- apr_size_t *bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c) > { > apr_status_t rv; >@@ -730,7 +780,7 @@ static apr_status_t send_brigade_blocking(apr_sock > > rv = APR_SUCCESS; > while (!APR_BRIGADE_EMPTY(bb)) { >- rv = send_brigade_nonblocking(s, bb, bytes_written, c); >+ rv = send_brigade_nonblocking(s, bb, ctx, c); > if (rv != APR_SUCCESS) { > if (APR_STATUS_IS_EAGAIN(rv)) { > /* Wait until we can send more data */ >@@ -737,7 +787,17 @@ static apr_status_t send_brigade_blocking(apr_sock > apr_int32_t nsds; > apr_interval_time_t timeout; > apr_pollfd_t pollset; >+ apr_bucket *flush_upto; > >+ /* The above nonblocking write might have unblocked the >+ * situation, so quickly check if we still need to flush >+ * so as to avoid blocking unnecessarily. 
>+ */ >+ rv = should_flush(bb, &flush_upto, ctx, c, 1); >+ if (!flush_upto) { >+ break; >+ } >+ > pollset.p = c->pool; > pollset.desc_type = APR_POLL_SOCKET; > pollset.reqevents = APR_POLLOUT; >@@ -746,13 +806,11 @@ static apr_status_t send_brigade_blocking(apr_sock > do { > rv = apr_poll(&pollset, 1, &nsds, timeout); > } while (APR_STATUS_IS_EINTR(rv)); >- if (rv != APR_SUCCESS) { >- break; >+ if (rv == APR_SUCCESS) { >+ continue; > } > } >- else { >- break; >- } >+ break; > } > } > return rv; >@@ -759,15 +817,17 @@ static apr_status_t send_brigade_blocking(apr_sock > } > > static apr_status_t writev_nonblocking(apr_socket_t *s, >- struct iovec *vec, apr_size_t nvec, > apr_bucket_brigade *bb, >- apr_size_t *cumulative_bytes_written, >+ core_output_filter_ctx_t *ctx, >+ apr_size_t bytes_to_write, >+ apr_size_t nvec, > conn_rec *c) > { > apr_status_t rv = APR_SUCCESS, arv; >- apr_size_t bytes_written = 0, bytes_to_write = 0; >+ apr_size_t bytes_written; > apr_size_t i, offset; > apr_interval_time_t old_timeout; >+ struct iovec *vec = ctx->vec; > > arv = apr_socket_timeout_get(s, &old_timeout); > if (arv != APR_SUCCESS) { >@@ -778,10 +838,8 @@ static apr_status_t writev_nonblocking(apr_socket_ > return arv; > } > >- for (i = 0; i < nvec; i++) { >- bytes_to_write += vec[i].iov_len; >- } > offset = 0; >+ bytes_written = 0; > while (bytes_written < bytes_to_write) { > apr_size_t n = 0; > rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n); >@@ -813,8 +871,12 @@ static apr_status_t writev_nonblocking(apr_socket_ > if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { > ap__logio_add_bytes_out(c, bytes_written); > } >- *cumulative_bytes_written += bytes_written; >+ ctx->bytes_written += bytes_written; > >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, rv, c, >+ "writev_nonblocking: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT" bytes", >+ bytes_written, bytes_to_write); >+ > arv = apr_socket_timeout_set(s, old_timeout); > if ((arv != APR_SUCCESS) && (rv == 
APR_SUCCESS)) { > return arv; >@@ -828,7 +890,7 @@ static apr_status_t writev_nonblocking(apr_socket_ > > static apr_status_t sendfile_nonblocking(apr_socket_t *s, > apr_bucket *bucket, >- apr_size_t *cumulative_bytes_written, >+ core_output_filter_ctx_t *ctx, > conn_rec *c) > { > apr_status_t rv = APR_SUCCESS; >@@ -875,13 +937,21 @@ static apr_status_t sendfile_nonblocking(apr_socke > if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) { > ap__logio_add_bytes_out(c, bytes_written); > } >- *cumulative_bytes_written += bytes_written; >- if ((bytes_written < file_length) && (bytes_written > 0)) { >- apr_bucket_split(bucket, bytes_written); >+ ctx->bytes_written += bytes_written; >+ >+ ap_log_cerror(APLOG_MARK, CORE_LOG_LEVEL_IO, rv, c, >+ "sendfile_nonblocking: %"APR_SIZE_T_FMT"/%"APR_OFF_T_FMT" bytes", >+ bytes_written, file_length); >+ >+ if (bytes_written >= file_length) { > apr_bucket_delete(bucket); > } >- else if (bytes_written == file_length) { >+ else if (bytes_written > 0) { >+ apr_bucket_split(bucket, bytes_written); > apr_bucket_delete(bucket); >+ if (rv == APR_SUCCESS) { >+ rv = APR_EAGAIN; >+ } > } > return rv; > } >Index: server/mpm/event/event.c >=================================================================== >--- server/mpm/event/event.c (revision 1792652) >+++ server/mpm/event/event.c (working copy) >@@ -1110,13 +1110,16 @@ read_request: > } > > if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) { >- ap_filter_t *output_filter = c->output_filters; >- apr_status_t rv; >+ apr_status_t rv = APR_SUCCESS; > ap_update_child_status(sbh, SERVER_BUSY_WRITE, NULL); >- while (output_filter->next != NULL) { >- output_filter = output_filter->next; >+ if (cs->pub.sense == CONN_SENSE_DEFAULT) { >+ ap_filter_t *output_filter = c->output_filters; >+ while (output_filter->next != NULL) { >+ output_filter = output_filter->next; >+ } >+ rv = output_filter->frec->filter_func.out_func(output_filter, >+ NULL); > } >- rv = 
output_filter->frec->filter_func.out_func(output_filter, NULL); > if (rv != APR_SUCCESS) { > ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470) > "network write failure in core output filter");
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 61616
:
35421
|
35422
|
35423
|
35424
|
35425
|
35426
|
35427
|
35429
|
35430
|
35431
|
35432
|
35442
|
35443
|
35446
|
35447
|
35512
|
35513
|
35620
|
35621
|
35623
|
35639
|
35737
|
35738
|
35739
|
35772
|
35774
|
35776
|
35783
|
35784
|
35786
|
35787
|
35789