--- modules/proxy/mod_proxy.h (revision 1814718) +++ modules/proxy/mod_proxy.h (working copy) @@ -1172,6 +1172,14 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifet apr_bucket_brigade *from, apr_bucket_brigade *to); +/* The flags for ap_proxy_transfer_between_connections(). */ +#define AP_PROXY_TRANSFER_FLUSH_EACH (0) +#define AP_PROXY_TRANSFER_FLUSH_AFTER (1 << 0) +#define AP_PROXY_TRANSFER_FLUSH_NEVER (1 << 1) +#define AP_PROXY_TRANSFER_FLUSH_MASK (AP_PROXY_TRANSFER_FLUSH_AFTER | \ + AP_PROXY_TRANSFER_FLUSH_NEVER) +#define AP_PROXY_TRANSFER_UNBUFFERED (1 << 2) + /* * Sends all data that can be read non blocking from the input filter chain of * c_i and send it down the output filter chain of c_o. For reading it uses @@ -1189,7 +1197,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifet * @param name string for logging from where data was pulled * @param sent if not NULL will be set to 1 if data was sent through c_o * @param bsize maximum amount of data pulled in one iteration from c_i - * @param after if set flush data on c_o only once after the loop + * @param flags AP_PROXY_TRANSFER_* bitmask * @return apr_status_t of the operation. Could be any error returned from * either the input filter chain of c_i or the output filter chain * of c_o. APR_EPIPE if the outgoing connection was aborted. 
@@ -1203,7 +1211,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw const char *name, int *sent, apr_off_t bsize, - int after); + int flags); extern module PROXY_DECLARE_DATA proxy_module; --- modules/proxy/mod_proxy_connect.c (revision 1814718) +++ modules/proxy/mod_proxy_connect.c (working copy) @@ -143,6 +143,48 @@ static int proxy_connect_canon(request_rec *r, cha return OK; } +struct proxy_connect_conn +{ + conn_rec *c; + apr_pollfd_t pfd; + apr_bucket_brigade *bb; + ap_filter_t *core_filter; + ap_out_filter_func core_output; + apr_interval_time_t timeout; + const char *name; + int shutdown; +}; + +#define PROXY_SHUTDOWN_READ 0x1 +#define PROXY_SHUTDOWN_WRITE 0x2 +#define PROXY_SHUTDOWN_BOTH 0x3 + +static void pollset_add_conn(apr_pollset_t *pollset, + struct proxy_connect_conn *conn, + apr_int16_t events) +{ + if ((conn->pfd.reqevents & events) != events) { + apr_pollset_remove(pollset, &conn->pfd); + + conn->pfd.reqevents |= events; + apr_pollset_add(pollset, &conn->pfd); + } +} + +static void pollset_del_conn(apr_pollset_t *pollset, + struct proxy_connect_conn *conn, + apr_int16_t events) +{ + if (conn->pfd.reqevents & events) { + apr_pollset_remove(pollset, &conn->pfd); + + conn->pfd.reqevents &= ~events; + if (conn->pfd.reqevents) { + apr_pollset_add(pollset, &conn->pfd); + } + } +} + /* CONNECT handler */ static int proxy_connect_handler(request_rec *r, proxy_worker *worker, proxy_server_conf *conf, @@ -153,24 +195,25 @@ static int proxy_connect_handler(request_rec *r, p ap_get_module_config(r->server->module_config, &proxy_connect_module); apr_pool_t *p = r->pool; - apr_socket_t *sock; conn_rec *c = r->connection; - conn_rec *backconn; int done = 0; - - apr_bucket_brigade *bb_front; - apr_bucket_brigade *bb_back; apr_status_t rv; apr_size_t nbytes; char buffer[HUGE_STRING_LEN]; - apr_socket_t *client_socket = ap_get_conn_socket(c); int failed, rc; apr_pollset_t *pollset; - apr_pollfd_t pollfd; const apr_pollfd_t *signalled; apr_int32_t pollcnt, pi; 
- apr_int16_t pollevent; apr_sockaddr_t *nexthop; + apr_interval_time_t timeout; + struct proxy_connect_conn conns[2], + *client = &conns[0], + *backend = &conns[1]; +#if AP_MODULE_MAGIC_AT_LEAST(20120211, 69) + apr_size_t read_buf_size = ap_get_read_buf_size(r); +#else + apr_size_t read_buf_size = CONN_BLKSZ; +#endif apr_uri_t uri; const char *connectname; @@ -238,6 +281,23 @@ static int proxy_connect_handler(request_rec *r, p * We have determined who to connect to. Now make the connection. */ + client->c = c; + client->pfd.p = p; + client->pfd.client_data = NULL; + client->pfd.reqevents = APR_POLLIN; + client->pfd.desc_type = APR_POLL_SOCKET; + client->pfd.desc.s = ap_get_conn_socket(c); + client->name = "client"; + client->shutdown = 0; + + backend->c = NULL; + backend->pfd.p = p; + backend->pfd.client_data = NULL; + backend->pfd.reqevents = APR_POLLIN; + backend->pfd.desc_type = APR_POLL_SOCKET; + backend->name = "backend"; + backend->shutdown = 0; + /* * At this point we have a list of one or more IP addresses of * the machine to connect to. If configured, reorder this @@ -248,8 +308,8 @@ static int proxy_connect_handler(request_rec *r, p * For now we do nothing, ie we get DNS round robin. 
* XXX FIXME */ - failed = ap_proxy_connect_to_backend(&sock, "CONNECT", nexthop, - connectname, conf, r); + failed = ap_proxy_connect_to_backend(&backend->pfd.desc.s, "CONNECT", + nexthop, connectname, conf, r); /* handle a permanent error from the above loop */ if (failed) { @@ -261,28 +321,6 @@ static int proxy_connect_handler(request_rec *r, p } } - /* setup polling for connection */ - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "setting up poll()"); - - if ((rv = apr_pollset_create(&pollset, 2, r->pool, 0)) != APR_SUCCESS) { - apr_socket_close(sock); - ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01020) - "error apr_pollset_create()"); - return HTTP_INTERNAL_SERVER_ERROR; - } - - /* Add client side to the poll */ - pollfd.p = r->pool; - pollfd.desc_type = APR_POLL_SOCKET; - pollfd.reqevents = APR_POLLIN | APR_POLLHUP; - pollfd.desc.s = client_socket; - pollfd.client_data = NULL; - apr_pollset_add(pollset, &pollfd); - - /* Add the server side to the poll */ - pollfd.desc.s = sock; - apr_pollset_add(pollset, &pollfd); - /* * Step Three: Send the Request * @@ -289,20 +327,22 @@ static int proxy_connect_handler(request_rec *r, p * Send the HTTP/1.1 CONNECT request to the remote server */ - backconn = ap_run_create_connection(c->pool, r->server, sock, - c->id, c->sbh, c->bucket_alloc); - if (!backconn) { + backend->c = ap_run_create_connection(c->pool, r->server, + backend->pfd.desc.s, 0, NULL, + c->bucket_alloc); + if (!backend->c) { /* peer reset */ ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(01021) "an error occurred creating a new connection " "to %pI (%s)", nexthop, connectname); - apr_socket_close(sock); + apr_socket_close(backend->pfd.desc.s); return HTTP_INTERNAL_SERVER_ERROR; } - ap_proxy_ssl_disable(backconn); - rc = ap_run_pre_connection(backconn, sock); + ap_proxy_ssl_disable(backend->c); + + rc = ap_run_pre_connection(backend->c, backend->pfd.desc.s); if (rc != OK && rc != DONE) { - backconn->aborted = 1; + backend->c->aborted = 1; 
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(01022) "pre_connection setup failed (%d)", rc); return HTTP_INTERNAL_SERVER_ERROR; @@ -312,12 +352,11 @@ static int proxy_connect_handler(request_rec *r, p "connection complete to %pI (%s)", nexthop, connectname); apr_table_setn(r->notes, "proxy-source-port", apr_psprintf(r->pool, "%hu", - backconn->local_addr->port)); + backend->c->local_addr->port)); + client->bb = apr_brigade_create(p, client->c->bucket_alloc); + backend->bb = apr_brigade_create(p, backend->c->bucket_alloc); - bb_front = apr_brigade_create(p, c->bucket_alloc); - bb_back = apr_brigade_create(p, backconn->bucket_alloc); - /* If we are connecting through a remote proxy, we need to pass * the CONNECT request on to it. */ @@ -326,11 +365,11 @@ static int proxy_connect_handler(request_rec *r, p */ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "sending the CONNECT request to the remote proxy"); - ap_fprintf(backconn->output_filters, bb_back, + ap_fprintf(backend->c->output_filters, backend->bb, "CONNECT %s HTTP/1.0" CRLF, r->uri); - ap_fprintf(backconn->output_filters, bb_back, + ap_fprintf(backend->c->output_filters, backend->bb, "Proxy-agent: %s" CRLF CRLF, ap_get_server_banner()); - ap_fflush(backconn->output_filters, bb_back); + ap_fflush(backend->c->output_filters, backend->bb); } else { ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "Returning 200 OK"); @@ -337,13 +376,13 @@ static int proxy_connect_handler(request_rec *r, p nbytes = apr_snprintf(buffer, sizeof(buffer), "HTTP/1.0 200 Connection Established" CRLF); ap_xlate_proto_to_ascii(buffer, nbytes); - ap_fwrite(c->output_filters, bb_front, buffer, nbytes); + ap_fwrite(client->c->output_filters, client->bb, buffer, nbytes); nbytes = apr_snprintf(buffer, sizeof(buffer), "Proxy-agent: %s" CRLF CRLF, ap_get_server_banner()); ap_xlate_proto_to_ascii(buffer, nbytes); - ap_fwrite(c->output_filters, bb_front, buffer, nbytes); - ap_fflush(c->output_filters, bb_front); + 
ap_fwrite(client->c->output_filters, client->bb, buffer, nbytes); + ap_fflush(client->c->output_filters, client->bb); #if 0 /* This is safer code, but it doesn't work yet. I'm leaving it * here so that I can fix it later. @@ -357,6 +396,16 @@ static int proxy_connect_handler(request_rec *r, p ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "setting up poll()"); + rv = apr_pollset_create(&pollset, 2, r->pool, APR_POLLSET_NOCOPY); + if (rv != APR_SUCCESS) { + apr_socket_close(backend->pfd.desc.s); + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01020) + "error apr_pollset_create()"); + return HTTP_INTERNAL_SERVER_ERROR; + } + apr_pollset_add(pollset, &client->pfd); + apr_pollset_add(pollset, &backend->pfd); + /* * Step Four: Handle Data Transfer * @@ -364,23 +413,41 @@ static int proxy_connect_handler(request_rec *r, p */ /* we are now acting as a tunnel - the input/output filter stacks should - * not contain any non-connection filters. + * not contain any non-connection or coalescing filters. */ + ap_remove_output_filter_byhandle(c->output_filters, + "SSL/TLS Coalescing Filter"); r->output_filters = c->output_filters; r->proto_output_filters = c->output_filters; r->input_filters = c->input_filters; r->proto_input_filters = c->input_filters; -/* r->sent_bodyct = 1;*/ + /* r->sent_bodyct = 1;*/ + client->core_filter = client->c->output_filters; + while (client->core_filter->next) { + client->core_filter = client->core_filter->next; + } + client->core_output = client->core_filter->frec->filter_func.out_func; + + backend->core_filter = backend->c->output_filters; + while (backend->core_filter->next) { + backend->core_filter = backend->core_filter->next; + } + backend->core_output = backend->core_filter->frec->filter_func.out_func; + + apr_socket_timeout_get(client->pfd.desc.s, &client->timeout); + apr_socket_timeout_get(backend->pfd.desc.s, &backend->timeout); + timeout = client->timeout < backend->timeout ? 
client->timeout + : backend->timeout; + do { /* Loop until done (one side closes the connection, or an error) */ - rv = apr_pollset_poll(pollset, -1, &pollcnt, &signalled); + rv = apr_pollset_poll(pollset, timeout, &pollcnt, &signalled); if (rv != APR_SUCCESS) { if (APR_STATUS_IS_EINTR(rv)) { continue; } - apr_socket_close(sock); - ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01023) "error apr_poll()"); - return HTTP_INTERNAL_SERVER_ERROR; + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01023) "polling"); + break; } #ifdef DEBUGGING ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(01024) @@ -390,58 +457,104 @@ static int proxy_connect_handler(request_rec *r, p for (pi = 0; pi < pollcnt; pi++) { const apr_pollfd_t *cur = &signalled[pi]; - if (cur->desc.s == sock) { - pollevent = cur->rtnevents; - if (pollevent & (APR_POLLIN | APR_POLLHUP)) { + if (cur->desc.s != client->pfd.desc.s + && cur->desc.s != backend->pfd.desc.s) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01028) + "unknown socket in pollset"); + done = 1; + break; + } + + if (!(cur->rtnevents & (APR_POLLIN | APR_POLLHUP | APR_POLLOUT))) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01026) + "polling failed (0x%x)", cur->rtnevents); + if (cur->desc.s == client->pfd.desc.s) { + client->c->aborted = 1; + } + else { + backend->c->aborted = 1; + } + done = 1; + break; + } + + if (cur->rtnevents & (APR_POLLIN | APR_POLLHUP)) { + int in = (cur->desc.s != client->pfd.desc.s), out = !in; #ifdef DEBUGGING - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(01025) - "sock was readable"); + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(01025) + "%s was readable", conns[in].name); #endif - done |= ap_proxy_transfer_between_connections(r, backconn, - c, bb_back, - bb_front, - "sock", NULL, - CONN_BLKSZ, 1) - != APR_SUCCESS; + rv = ap_proxy_transfer_between_connections(r, + conns[in].c, conns[out].c, + conns[in].bb, conns[out].bb, + conns[in].name, + NULL, read_buf_size, + 
AP_PROXY_TRANSFER_FLUSH_NEVER | + AP_PROXY_TRANSFER_UNBUFFERED); + if (rv != APR_SUCCESS) { + if (!APR_STATUS_IS_EOF(rv)) { + /* Real failure, bail out */ + done = 1; + } + else { + /* Stop reading, wait for POLLOUT on the other side to + * shut it down. + */ +#ifdef DEBUGGING + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "%s read shutdown", conns[in].name); +#endif + conns[in].shutdown |= PROXY_SHUTDOWN_READ; + pollset_del_conn(pollset, &conns[in], APR_POLLIN); + pollset_add_conn(pollset, &conns[out], APR_POLLOUT); + } } - else if (pollevent & APR_POLLERR) { - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(01026) - "err on backconn"); - backconn->aborted = 1; - done = 1; + else if (conns[out].c->data_in_output_filters) { + /* While waiting for POLLOUT on the other side, pause + * POLLIN on this side to avoid filling the output + * filters even more and hence blocking there. + */ +#ifdef DEBUGGING + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "%s wait writable", conns[out].name); +#endif + pollset_del_conn(pollset, &conns[in], APR_POLLIN); + pollset_add_conn(pollset, &conns[out], APR_POLLOUT); } } - else if (cur->desc.s == client_socket) { - pollevent = cur->rtnevents; - if (pollevent & (APR_POLLIN | APR_POLLHUP)) { + + if (cur->rtnevents & APR_POLLOUT) { + int out = (cur->desc.s != client->pfd.desc.s), in = !out; #ifdef DEBUGGING - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(01027) - "client was readable"); + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "%s was writable", conns[out].name); #endif - done |= ap_proxy_transfer_between_connections(r, c, - backconn, - bb_front, - bb_back, - "client", - NULL, - CONN_BLKSZ, 1) - != APR_SUCCESS; - } - else if (pollevent & APR_POLLERR) { - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02827) - "err on client"); - c->aborted = 1; + rv = conns[out].core_output(conns[out].core_filter, NULL); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { + /* Real failure, bail out */ done = 1; } + 
else if (!conns[out].c->data_in_output_filters) { + /* Pending data now flushed, stop POLLOUT and restore + * POLLIN on the other side (unless it is already shut down for + * read, in which case we simply propagate the shutdown). + */ + if (conns[in].shutdown & PROXY_SHUTDOWN_READ) { +#ifdef DEBUGGING + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "%s write shutdown", conns[out].name); +#endif + apr_socket_shutdown(conns[out].pfd.desc.s, 1); + conns[out].shutdown |= PROXY_SHUTDOWN_WRITE; + } + else { + pollset_add_conn(pollset, &conns[in], APR_POLLIN); + } + pollset_del_conn(pollset, &conns[out], APR_POLLOUT); + } } - else { - ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(01028) - "unknown socket in pollset"); - done = 1; - } - } - } while (!done); + } while (!done && (client->pfd.reqevents || backend->pfd.reqevents)); ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "finished with poll() - cleaning up"); @@ -451,14 +564,11 @@ static int proxy_connect_handler(request_rec *r, p * * Close the socket and clean up */ + apr_pollset_remove(pollset, &client->pfd); + apr_pollset_remove(pollset, &backend->pfd); + apr_socket_close(backend->pfd.desc.s); + c->aborted = 1; - if (backconn->aborted) - apr_socket_close(sock); - else - ap_lingering_close(backconn); - - c->keepalive = AP_CONN_CLOSE; - return OK; } --- modules/proxy/proxy_util.c (revision 1814718) +++ modules/proxy/proxy_util.c (working copy) @@ -3774,7 +3774,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw const char *name, int *sent, apr_off_t bsize, - int after) + int flags) { apr_status_t rv; #ifdef DEBUGGING @@ -3804,7 +3804,8 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw *sent = 1; } ap_proxy_buckets_lifetime_transform(r, bb_i, bb_o); - if (!after) { + /* No AP_PROXY_TRANSFER_FLUSH_ flag means flush each */ + if (!(flags & AP_PROXY_TRANSFER_FLUSH_MASK)) { apr_bucket *b; /* @@ -3833,9 +3834,10 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_betw "error on %s - ap_get_brigade", name); } - } 
while (rv == APR_SUCCESS); + } while (rv == APR_SUCCESS && (!(flags & AP_PROXY_TRANSFER_UNBUFFERED) + || !c_o->data_in_output_filters)); - if (after) { + if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) { ap_fflush(c_o->output_filters, bb_o); }