ASF Bugzilla – Attachment 33580 Details for
Bug 52073
Embedded Resources Parallel download : Improve performances by avoiding shutdown of ThreadPoolExecutor at each sample
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
patch
pr132.diff.txt (text/plain), 24.67 KB, created by
benoit.wiart
on 2016-02-21 19:57:52 UTC
(
hide
)
Description:
patch
Filename:
MIME Type:
Creator:
benoit.wiart
Created:
2016-02-21 19:57:52 UTC
Size:
24.67 KB
patch
obsolete
>diff --git a/bin/jmeter.properties b/bin/jmeter.properties >index c43d003..53db852 100644 >--- a/bin/jmeter.properties >+++ b/bin/jmeter.properties >@@ -987,8 +987,7 @@ beanshell.server.file=../extras/startup.bsh > #httpsampler.max_redirects=5 > # Maximum frame/iframe nesting depth (default 5) > #httpsampler.max_frame_depth=5 >-# Maximum await termination timeout (secs) when concurrent download embedded resources (default 60) >-#httpsampler.await_termination_timeout=60 >+ > # Revert to BUG 51939 behaviour (no separate container for embedded resources) by setting the following false: > #httpsampler.separate.container=true > >@@ -996,6 +995,9 @@ beanshell.server.file=../extras/startup.bsh > # Parent sample will not be marked as failed > #httpsampler.ignore_failed_embedded_resources=false > >+#keep alive time for the parallel download threads (in seconds) >+#httpsampler.parallel_download_thread_keepalive_inseconds=60 >+ > # Don't keep the embedded resources response data : just keep the size and the md5 > # default to false > #httpsampler.embedded_resources_use_md5=false >diff --git a/src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java b/src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java >index 328cf61..d0acc4b 100644 >--- a/src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java >+++ b/src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java >@@ -38,11 +38,6 @@ > import java.util.concurrent.Callable; > import java.util.concurrent.ExecutionException; > import java.util.concurrent.Future; >-import java.util.concurrent.LinkedBlockingQueue; >-import java.util.concurrent.ThreadFactory; >-import java.util.concurrent.ThreadPoolExecutor; >-import java.util.concurrent.TimeUnit; >-import java.util.concurrent.TimeoutException; > > import org.apache.commons.io.IOUtils; > import org.apache.commons.lang3.StringUtils; >@@ -58,6 +53,8 @@ > import org.apache.jmeter.protocol.http.control.HeaderManager; > import org.apache.jmeter.protocol.http.parser.HTMLParseException; > import org.apache.jmeter.protocol.http.parser.HTMLParser; >+import org.apache.jmeter.protocol.http.sampler.ResourcesDownloader.AsynSamplerResultHolder; >+import org.apache.jmeter.protocol.http.sampler.ResourcesDownloader.CleanerThread; > import org.apache.jmeter.protocol.http.util.ConversionUtils; > import org.apache.jmeter.protocol.http.util.EncoderCache; > import org.apache.jmeter.protocol.http.util.HTTPArgument; >@@ -190,11 +187,6 @@ > > public static final boolean BROWSER_COMPATIBLE_MULTIPART_MODE_DEFAULT = false; // The default setting to be used (i.e. historic) > >- private static final long KEEPALIVETIME = 0; // for Thread Pool for resources but no need to use a special value? >- >- private static final long AWAIT_TERMINATION_TIMEOUT = >- JMeterUtils.getPropDefault("httpsampler.await_termination_timeout", 60); // $NON-NLS-1$ // default value: 60 secs >- > private static final boolean IGNORE_FAILED_EMBEDDED_RESOURCES = > JMeterUtils.getPropDefault("httpsampler.ignore_failed_embedded_resources", false); // $NON-NLS-1$ // default value: false > >@@ -1234,8 +1226,26 @@ protected HTTPSampleResult downloadPageResources(HTTPSampleResult res, HTTPSampl > } > > // For concurrent get resources >- final List<Callable<AsynSamplerResultHolder>> liste = new ArrayList<>(); >+ final List<Callable<AsynSamplerResultHolder>> list = new ArrayList<>(); > >+ int maxConcurrentDownloads = CONCURRENT_POOL_SIZE; // init with default value >+ boolean concurrentDwn = isConcurrentDwn(); >+ if (concurrentDwn) { >+ try { >+ maxConcurrentDownloads = Integer.parseInt(getConcurrentPool()); >+ } catch (NumberFormatException nfe) { >+ log.warn("Concurrent download resources selected, "// $NON-NLS-1$ >+ + "but pool size value is bad. Use default value (sampler name="+getName()+")");// $NON-NLS-1$ >+ } >+ >+ // if the user choose a number of parallel downloads of 1 >+ // no need to use another thread, do the sample on the current thread >+ if(maxConcurrentDownloads == 1) { >+ log.warn("Number of parallel downloads set to 1, (sampler name="+getName()+")"); >+ concurrentDwn = false; >+ } >+ } >+ > while (urls.hasNext()) { > Object binURL = urls.next(); // See catch clause below > try { >@@ -1244,7 +1254,7 @@ protected HTTPSampleResult downloadPageResources(HTTPSampleResult res, HTTPSampl > log.warn("Null URL detected (should not happen)"); > } else { > String urlstr = url.toString(); >- String urlStrEnc=escapeIllegalURLCharacters(encodeSpaces(urlstr)); >+ String urlStrEnc = escapeIllegalURLCharacters(encodeSpaces(urlstr)); > if (!urlstr.equals(urlStrEnc)){// There were some spaces in the URL > try { > url = new URL(urlStrEnc); >@@ -1265,9 +1275,9 @@ protected HTTPSampleResult downloadPageResources(HTTPSampleResult res, HTTPSampl > setParentSampleSuccess(res, false); > continue; > } >- if (isConcurrentDwn()) { >+ if (concurrentDwn) { > // if concurrent download emb. resources, add to a list for async gets later >- liste.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); >+ list.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this)); > } else { > // default: serial download embedded resources > HTTPSampleResult binRes = sample(url, HTTPConstants.GET, false, frameDepth + 1); >@@ -1281,76 +1291,35 @@ protected HTTPSampleResult downloadPageResources(HTTPSampleResult res, HTTPSampl > setParentSampleSuccess(res, false); > } > } >+ > // IF for download concurrent embedded resources >- if (isConcurrentDwn()) { >- int poolSize = CONCURRENT_POOL_SIZE; // init with default value >- try { >- poolSize = Integer.parseInt(getConcurrentPool()); >- } catch (NumberFormatException nfe) { >- log.warn("Concurrent download resources selected, "// $NON-NLS-1$ >- + "but pool size value is bad. Use default value");// $NON-NLS-1$ >- } >- final String parentThreadName = Thread.currentThread().getName(); >- // Thread pool Executor to get resources >- // use a LinkedBlockingQueue, note: max pool size doesn't effect >- final ThreadPoolExecutor exec = new ThreadPoolExecutor( >- poolSize, poolSize, KEEPALIVETIME, TimeUnit.SECONDS, >- new LinkedBlockingQueue<Runnable>(), >- new ThreadFactory() { >- @Override >- public Thread newThread(final Runnable r) { >- Thread t = new CleanerThread(new Runnable() { >- @Override >- public void run() { >- try { >- r.run(); >- } finally { >- ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); >- } >- } >- }); >- t.setName(parentThreadName+"-ResDownload-" + t.getName()); //$NON-NLS-1$ >- t.setDaemon(true); >- return t; >- } >- }); >+ if (concurrentDwn && !list.isEmpty()) { >+ >+ ResourcesDownloader resourcesDownloader = ResourcesDownloader.getInstance(); > >- boolean tasksCompleted = false; > try { >- // sample all resources with threadpool >- final List<Future<AsynSamplerResultHolder>> retExec = exec.invokeAll(liste); >- // call normal shutdown (wait ending all tasks) >- exec.shutdown(); >- // put a timeout if tasks couldn't terminate >- exec.awaitTermination(AWAIT_TERMINATION_TIMEOUT, TimeUnit.SECONDS); >+ // sample all resources >+ final List<Future<AsynSamplerResultHolder>> retExec = resourcesDownloader.invokeAllAndAwaitTermination(maxConcurrentDownloads, list); > CookieManager cookieManager = getCookieManager(); > // add result to main sampleResult > for (Future<AsynSamplerResultHolder> future : retExec) { >- AsynSamplerResultHolder binRes; >- try { >- binRes = future.get(1, TimeUnit.MILLISECONDS); >- if(cookieManager != null) { >- CollectionProperty cookies = binRes.getCookies(); >- for (JMeterProperty jMeterProperty : cookies) { >- Cookie cookie = (Cookie) jMeterProperty.getObjectValue(); >- cookieManager.add(cookie) ; >- } >+ // this call will not block as the futures return by invokeAllAndAwaitTermination >+ // are either done or cancelled >+ AsynSamplerResultHolder binRes = future.get(); >+ if(cookieManager != null) { >+ CollectionProperty cookies = binRes.getCookies(); >+ for (JMeterProperty jMeterProperty : cookies) { >+ Cookie cookie = (Cookie) jMeterProperty.getObjectValue(); >+ cookieManager.add(cookie); > } >- res.addSubResult(binRes.getResult()); >- setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); >- } catch (TimeoutException e) { >- errorResult(e, res); > } >+ res.addSubResult(binRes.getResult()); >+ setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true)); > } >- tasksCompleted = exec.awaitTermination(1, TimeUnit.MILLISECONDS); // did all the tasks finish? > } catch (InterruptedException ie) { >- log.warn("Interruped fetching embedded resources", ie); // $NON-NLS-1$ >+ log.warn("Interrupted fetching embedded resources", ie); // $NON-NLS-1$ > } catch (ExecutionException ee) { > log.warn("Execution issue when fetching embedded resources", ee); // $NON-NLS-1$ >- } finally { >- if (!tasksCompleted) { >- exec.shutdownNow(); // kill any remaining tasks >- } > } > } > } >@@ -1976,67 +1945,6 @@ public AsynSamplerResultHolder call() { > } > > /** >- * Custom thread implementation that allows notification of threadEnd >- * >- */ >- private static class CleanerThread extends Thread { >- private final List<HTTPSamplerBase> samplersToNotify = new ArrayList<>(); >- /** >- * @param runnable Runnable >- */ >- public CleanerThread(Runnable runnable) { >- super(runnable); >- } >- >- /** >- * Notify of thread end >- */ >- public void notifyThreadEnd() { >- for (HTTPSamplerBase samplerBase : samplersToNotify) { >- samplerBase.threadFinished(); >- } >- samplersToNotify.clear(); >- } >- >- /** >- * Register sampler to be notify at end of thread >- * @param sampler {@link HTTPSamplerBase} >- */ >- public void registerSamplerForEndNotification(HTTPSamplerBase sampler) { >- this.samplersToNotify.add(sampler); >- } >- } >- >- /** >- * Holder of AsynSampler result >- */ >- private static class AsynSamplerResultHolder { >- private final HTTPSampleResult result; >- private final CollectionProperty cookies; >- /** >- * @param result {@link HTTPSampleResult} to hold >- * @param cookies cookies to hold >- */ >- public AsynSamplerResultHolder(HTTPSampleResult result, CollectionProperty cookies) { >- super(); >- this.result = result; >- this.cookies = cookies; >- } >- /** >- * @return the result >- */ >- public HTTPSampleResult getResult() { >- return result; >- } >- /** >- * @return the cookies >- */ >- public CollectionProperty getCookies() { >- return cookies; >- } >- } >- >- /** > * @see org.apache.jmeter.samplers.AbstractSampler#applies(org.apache.jmeter.config.ConfigTestElement) > */ > @Override >diff --git a/src/protocol/http/org/apache/jmeter/protocol/http/sampler/ResourcesDownloader.java b/src/protocol/http/org/apache/jmeter/protocol/http/sampler/ResourcesDownloader.java >new file mode 100644 >index 0000000..1141c6e >--- /dev/null >+++ b/src/protocol/http/org/apache/jmeter/protocol/http/sampler/ResourcesDownloader.java >@@ -0,0 +1,280 @@ >+/* >+ * Licensed to the Apache Software Foundation (ASF) under one or more >+ * contributor license agreements. See the NOTICE file distributed with >+ * this work for additional information regarding copyright ownership. >+ * The ASF licenses this file to You under the Apache License, Version 2.0 >+ * (the "License"); you may not use this file except in compliance with >+ * the License. You may obtain a copy of the License at >+ * >+ * http://www.apache.org/licenses/LICENSE-2.0 >+ * >+ * Unless required by applicable law or agreed to in writing, software >+ * distributed under the License is distributed on an "AS IS" BASIS, >+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >+ * See the License for the specific language governing permissions and >+ * limitations under the License. >+ * >+ */ >+ >+package org.apache.jmeter.protocol.http.sampler; >+ >+import java.util.ArrayList; >+import java.util.List; >+import java.util.concurrent.Callable; >+import java.util.concurrent.CompletionService; >+import java.util.concurrent.ExecutorCompletionService; >+import java.util.concurrent.Future; >+import java.util.concurrent.SynchronousQueue; >+import java.util.concurrent.ThreadFactory; >+import java.util.concurrent.ThreadPoolExecutor; >+import java.util.concurrent.TimeUnit; >+ >+import org.apache.jmeter.testelement.property.CollectionProperty; >+import org.apache.jmeter.util.JMeterUtils; >+import org.apache.jorphan.logging.LoggingManager; >+import org.apache.log.Logger; >+ >+/** >+ * Manages the parallel http resources download.<br> >+ * A shared thread pool is used by all the sample.<br> >+ * A sampler will usually do the following >+ * <pre> {@code >+ * // list of AsynSamplerResultHolder to download >+ * List<Callable<AsynSamplerResultHolder>> list = ... >+ * >+ * // max parallel downloads >+ * int maxConcurrentDownloads = ... >+ * >+ * // get the singleton instance >+ * ResourcesDownloader resourcesDownloader = ResourcesDownloader.getInstance(); >+ * >+ * // schedule the downloads and wait for the completion >+ * List<Future<AsynSamplerResultHolder>> retExec = resourcesDownloader.invokeAllAndAwaitTermination(maxConcurrentDownloads, list); >+ * >+ * }</pre> >+ * >+ * the call to invokeAllAndAwaitTermination will block until the downloads complete or get interrupted<br> >+ * the Future list only contains task that have been scheduled in the threadpool.<br> >+ * The status of those futures are either done or cancelled<br> >+ * <br> >+ * >+ * Future enhancements : >+ * <ul> >+ * <li>re-use the httpclient between consecutive downloads. >+ * the current connection management is not realistic see BUG 59034 : https://bz.apache.org/bugzilla/show_bug.cgi?id=59034</li> >+ * <li>this implementation should be replaced with a NIO async download >+ * in order to reduce the number of threads needed</li> >+ * </ul> >+ * @since 3.0 >+ */ >+public class ResourcesDownloader { >+ >+ private static final Logger LOG = LoggingManager.getLoggerForClass(); >+ >+ private static final long THREAD_KEEP_ALIVE_TIME = JMeterUtils.getPropDefault("httpsampler.parallel_download_thread_keepalive_inseconds", 60L); >+ private static final int MIN_POOL_SIZE = 1; >+ private static final int MAX_POOL_SIZE = Integer.MAX_VALUE; >+ >+ private static final ResourcesDownloader INSTANCE = new ResourcesDownloader(); >+ >+ public static ResourcesDownloader getInstance() { >+ return INSTANCE; >+ } >+ >+ >+ private ThreadPoolExecutor concurrentExecutor = null; >+ >+ private ResourcesDownloader() { >+ init(); >+ } >+ >+ >+ private void init() { >+ ThreadPoolExecutor exec = new ThreadPoolExecutor( >+ MIN_POOL_SIZE, MAX_POOL_SIZE, THREAD_KEEP_ALIVE_TIME, TimeUnit.SECONDS, >+ new SynchronousQueue<Runnable>(), >+ new ThreadFactory() { >+ @Override >+ public Thread newThread(final Runnable r) { >+ Thread t = new CleanerThread(r); >+ t.setName("ResDownload-" + t.getName()); //$NON-NLS-1$ >+ t.setDaemon(true); >+ return t; >+ } >+ }) { >+ >+ @Override >+ protected void afterExecute(Runnable r, Throwable t) { >+ ((CleanerThread)Thread.currentThread()).notifyThreadEnd(); >+ } >+ }; >+ concurrentExecutor = exec; >+ } >+ >+ /** >+ * this method will try to shrink the thread pool size as much as possible >+ * it should be called at the end of a test >+ */ >+ public void shrink() { >+ if(concurrentExecutor.getPoolSize() > MIN_POOL_SIZE) { >+ // drain the queue >+ concurrentExecutor.purge(); >+ List<Runnable> drainList = new ArrayList<>(); >+ concurrentExecutor.getQueue().drainTo(drainList); >+ if(!drainList.isEmpty()) { >+ LOG.warn("the pool executor workqueue is not empty size=" + drainList.size()); >+ for (Runnable runnable : drainList) { >+ if(runnable instanceof Future<?>) { >+ Future<?> f = (Future<?>) runnable; >+ f.cancel(true); >+ } >+ else { >+ LOG.warn("Content of workqueue is not an instance of Future"); >+ } >+ } >+ } >+ >+ // this will force the release of the extra threads that are idle >+ // the remaining extra threads will be released with the keepAliveTime of the thread >+ concurrentExecutor.setMaximumPoolSize(MIN_POOL_SIZE); >+ >+ // do not immediately restore the MaximumPoolSize as it will block the release of the threads >+ } >+ } >+ >+ // probablyTheBestMethodNameInTheUniverseYeah! >+ /** >+ * This method will block until the downloads complete or it get interrupted >+ * the Future list returned by this method only contains tasks that have been scheduled in the threadpool.<br> >+ * The status of those futures are either done or cancelled >+ * >+ * @param maxConcurrentDownloads max concurrent downloads >+ * @param list list of resources to download >+ * @return list tasks that have been scheduled >+ * @throws InterruptedException >+ */ >+ public List<Future<AsynSamplerResultHolder>> invokeAllAndAwaitTermination(int maxConcurrentDownloads, List<Callable<AsynSamplerResultHolder>> list) throws InterruptedException { >+ List<Future<AsynSamplerResultHolder>> submittedTasks = new ArrayList<>(); >+ >+ // paranoid fast path >+ if(list.isEmpty()) { >+ return submittedTasks; >+ } >+ >+ // restore MaximumPoolSize original value >+ concurrentExecutor.setMaximumPoolSize(MAX_POOL_SIZE); >+ >+ if(LOG.isDebugEnabled()) { >+ LOG.debug("PoolSize=" + concurrentExecutor.getPoolSize()+" LargestPoolSize=" + concurrentExecutor.getLargestPoolSize()); >+ } >+ >+ CompletionService<AsynSamplerResultHolder> completionService = new ExecutorCompletionService<>(concurrentExecutor); >+ int remainingTasksToTake = list.size(); >+ >+ try { >+ // push the task in the threadpool until <maxConcurrentDownloads> is reached >+ int i = 0; >+ for (i = 0; i < Math.min(maxConcurrentDownloads, list.size()); i++) { >+ Callable<AsynSamplerResultHolder> task = list.get(i); >+ submittedTasks.add(completionService.submit(task)); >+ } >+ >+ // push the remaining tasks but ensure we use at most <maxConcurrentDownloads> threads >+ // wait for a previous download to finish before submitting a new one >+ for (; i < list.size(); i++) { >+ Callable<AsynSamplerResultHolder> task = list.get(i); >+ completionService.take(); >+ remainingTasksToTake--; >+ submittedTasks.add(completionService.submit(task)); >+ } >+ >+ // all the resources downloads are in the thread pool queue >+ // wait for the completion of all downloads >+ while (remainingTasksToTake > 0) { >+ completionService.take(); >+ remainingTasksToTake--; >+ } >+ } >+ finally { >+ //bug 51925 : Calling Stop on Test leaks executor threads when concurrent download of resources is on >+ if(remainingTasksToTake > 0) { >+ if(LOG.isDebugEnabled()) { >+ LOG.debug("Interrupted while waiting for resource downloads : cancelling remaining tasks"); >+ } >+ for (Future<AsynSamplerResultHolder> future : submittedTasks) { >+ if(!future.isDone()) { >+ future.cancel(true); >+ } >+ } >+ } >+ } >+ >+ return submittedTasks; >+ } >+ >+ >+ >+ /** >+ * Custom thread implementation that allows notification of threadEnd >+ */ >+ public static class CleanerThread extends Thread { >+ private final List<HTTPSamplerBase> samplersToNotify = new ArrayList<>(); >+ /** >+ * @param runnable Runnable >+ */ >+ public CleanerThread(Runnable runnable) { >+ super(runnable); >+ } >+ >+ /** >+ * Notify of thread end >+ */ >+ public void notifyThreadEnd() { >+ for (HTTPSamplerBase samplerBase : samplersToNotify) { >+ samplerBase.threadFinished(); >+ } >+ samplersToNotify.clear(); >+ } >+ >+ /** >+ * Register sampler to be notify at end of thread >+ * @param sampler {@link HTTPSamplerBase} >+ */ >+ public void registerSamplerForEndNotification(HTTPSamplerBase sampler) { >+ this.samplersToNotify.add(sampler); >+ } >+ } >+ >+ /** >+ * Holder of AsynSampler result >+ */ >+ public static class AsynSamplerResultHolder { >+ private final HTTPSampleResult result; >+ private final CollectionProperty cookies; >+ >+ /** >+ * @param result {@link HTTPSampleResult} to hold >+ * @param cookies cookies to hold >+ */ >+ public AsynSamplerResultHolder(HTTPSampleResult result, CollectionProperty cookies) { >+ super(); >+ this.result = result; >+ this.cookies = cookies; >+ } >+ >+ /** >+ * @return the result >+ */ >+ public HTTPSampleResult getResult() { >+ return result; >+ } >+ >+ /** >+ * @return the cookies >+ */ >+ public CollectionProperty getCookies() { >+ return cookies; >+ } >+ } >+ >+}
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 52073
:
31959
| 33580 |
33651