Index: src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java =================================================================== --- src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java (revision 1621523) +++ src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java (working copy) @@ -35,14 +35,17 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -54,8 +57,8 @@ import org.apache.jmeter.protocol.http.control.CacheManager; import org.apache.jmeter.protocol.http.control.Cookie; import org.apache.jmeter.protocol.http.control.CookieManager; -import org.apache.jmeter.protocol.http.control.HeaderManager; import org.apache.jmeter.protocol.http.control.DNSCacheManager; +import org.apache.jmeter.protocol.http.control.HeaderManager; import org.apache.jmeter.protocol.http.parser.HTMLParseException; import org.apache.jmeter.protocol.http.parser.HTMLParser; import org.apache.jmeter.protocol.http.util.ConversionUtils; @@ -187,10 +190,7 @@ public static final boolean BROWSER_COMPATIBLE_MULTIPART_MODE_DEFAULT = false; // The default setting to be used (i.e. historic) - private static final long KEEPALIVETIME = 0; // for Thread Pool for resources but no need to use a special value? - - private static final long AWAIT_TERMINATION_TIMEOUT = - JMeterUtils.getPropDefault("httpsampler.await_termination_timeout", 60); // $NON-NLS-1$ // default value: 60 secs + private static final long KEEPALIVETIME = JMeterUtils.getPropDefault("httpsampler.parallel_download_thread_keepalive", 60L); // for Thread Pool for resources but no need to use a special value? private static final boolean IGNORE_FAILED_EMBEDDED_RESOURCES = JMeterUtils.getPropDefault("httpsampler.ignore_failed_embedded_resources", false); // $NON-NLS-1$ // default value: false @@ -908,6 +908,32 @@ JMeterUtils.getPropDefault("httpsampler.separate.container", true); // $NON-NLS-1$ /** + * Shared executor used for Embedded resources parallel download + */ + private static final ThreadPoolExecutor PARALLEL_DOWNLOAD_EXECUTOR = + new ThreadPoolExecutor(0, // corePoolSize + Integer.MAX_VALUE, // maximumPoolSize + KEEPALIVETIME, // keepAliveTime + TimeUnit.SECONDS, // unit + new SynchronousQueue(), + new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + Thread t = new CleanerThread(new Runnable() { + @Override + public void run() { + try { + r.run(); + } finally { + ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); + } + } + }); + return t; + } + }); + + /** * Get the URL, built from its component parts. * *

