Lines 36-43
Link Here
|
36 |
int speed; |
36 |
int speed; |
37 |
int chunk_size; |
37 |
int chunk_size; |
38 |
int burst; |
38 |
int burst; |
|
|
39 |
int do_sleep; |
39 |
rl_state_e state; |
40 |
rl_state_e state; |
40 |
apr_bucket_brigade *tmpbb; |
41 |
apr_bucket_brigade *tmpbb; |
|
|
42 |
apr_bucket_brigade *buffer; |
41 |
apr_bucket_brigade *holdingbb; |
43 |
apr_bucket_brigade *holdingbb; |
42 |
} rl_ctx_t; |
44 |
} rl_ctx_t; |
43 |
|
45 |
|
Lines 62-68
Link Here
|
62 |
apr_status_t rv = APR_SUCCESS; |
64 |
apr_status_t rv = APR_SUCCESS; |
63 |
rl_ctx_t *ctx = f->ctx; |
65 |
rl_ctx_t *ctx = f->ctx; |
64 |
apr_bucket *fb; |
66 |
apr_bucket *fb; |
65 |
int do_sleep = 0; |
|
|
66 |
apr_bucket_alloc_t *ba = f->r->connection->bucket_alloc; |
67 |
apr_bucket_alloc_t *ba = f->r->connection->bucket_alloc; |
67 |
apr_bucket_brigade *bb = input_bb; |
68 |
apr_bucket_brigade *bb = input_bb; |
68 |
|
69 |
|
Lines 120-130
Link Here
|
120 |
ctx->state = RATE_LIMIT; |
121 |
ctx->state = RATE_LIMIT; |
121 |
ctx->speed = ratelimit; |
122 |
ctx->speed = ratelimit; |
122 |
ctx->burst = burst; |
123 |
ctx->burst = burst; |
|
|
124 |
ctx->do_sleep = 0; |
123 |
|
125 |
|
124 |
/* calculate how many bytes / interval we want to send */ |
126 |
/* calculate how many bytes / interval we want to send */ |
125 |
/* speed is bytes / second, so, how many (speed / 1000 % interval) */ |
127 |
/* speed is bytes / second, so, how many (speed / 1000 % interval) */ |
126 |
ctx->chunk_size = (ctx->speed / (1000 / RATE_INTERVAL_MS)); |
128 |
ctx->chunk_size = (ctx->speed / (1000 / RATE_INTERVAL_MS)); |
|
|
129 |
|
130 |
/* |
131 |
* Usage of the brigades: |
132 |
* tmpbb -> temporary scratch pad, used within the same filter's |
133 |
* invocation. |
134 |
* buffer -> buckets buffered between filter's executions (if needed) |
135 |
* holdingbb -> used to store buckets not meant to be rate limited |
136 |
*/ |
127 |
ctx->tmpbb = apr_brigade_create(f->r->pool, ba); |
137 |
ctx->tmpbb = apr_brigade_create(f->r->pool, ba); |
|
|
138 |
ctx->buffer = apr_brigade_create(f->r->pool, ba); |
128 |
ctx->holdingbb = apr_brigade_create(f->r->pool, ba); |
139 |
ctx->holdingbb = apr_brigade_create(f->r->pool, ba); |
129 |
} |
140 |
} |
130 |
|
141 |
|
Lines 185-190
Link Here
|
185 |
while (!APR_BRIGADE_EMPTY(bb)) { |
196 |
while (!APR_BRIGADE_EMPTY(bb)) { |
186 |
apr_bucket *stop_point; |
197 |
apr_bucket *stop_point; |
187 |
apr_off_t len = 0; |
198 |
apr_off_t len = 0; |
|
|
199 |
apr_off_t len_tmp = 0; |
188 |
|
200 |
|
189 |
if (f->c->aborted) { |
201 |
if (f->c->aborted) { |
190 |
apr_brigade_cleanup(bb); |
202 |
apr_brigade_cleanup(bb); |
Lines 192-237
Link Here
|
192 |
break; |
204 |
break; |
193 |
} |
205 |
} |
194 |
|
206 |
|
195 |
if (do_sleep) { |
207 |
if (ctx->do_sleep) { |
196 |
apr_sleep(RATE_INTERVAL_MS * 1000); |
208 |
apr_sleep(RATE_INTERVAL_MS * 1000); |
197 |
} |
209 |
} |
198 |
else { |
210 |
else { |
199 |
do_sleep = 1; |
211 |
ctx->do_sleep = 1; |
200 |
} |
212 |
} |
201 |
|
213 |
|
|
|
214 |
/* |
215 |
* Restore any bucket saved in a previous filter |
216 |
* invocation (if any) to tmpbb. |
217 |
*/ |
218 |
APR_BRIGADE_CONCAT(ctx->tmpbb, ctx->buffer); |
219 |
|
202 |
apr_brigade_length(bb, 1, &len); |
220 |
apr_brigade_length(bb, 1, &len); |
|
|
221 |
apr_brigade_length(ctx->tmpbb, 1, &len_tmp); |
203 |
|
222 |
|
204 |
/* |
223 |
/* |
205 |
* Pull next chunk of data; the initial amount is our |
224 |
* If the buckets to be rate limited span multiple brigades |
206 |
* burst allotment (if any) plus a chunk. All subsequent |
225 |
* some buffering is needed. The idea is to pass buckets down |
207 |
* iterations are just chunks with whatever remaining |
226 |
* the chain (and flush) only when the length of the ctx->tmpbb |
208 |
* burst amounts we have left (in case not done in the |
227 |
* reaches ctx->chunk_size. If EOS is found as last bucket in |
209 |
* first bucket). |
228 |
* a brigade then ctx->tmpbb will be passed even if not reaching |
|
|
229 |
* chunk_size. The buckets are also setaside properly via |
230 |
* ap_save_brigade to avoid any inconsistency while buffering |
231 |
* buckets between filter invocations. |
210 |
*/ |
232 |
*/ |
211 |
rv = apr_brigade_partition(bb, |
233 |
if (len_tmp + len >= ctx->chunk_size + ctx->burst) { |
212 |
ctx->chunk_size + ctx->burst, &stop_point); |
234 |
rv = apr_brigade_partition(bb, |
213 |
if (rv != APR_SUCCESS && rv != APR_INCOMPLETE) { |
235 |
ctx->chunk_size + ctx->burst - len_tmp, &stop_point); |
214 |
ctx->state = RATE_ERROR; |
236 |
if (rv != APR_SUCCESS && rv != APR_INCOMPLETE) { |
215 |
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, f->r, APLOGNO(01456) |
237 |
ctx->state = RATE_ERROR; |
216 |
"rl: partition failed."); |
238 |
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, f->r, APLOGNO(01456) |
217 |
break; |
239 |
"rl: partition failed."); |
218 |
} |
240 |
break; |
|
|
241 |
} |
219 |
|
242 |
|
220 |
if (stop_point != APR_BRIGADE_SENTINEL(bb)) { |
243 |
if (stop_point != APR_BRIGADE_SENTINEL(bb)) { |
221 |
apr_bucket *f; |
244 |
apr_bucket_brigade* bb_tail; |
222 |
apr_bucket *e = APR_BUCKET_PREV(stop_point); |
245 |
bb_tail = apr_brigade_split(bb, stop_point); |
223 |
f = APR_RING_FIRST(&bb->list); |
246 |
APR_BRIGADE_CONCAT(ctx->tmpbb, bb); |
224 |
APR_RING_UNSPLICE(f, e, link); |
247 |
bb = bb_tail; |
225 |
APR_RING_SPLICE_HEAD(&ctx->tmpbb->list, f, e, apr_bucket, |
248 |
|
226 |
link); |
249 |
} |
227 |
} |
250 |
else { |
228 |
else { |
251 |
APR_BRIGADE_CONCAT(ctx->tmpbb, bb); |
229 |
APR_BRIGADE_CONCAT(ctx->tmpbb, bb); |
252 |
} |
230 |
} |
|
|
231 |
|
253 |
|
232 |
fb = apr_bucket_flush_create(ba); |
254 |
fb = apr_bucket_flush_create(ba); |
|
|
255 |
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, fb); |
233 |
|
256 |
|
234 |
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, fb); |
257 |
} else { |
|
|
258 |
APR_BRIGADE_CONCAT(ctx->tmpbb, bb); |
259 |
} |
235 |
|
260 |
|
236 |
/* |
261 |
/* |
237 |
* Adjust the burst amount depending on how much |
262 |
* Adjust the burst amount depending on how much |
Lines 255-269
Link Here
|
255 |
brigade_dump(f->r, bb); |
280 |
brigade_dump(f->r, bb); |
256 |
#endif /* RLFDEBUG */ |
281 |
#endif /* RLFDEBUG */ |
257 |
|
282 |
|
258 |
rv = ap_pass_brigade(f->next, ctx->tmpbb); |
283 |
apr_brigade_length(ctx->tmpbb, 1, &len_tmp); |
259 |
apr_brigade_cleanup(ctx->tmpbb); |
|
|
260 |
|
284 |
|
261 |
if (rv != APR_SUCCESS) { |
285 |
if ((len_tmp == ctx->chunk_size + ctx->burst) |
262 |
/* Most often, user disconnects from stream */ |
286 |
|| (len_tmp > 0 && APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(ctx->tmpbb)))) { |
263 |
ctx->state = RATE_ERROR; |
287 |
rv = ap_pass_brigade(f->next, ctx->tmpbb); |
264 |
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01457) |
288 |
apr_brigade_cleanup(ctx->tmpbb); |
265 |
"rl: brigade pass failed."); |
289 |
|
266 |
break; |
290 |
if (rv != APR_SUCCESS) { |
|
|
291 |
/* Most often, user disconnects from stream */ |
292 |
ctx->state = RATE_ERROR; |
293 |
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01457) |
294 |
"rl: brigade pass failed."); |
295 |
break; |
296 |
} |
297 |
ctx->do_sleep = 1; |
298 |
} else { |
299 |
/* |
300 |
* Change the buckets' lifetime, this will allow them |
301 |
* to be used in the next execution of the filter. |
302 |
*/ |
303 |
ap_save_brigade(f, &(ctx->buffer), &(ctx->tmpbb), f->r->pool); |
304 |
ctx->do_sleep = 0; |
267 |
} |
305 |
} |
268 |
} |
306 |
} |
269 |
} |
307 |
} |