ASF Bugzilla – Attachment 31959 Details for
Bug 52073
Embedded Resources Parallel download : Improve performances by avoiding shutdown of ThreadPoolExecutor at each sample
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
BROKEN PATCH to illustrate dev list discussion
BUG_52073.patch (text/plain), 15.10 KB, created by
Philippe Mouawad
on 2014-09-01 19:15:32 UTC
(
hide
)
Description:
BROKEN PATCH to illustrate dev list discussion
Filename:
MIME Type:
Creator:
Philippe Mouawad
Created:
2014-09-01 19:15:32 UTC
Size:
15.10 KB
patch
obsolete
>Index: src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java >=================================================================== >--- src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java (revision 1621523) >+++ src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java (working copy) >@@ -35,14 +35,17 @@ > import java.util.List; > import java.util.Map; > import java.util.Set; >+import java.util.concurrent.ArrayBlockingQueue; >+import java.util.concurrent.BlockingQueue; > import java.util.concurrent.Callable; >+import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ExecutionException; > import java.util.concurrent.Future; >-import java.util.concurrent.LinkedBlockingQueue; >+import java.util.concurrent.SynchronousQueue; > import java.util.concurrent.ThreadFactory; > import java.util.concurrent.ThreadPoolExecutor; > import java.util.concurrent.TimeUnit; >-import java.util.concurrent.TimeoutException; >+import java.util.concurrent.atomic.AtomicInteger; > > import org.apache.commons.io.IOUtils; > import org.apache.commons.lang3.StringUtils; >@@ -54,8 +57,8 @@ > import org.apache.jmeter.protocol.http.control.CacheManager; > import org.apache.jmeter.protocol.http.control.Cookie; > import org.apache.jmeter.protocol.http.control.CookieManager; >-import org.apache.jmeter.protocol.http.control.HeaderManager; > import org.apache.jmeter.protocol.http.control.DNSCacheManager; >+import org.apache.jmeter.protocol.http.control.HeaderManager; > import org.apache.jmeter.protocol.http.parser.HTMLParseException; > import org.apache.jmeter.protocol.http.parser.HTMLParser; > import org.apache.jmeter.protocol.http.util.ConversionUtils; >@@ -187,10 +190,7 @@ > > public static final boolean BROWSER_COMPATIBLE_MULTIPART_MODE_DEFAULT = false; // The default setting to be used (i.e. historic) > >- private static final long KEEPALIVETIME = 0; // for Thread Pool for resources but no need to use a special value? >- >- private static final long AWAIT_TERMINATION_TIMEOUT = >- JMeterUtils.getPropDefault("httpsampler.await_termination_timeout", 60); // $NON-NLS-1$ // default value: 60 secs >+ private static final long KEEPALIVETIME = JMeterUtils.getPropDefault("httpsampler.parallel_download_thread_keepalive", 60L); // for Thread Pool for resources but no need to use a special value? > > private static final boolean IGNORE_FAILED_EMBEDDED_RESOURCES = > JMeterUtils.getPropDefault("httpsampler.ignore_failed_embedded_resources", false); // $NON-NLS-1$ // default value: false >@@ -908,6 +908,32 @@ > JMeterUtils.getPropDefault("httpsampler.separate.container", true); // $NON-NLS-1$ > > /** >+ * Shared executor used for Embedded resources parallel download >+ */ >+ private static final ThreadPoolExecutor PARALLEL_DOWNLOAD_EXECUTOR = >+ new ThreadPoolExecutor(0, // corePoolSize >+ Integer.MAX_VALUE, // maximumPoolSize >+ KEEPALIVETIME, // keepAliveTime >+ TimeUnit.SECONDS, // unit >+ new SynchronousQueue<Runnable>(), >+ new ThreadFactory() { >+ @Override >+ public Thread newThread(final Runnable r) { >+ Thread t = new CleanerThread(new Runnable() { >+ @Override >+ public void run() { >+ try { >+ r.run(); >+ } finally { >+ ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); >+ } >+ } >+ }); >+ return t; >+ } >+ }); >+ >+ /** > * Get the URL, built from its component parts. > * > * <p> >@@ -1221,7 +1247,7 @@ > } > > // For concurrent get resources >- final List<Callable<AsynSamplerResultHolder>> liste = new ArrayList<Callable<AsynSamplerResultHolder>>(); >+ final List<ASyncSample> listAsyncTaskForDownload = new ArrayList<ASyncSample>(); > > while (urls.hasNext()) { > Object binURL = urls.next(); // See catch clause below >@@ -1248,7 +1274,7 @@ > > if (isConcurrentDwn()) { > // if concurrent download emb. resources, add to a list for async gets later >- liste.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); >+ listAsyncTaskForDownload.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); > } else { > // default: serial download embedded resources > HTTPSampleResult binRes = sample(url, HTTPConstants.GET, false, frameDepth + 1); >@@ -1272,66 +1298,50 @@ > log.warn("Concurrent download resources selected, "// $NON-NLS-1$ > + "but pool size value is bad. Use default value");// $NON-NLS-1$ > } >- // Thread pool Executor to get resources >- // use a LinkedBlockingQueue, note: max pool size doesn't effect >- final ThreadPoolExecutor exec = new ThreadPoolExecutor( >- poolSize, poolSize, KEEPALIVETIME, TimeUnit.SECONDS, >- new LinkedBlockingQueue<Runnable>(), >- new ThreadFactory() { >- @Override >- public Thread newThread(final Runnable r) { >- Thread t = new CleanerThread(new Runnable() { >- @Override >- public void run() { >- try { >- r.run(); >- } finally { >- ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); >- } >- } >- }); >- return t; >- } >- }); > >- boolean tasksCompleted = false; > try { >- // sample all resources with threadpool >- final List<Future<AsynSamplerResultHolder>> retExec = exec.invokeAll(liste); >- // call normal shutdown (wait ending all tasks) >- exec.shutdown(); >- // put a timeout if tasks couldn't terminate >- exec.awaitTermination(AWAIT_TERMINATION_TIMEOUT, TimeUnit.SECONDS); >+ CountDownLatch countDownLatch = new CountDownLatch(listAsyncTaskForDownload.size()); >+ final List<Future<AsynSamplerResultHolder>> retExec = new ArrayList<Future<AsynSamplerResultHolder>>(listAsyncTaskForDownload.size()); >+ // this queue will ensure we don't use more threads than allowed >+ BlockingQueue<Callable<AsynSamplerResultHolder>> queueOfCurrentDownloads = new ArrayBlockingQueue<Callable<AsynSamplerResultHolder>>(poolSize); >+ while(listAsyncTaskForDownload.size()>0){ >+ // Remove from list to download >+ ASyncSample command = listAsyncTaskForDownload.remove(0); >+ // Set latch so that we get notification on end >+ command.setLatch(countDownLatch); >+ command.setQueueOfCurrentDownloads(queueOfCurrentDownloads); >+ // put in queue to control number of Parallel tasks per VU Thread >+ // We want to block if size of queue is reached (ie number of parallel downloads) >+ queueOfCurrentDownloads.put(command); >+ // Call task >+ retExec.addAll(PARALLEL_DOWNLOAD_EXECUTOR.invokeAll(Arrays.asList(new ASyncSample[]{command}))); >+ } >+ // Wait for termination >+ if(log.isDebugEnabled()) { >+ log.debug("Waiting for download of "+listAsyncTaskForDownload.size() +" resources"); >+ } >+ countDownLatch.await(); > CookieManager cookieManager = getCookieManager(); > // add result to main sampleResult > for (Future<AsynSamplerResultHolder> future : retExec) { > AsynSamplerResultHolder binRes; >- try { >- binRes = future.get(1, TimeUnit.MILLISECONDS); >- if(cookieManager != null) { >- CollectionProperty cookies = binRes.getCookies(); >- PropertyIterator iter = cookies.iterator(); >- while (iter.hasNext()) { >- Cookie cookie = (Cookie) iter.next().getObjectValue(); >- cookieManager.add(cookie) ; >- } >+ binRes = future.get(); >+ if(cookieManager != null) { >+ CollectionProperty cookies = binRes.getCookies(); >+ PropertyIterator iter = cookies.iterator(); >+ while (iter.hasNext()) { >+ Cookie cookie = (Cookie) iter.next().getObjectValue(); >+ cookieManager.add(cookie) ; > } >- res.addSubResult(binRes.getResult()); >- setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); >- } catch (TimeoutException e) { >- errorResult(e, res); > } >+ res.addSubResult(binRes.getResult()); >+ setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); > } >- tasksCompleted = exec.awaitTermination(1, TimeUnit.MILLISECONDS); // did all the tasks finish? > } catch (InterruptedException ie) { > log.warn("Interruped fetching embedded resources", ie); // $NON-NLS-1$ > } catch (ExecutionException ee) { > log.warn("Execution issue when fetching embedded resources", ee); // $NON-NLS-1$ >- } finally { >- if (!tasksCompleted) { >- exec.shutdownNow(); // kill any remaining tasks >- } >- } >+ } > } > } > return res; >@@ -1870,6 +1880,10 @@ > final private int depth; > private final HTTPSamplerBase sampler; > private final JMeterContext jmeterContextOfParentThread; >+ // Use to notify end of ASyncSAmple >+ private CountDownLatch latch; >+ // Queue to remove ourselves from >+ private BlockingQueue<Callable<AsynSamplerResultHolder>> queueOfCurrentDownloads; > > ASyncSample(URL url, String method, > boolean areFollowingRedirect, int depth, CookieManager cookieManager, HTTPSamplerBase base){ >@@ -1877,6 +1891,7 @@ > this.method = method; > this.areFollowingRedirect = areFollowingRedirect; > this.depth = depth; >+ // We clone for thread safety, but this comes at the cost of not sharing HttpClient state > this.sampler = (HTTPSamplerBase) base.clone(); > // We don't want to use CacheManager clone but the parent one, and CacheManager is Thread Safe > CacheManager cacheManager = base.getCacheManager(); >@@ -1891,31 +1906,60 @@ > this.jmeterContextOfParentThread = JMeterContextService.getContext(); > } > >+ /** >+ * @param queue queue of current downloads >+ */ >+ public void setQueueOfCurrentDownloads( >+ BlockingQueue<Callable<AsynSamplerResultHolder>> queue) { >+ this.queueOfCurrentDownloads = queue; >+ } >+ >+ /** >+ * >+ * @param countDownLatch CountDownLatch >+ */ >+ public void setLatch(CountDownLatch countDownLatch) { >+ this.latch = countDownLatch; >+ } >+ > @Override > public AsynSamplerResultHolder call() { >- JMeterContextService.replaceContext(jmeterContextOfParentThread); >- ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler); >- HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth); >- if(sampler.getCookieManager() != null) { >- CollectionProperty cookies = sampler.getCookieManager().getCookies(); >- return new AsynSamplerResultHolder(httpSampleResult, cookies); >- } else { >- return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty()); >+ try { >+ JMeterContextService.replaceContext(jmeterContextOfParentThread); >+ ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler); >+ HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth); >+ if(sampler.getCookieManager() != null) { >+ CollectionProperty cookies = sampler.getCookieManager().getCookies(); >+ return new AsynSamplerResultHolder(httpSampleResult, cookies); >+ } else { >+ return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty()); >+ } >+ } finally { >+ // Remove ourselves so a new download can start >+ queueOfCurrentDownloads.remove(this); >+ // notify latch >+ latch.countDown(); > } > } > } > > /** >- * Custom thread implementation that >+ * Custom thread implementation that : >+ * - Gives a name to the thread >+ * - Notifies samplers of thread end > * > */ > private static class CleanerThread extends Thread { >+ private static final String PARALLEL_DOWNLOADER_THREAD_PREFIX = "PARALLEL_DOWNLOAD_THREAD-"; >+ private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0); >+ // FIXME This list would lead to OOM as it will only be cleaned at end of Load Test, and we create 1 HTTPSamplerBase >+ // each time a AsyncSample is created > private final List<HTTPSamplerBase> samplersToNotify = new ArrayList<HTTPSamplerBase>(); > /** > * @param runnable Runnable > */ > public CleanerThread(Runnable runnable) { >- super(runnable); >+ super(runnable, PARALLEL_DOWNLOADER_THREAD_PREFIX+ATOMIC_INTEGER.incrementAndGet()); > } > > /**
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 52073
: 31959 |
33580
|
33651