@@ -1221,7 +1247,7 @@ } // For concurrent get resources - final List> liste = new ArrayList>(); + final List listAsyncTaskForDownload = new ArrayList(); while (urls.hasNext()) { Object binURL = urls.next(); // See catch clause below @@ -1248,7 +1274,7 @@ if (isConcurrentDwn()) { // if concurrent download emb. resources, add to a list for async gets later - liste.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); + listAsyncTaskForDownload.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); } else { // default: serial download embedded resources HTTPSampleResult binRes = sample(url, HTTPConstants.GET, false, frameDepth + 1); @@ -1272,66 +1298,50 @@ log.warn("Concurrent download resources selected, "// $NON-NLS-1$ + "but pool size value is bad. Use default value");// $NON-NLS-1$ } - // Thread pool Executor to get resources - // use a LinkedBlockingQueue, note: max pool size doesn't effect - final ThreadPoolExecutor exec = new ThreadPoolExecutor( - poolSize, poolSize, KEEPALIVETIME, TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - Thread t = new CleanerThread(new Runnable() { - @Override - public void run() { - try { - r.run(); - } finally { - ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); - } - } - }); - return t; - } - }); - boolean tasksCompleted = false; try { - // sample all resources with threadpool - final List> retExec = exec.invokeAll(liste); - // call normal shutdown (wait ending all tasks) - exec.shutdown(); - // put a timeout if tasks couldn't terminate - exec.awaitTermination(AWAIT_TERMINATION_TIMEOUT, TimeUnit.SECONDS); + CountDownLatch countDownLatch = new CountDownLatch(listAsyncTaskForDownload.size()); + final List> retExec = new ArrayList>(listAsyncTaskForDownload.size()); + // this queue will ensure we don't use more threads than allowed + BlockingQueue> queueOfCurrentDownloads = new ArrayBlockingQueue>(poolSize); + while(listAsyncTaskForDownload.size()>0){ + // Remove from list to download + ASyncSample command = listAsyncTaskForDownload.remove(0); + // Set latch so that we get notification on end + command.setLatch(countDownLatch); + command.setQueueOfCurrentDownloads(queueOfCurrentDownloads); + // put in queue to control number of Parallel tasks per VU Thread + // We want to block if size of queue is reached (ie number of parallel downloads) + queueOfCurrentDownloads.put(command); + // Call task + retExec.addAll(PARALLEL_DOWNLOAD_EXECUTOR.invokeAll(Arrays.asList(new ASyncSample[]{command}))); + } + // Wait for termination + if(log.isDebugEnabled()) { + log.debug("Waiting for download of "+listAsyncTaskForDownload.size() +" resources"); + } + countDownLatch.await(); CookieManager cookieManager = getCookieManager(); // add result to main sampleResult for (Future future : retExec) { AsynSamplerResultHolder binRes; - try { - binRes = future.get(1, TimeUnit.MILLISECONDS); - if(cookieManager != null) { - CollectionProperty cookies = binRes.getCookies(); - PropertyIterator iter = cookies.iterator(); - while (iter.hasNext()) { - Cookie cookie = (Cookie) iter.next().getObjectValue(); - cookieManager.add(cookie) ; - } + binRes = future.get(); + if(cookieManager != null) { + CollectionProperty cookies = binRes.getCookies(); + PropertyIterator iter = cookies.iterator(); + while (iter.hasNext()) { + Cookie cookie = (Cookie) iter.next().getObjectValue(); + cookieManager.add(cookie) ; } - res.addSubResult(binRes.getResult()); - setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); - } catch (TimeoutException e) { - errorResult(e, res); } + res.addSubResult(binRes.getResult()); + setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); } - tasksCompleted = exec.awaitTermination(1, TimeUnit.MILLISECONDS); // did all the tasks finish? } catch (InterruptedException ie) { log.warn("Interruped fetching embedded resources", ie); // $NON-NLS-1$ } catch (ExecutionException ee) { log.warn("Execution issue when fetching embedded resources", ee); // $NON-NLS-1$ - } finally { - if (!tasksCompleted) { - exec.shutdownNow(); // kill any remaining tasks - } - } + } } } return res; @@ -1870,6 +1880,10 @@ final private int depth; private final HTTPSamplerBase sampler; private final JMeterContext jmeterContextOfParentThread; + // Use to notify end of ASyncSAmple + private CountDownLatch latch; + // Queue to remove ourselves from + private BlockingQueue> queueOfCurrentDownloads; ASyncSample(URL url, String method, boolean areFollowingRedirect, int depth, CookieManager cookieManager, HTTPSamplerBase base){ @@ -1877,6 +1891,7 @@ this.method = method; this.areFollowingRedirect = areFollowingRedirect; this.depth = depth; + // We clone for thread safety, but this comes at the cost of not sharing HttpClient state this.sampler = (HTTPSamplerBase) base.clone(); // We don't want to use CacheManager clone but the parent one, and CacheManager is Thread Safe CacheManager cacheManager = base.getCacheManager(); @@ -1891,31 +1906,60 @@ this.jmeterContextOfParentThread = JMeterContextService.getContext(); } + /** + * @param queue queue of current downloads + */ + public void setQueueOfCurrentDownloads( + BlockingQueue> queue) { + this.queueOfCurrentDownloads = queue; + } + + /** + * + * @param countDownLatch CountDownLatch + */ + public void setLatch(CountDownLatch countDownLatch) { + this.latch = countDownLatch; + } + @Override public AsynSamplerResultHolder call() { - JMeterContextService.replaceContext(jmeterContextOfParentThread); - ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler); - HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth); - if(sampler.getCookieManager() != null) { - CollectionProperty cookies = sampler.getCookieManager().getCookies(); - return new AsynSamplerResultHolder(httpSampleResult, cookies); - } else { - return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty()); + try { + JMeterContextService.replaceContext(jmeterContextOfParentThread); + ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler); + HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth); + if(sampler.getCookieManager() != null) { + CollectionProperty cookies = sampler.getCookieManager().getCookies(); + return new AsynSamplerResultHolder(httpSampleResult, cookies); + } else { + return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty()); + } + } finally { + // Remove ourselves so a new download can start + queueOfCurrentDownloads.remove(this); + // notify latch + latch.countDown(); } } } /** - * Custom thread implementation that + * Custom thread implementation that : + * - Gives a name to the thread + * - Notifies samplers of thread end * */ private static class CleanerThread extends Thread { + private static final String PARALLEL_DOWNLOADER_THREAD_PREFIX = "PARALLEL_DOWNLOAD_THREAD-"; + private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0); + // FIXME This list would lead to OOM as it will only be cleaned at end of Load Test, and we create 1 HTTPSamplerBase + // each time a AsyncSample is created private final List samplersToNotify = new ArrayList(); /** * @param runnable Runnable */ public CleanerThread(Runnable runnable) { - super(runnable); + super(runnable, PARALLEL_DOWNLOADER_THREAD_PREFIX+ATOMIC_INTEGER.incrementAndGet()); } /**