View | Details | Raw Unified | Return to bug 52073
Collapse All | Expand All

(-)src/protocol/http/org/apache/jmeter/protocol/http/sampler/HTTPSamplerBase.java (-66 / +110 lines)
Lines 35-48 Link Here
35
import java.util.List;
35
import java.util.List;
36
import java.util.Map;
36
import java.util.Map;
37
import java.util.Set;
37
import java.util.Set;
38
import java.util.concurrent.ArrayBlockingQueue;
39
import java.util.concurrent.BlockingQueue;
38
import java.util.concurrent.Callable;
40
import java.util.concurrent.Callable;
41
import java.util.concurrent.CountDownLatch;
39
import java.util.concurrent.ExecutionException;
42
import java.util.concurrent.ExecutionException;
40
import java.util.concurrent.Future;
43
import java.util.concurrent.Future;
41
import java.util.concurrent.LinkedBlockingQueue;
44
import java.util.concurrent.SynchronousQueue;
42
import java.util.concurrent.ThreadFactory;
45
import java.util.concurrent.ThreadFactory;
43
import java.util.concurrent.ThreadPoolExecutor;
46
import java.util.concurrent.ThreadPoolExecutor;
44
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.TimeUnit;
45
import java.util.concurrent.TimeoutException;
48
import java.util.concurrent.atomic.AtomicInteger;
46
49
47
import org.apache.commons.io.IOUtils;
50
import org.apache.commons.io.IOUtils;
48
import org.apache.commons.lang3.StringUtils;
51
import org.apache.commons.lang3.StringUtils;
Lines 54-61 Link Here
54
import org.apache.jmeter.protocol.http.control.CacheManager;
57
import org.apache.jmeter.protocol.http.control.CacheManager;
55
import org.apache.jmeter.protocol.http.control.Cookie;
58
import org.apache.jmeter.protocol.http.control.Cookie;
56
import org.apache.jmeter.protocol.http.control.CookieManager;
59
import org.apache.jmeter.protocol.http.control.CookieManager;
57
import org.apache.jmeter.protocol.http.control.HeaderManager;
58
import org.apache.jmeter.protocol.http.control.DNSCacheManager;
60
import org.apache.jmeter.protocol.http.control.DNSCacheManager;
61
import org.apache.jmeter.protocol.http.control.HeaderManager;
59
import org.apache.jmeter.protocol.http.parser.HTMLParseException;
62
import org.apache.jmeter.protocol.http.parser.HTMLParseException;
60
import org.apache.jmeter.protocol.http.parser.HTMLParser;
63
import org.apache.jmeter.protocol.http.parser.HTMLParser;
61
import org.apache.jmeter.protocol.http.util.ConversionUtils;
64
import org.apache.jmeter.protocol.http.util.ConversionUtils;
Lines 187-196 Link Here
187
190
188
    public static final boolean BROWSER_COMPATIBLE_MULTIPART_MODE_DEFAULT = false; // The default setting to be used (i.e. historic)
191
    public static final boolean BROWSER_COMPATIBLE_MULTIPART_MODE_DEFAULT = false; // The default setting to be used (i.e. historic)
189
    
192
    
190
    private static final long KEEPALIVETIME = 0; // for Thread Pool for resources but no need to use a special value?
193
    private static final long KEEPALIVETIME = JMeterUtils.getPropDefault("httpsampler.parallel_download_thread_keepalive", 60L); // for Thread Pool for resources but no need to use a special value?
191
    
192
    private static final long AWAIT_TERMINATION_TIMEOUT = 
193
        JMeterUtils.getPropDefault("httpsampler.await_termination_timeout", 60); // $NON-NLS-1$ // default value: 60 secs 
194
    
194
    
195
    private static final boolean IGNORE_FAILED_EMBEDDED_RESOURCES = 
195
    private static final boolean IGNORE_FAILED_EMBEDDED_RESOURCES = 
196
            JMeterUtils.getPropDefault("httpsampler.ignore_failed_embedded_resources", false); // $NON-NLS-1$ // default value: false
196
            JMeterUtils.getPropDefault("httpsampler.ignore_failed_embedded_resources", false); // $NON-NLS-1$ // default value: false
Lines 908-913 Link Here
908
            JMeterUtils.getPropDefault("httpsampler.separate.container", true); // $NON-NLS-1$
908
            JMeterUtils.getPropDefault("httpsampler.separate.container", true); // $NON-NLS-1$
909
909
910
    /**
910
    /**
911
     * Shared executor used for Embedded resources parallel download
912
     */
913
    private static final ThreadPoolExecutor PARALLEL_DOWNLOAD_EXECUTOR = 
914
            new ThreadPoolExecutor(0, // corePoolSize 
915
                    Integer.MAX_VALUE, // maximumPoolSize
916
                    KEEPALIVETIME, // keepAliveTime
917
                    TimeUnit.SECONDS, // unit
918
                    new SynchronousQueue<Runnable>(), 
919
                    new ThreadFactory() {
920
                        @Override
921
                        public Thread newThread(final Runnable r) {
922
                            Thread t = new CleanerThread(new Runnable() {
923
                                @Override
924
                                public void run() {
925
                                    try {
926
                                        r.run();
927
                                    } finally {
928
                                        ((CleanerThread)Thread.currentThread()).notifyThreadEnd();
929
                                    }
930
                                }
931
                            });
932
                            return t;
933
                        }
934
            });
935
936
    /**
911
     * Get the URL, built from its component parts.
937
     * Get the URL, built from its component parts.
912
     *
938
     *
913
     * <p>
939
     * <p>
Lines 1221-1227 Link Here
1221
            }
1247
            }
1222
            
1248
            
1223
            // For concurrent get resources
1249
            // For concurrent get resources
1224
            final List<Callable<AsynSamplerResultHolder>> liste = new ArrayList<Callable<AsynSamplerResultHolder>>();
1250
            final List<ASyncSample> listAsyncTaskForDownload = new ArrayList<ASyncSample>();
1225
1251
1226
            while (urls.hasNext()) {
1252
            while (urls.hasNext()) {
1227
                Object binURL = urls.next(); // See catch clause below
1253
                Object binURL = urls.next(); // See catch clause below
Lines 1248-1254 Link Here
1248
                        
1274
                        
1249
                        if (isConcurrentDwn()) {
1275
                        if (isConcurrentDwn()) {
1250
                            // if concurrent download emb. resources, add to a list for async gets later
1276
                            // if concurrent download emb. resources, add to a list for async gets later
1251
                            liste.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this));
1277
                            listAsyncTaskForDownload.add(new ASyncSample(url, HTTPConstants.GET, false, frameDepth + 1, getCookieManager(), this));
1252
                        } else {
1278
                        } else {
1253
                            // default: serial download embedded resources
1279
                            // default: serial download embedded resources
1254
                            HTTPSampleResult binRes = sample(url, HTTPConstants.GET, false, frameDepth + 1);
1280
                            HTTPSampleResult binRes = sample(url, HTTPConstants.GET, false, frameDepth + 1);
Lines 1272-1337 Link Here
1272
                    log.warn("Concurrent download resources selected, "// $NON-NLS-1$
1298
                    log.warn("Concurrent download resources selected, "// $NON-NLS-1$
1273
                            + "but pool size value is bad. Use default value");// $NON-NLS-1$
1299
                            + "but pool size value is bad. Use default value");// $NON-NLS-1$
1274
                }
1300
                }
1275
                // Thread pool Executor to get resources 
1276
                // use a LinkedBlockingQueue, note: max pool size doesn't effect
1277
                final ThreadPoolExecutor exec = new ThreadPoolExecutor(
1278
                        poolSize, poolSize, KEEPALIVETIME, TimeUnit.SECONDS,
1279
                        new LinkedBlockingQueue<Runnable>(),
1280
                        new ThreadFactory() {
1281
                            @Override
1282
                            public Thread newThread(final Runnable r) {
1283
                                Thread t = new CleanerThread(new Runnable() {
1284
                                    @Override
1285
                                    public void run() {
1286
                                        try {
1287
                                            r.run();
1288
                                        } finally {
1289
                                            ((CleanerThread)Thread.currentThread()).notifyThreadEnd();
1290
                                        }
1291
                                    }
1292
                                });
1293
                                return t;
1294
                            }
1295
                        });
1296
1301
1297
                boolean tasksCompleted = false;
1298
                try {
1302
                try {
1299
                    // sample all resources with threadpool
1303
                    CountDownLatch countDownLatch = new CountDownLatch(listAsyncTaskForDownload.size());
1300
                    final List<Future<AsynSamplerResultHolder>> retExec = exec.invokeAll(liste);
1304
                    final List<Future<AsynSamplerResultHolder>> retExec = new ArrayList<Future<AsynSamplerResultHolder>>(listAsyncTaskForDownload.size());
1301
                    // call normal shutdown (wait ending all tasks)
1305
                    // this queue will ensure we don't use more threads than allowed
1302
                    exec.shutdown();
1306
                    BlockingQueue<Callable<AsynSamplerResultHolder>> queueOfCurrentDownloads = new ArrayBlockingQueue<Callable<AsynSamplerResultHolder>>(poolSize);
1303
                    // put a timeout if tasks couldn't terminate
1307
                    while(listAsyncTaskForDownload.size()>0){
1304
                    exec.awaitTermination(AWAIT_TERMINATION_TIMEOUT, TimeUnit.SECONDS);
1308
                        // Remove from list to download
1309
                        ASyncSample command = listAsyncTaskForDownload.remove(0);
1310
                        // Set latch so that we get notification on end
1311
                        command.setLatch(countDownLatch);
1312
                        command.setQueueOfCurrentDownloads(queueOfCurrentDownloads);
1313
                        // put in queue to control number of Parallel tasks per VU Thread
1314
                        // We want to block if size of queue is reached (ie number of parallel downloads)
1315
                        queueOfCurrentDownloads.put(command);
1316
                        // Call task
1317
                        retExec.addAll(PARALLEL_DOWNLOAD_EXECUTOR.invokeAll(Arrays.asList(new ASyncSample[]{command})));
1318
                    }
1319
                    // Wait for termination
1320
                    if(log.isDebugEnabled()) {
1321
                        log.debug("Waiting for download of "+listAsyncTaskForDownload.size() +" resources");
1322
                    }
1323
                    countDownLatch.await();
1305
                    CookieManager cookieManager = getCookieManager();
1324
                    CookieManager cookieManager = getCookieManager();
1306
                    // add result to main sampleResult
1325
                    // add result to main sampleResult
1307
                    for (Future<AsynSamplerResultHolder> future : retExec) {
1326
                    for (Future<AsynSamplerResultHolder> future : retExec) {
1308
                        AsynSamplerResultHolder binRes;
1327
                        AsynSamplerResultHolder binRes;
1309
                        try {
1328
                        binRes = future.get();
1310
                            binRes = future.get(1, TimeUnit.MILLISECONDS);
1329
                        if(cookieManager != null) {
1311
                            if(cookieManager != null) {
1330
                            CollectionProperty cookies = binRes.getCookies();
1312
                                CollectionProperty cookies = binRes.getCookies();
1331
                            PropertyIterator iter = cookies.iterator();
1313
                                PropertyIterator iter = cookies.iterator();
1332
                            while (iter.hasNext()) {
1314
                                while (iter.hasNext()) {
1333
                                Cookie cookie = (Cookie) iter.next().getObjectValue();
1315
                                    Cookie cookie = (Cookie) iter.next().getObjectValue();
1334
                                cookieManager.add(cookie) ;
1316
                                    cookieManager.add(cookie) ;
1317
                                }
1318
                            }
1335
                            }
1319
                            res.addSubResult(binRes.getResult());
1320
                            setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true));
1321
                        } catch (TimeoutException e) {
1322
                            errorResult(e, res);
1323
                        }
1336
                        }
1337
                        res.addSubResult(binRes.getResult());
1338
                        setParentSampleSuccess(res, res.isSuccessful() && (binRes.getResult() != null ? binRes.getResult().isSuccessful():true));
1324
                    }
1339
                    }
1325
                    tasksCompleted = exec.awaitTermination(1, TimeUnit.MILLISECONDS); // did all the tasks finish?
1326
                } catch (InterruptedException ie) {
1340
                } catch (InterruptedException ie) {
1327
                    log.warn("Interruped fetching embedded resources", ie); // $NON-NLS-1$
1341
                    log.warn("Interruped fetching embedded resources", ie); // $NON-NLS-1$
1328
                } catch (ExecutionException ee) {
1342
                } catch (ExecutionException ee) {
1329
                    log.warn("Execution issue when fetching embedded resources", ee); // $NON-NLS-1$
1343
                    log.warn("Execution issue when fetching embedded resources", ee); // $NON-NLS-1$
1330
                } finally {
1344
                } 
1331
                    if (!tasksCompleted) {
1332
                        exec.shutdownNow(); // kill any remaining tasks
1333
                    }
1334
                }
1335
            }
1345
            }
1336
        }
1346
        }
1337
        return res;
1347
        return res;
Lines 1870-1875 Link Here
1870
        final private int depth;
1880
        final private int depth;
1871
        private final HTTPSamplerBase sampler;
1881
        private final HTTPSamplerBase sampler;
1872
        private final JMeterContext jmeterContextOfParentThread;
1882
        private final JMeterContext jmeterContextOfParentThread;
1883
        // Used to notify end of ASyncSample
1884
        private CountDownLatch latch;
1885
        // Queue to remove ourselves from 
1886
        private BlockingQueue<Callable<AsynSamplerResultHolder>> queueOfCurrentDownloads;
1873
1887
1874
        ASyncSample(URL url, String method,
1888
        ASyncSample(URL url, String method,
1875
                boolean areFollowingRedirect, int depth,  CookieManager cookieManager, HTTPSamplerBase base){
1889
                boolean areFollowingRedirect, int depth,  CookieManager cookieManager, HTTPSamplerBase base){
Lines 1877-1882 Link Here
1877
            this.method = method;
1891
            this.method = method;
1878
            this.areFollowingRedirect = areFollowingRedirect;
1892
            this.areFollowingRedirect = areFollowingRedirect;
1879
            this.depth = depth;
1893
            this.depth = depth;
1894
            // We clone for thread safety, but this comes at the cost of not sharing HttpClient state
1880
            this.sampler = (HTTPSamplerBase) base.clone();
1895
            this.sampler = (HTTPSamplerBase) base.clone();
1881
            // We don't want to use CacheManager clone but the parent one, and CacheManager is Thread Safe
1896
            // We don't want to use CacheManager clone but the parent one, and CacheManager is Thread Safe
1882
            CacheManager cacheManager = base.getCacheManager();
1897
            CacheManager cacheManager = base.getCacheManager();
Lines 1891-1921 Link Here
1891
            this.jmeterContextOfParentThread = JMeterContextService.getContext();
1906
            this.jmeterContextOfParentThread = JMeterContextService.getContext();
1892
        }
1907
        }
1893
1908
1909
        /**
1910
         * @param queue queue of current downloads
1911
         */
1912
        public void setQueueOfCurrentDownloads(
1913
                BlockingQueue<Callable<AsynSamplerResultHolder>> queue) {
1914
            this.queueOfCurrentDownloads = queue;
1915
        }
1916
1917
        /**
1918
         * 
1919
         * @param countDownLatch CountDownLatch
1920
         */
1921
        public void setLatch(CountDownLatch countDownLatch) {
1922
            this.latch = countDownLatch;
1923
        }
1924
1894
        @Override
1925
        @Override
1895
        public AsynSamplerResultHolder call() {
1926
        public AsynSamplerResultHolder call() {
1896
            JMeterContextService.replaceContext(jmeterContextOfParentThread);
1927
            try {
1897
            ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler);
1928
                JMeterContextService.replaceContext(jmeterContextOfParentThread);
1898
            HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth);
1929
                ((CleanerThread) Thread.currentThread()).registerSamplerForEndNotification(sampler);
1899
            if(sampler.getCookieManager() != null) {
1930
                HTTPSampleResult httpSampleResult = sampler.sample(url, method, areFollowingRedirect, depth);
1900
                CollectionProperty cookies = sampler.getCookieManager().getCookies();
1931
                if(sampler.getCookieManager() != null) {
1901
                return new AsynSamplerResultHolder(httpSampleResult, cookies);
1932
                    CollectionProperty cookies = sampler.getCookieManager().getCookies();
1902
            } else {
1933
                    return new AsynSamplerResultHolder(httpSampleResult, cookies);
1903
                return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty());
1934
                } else {
1935
                    return new AsynSamplerResultHolder(httpSampleResult, new CollectionProperty());
1936
                }
1937
            } finally {
1938
                // Remove ourselves so a new download can start
1939
                queueOfCurrentDownloads.remove(this);
1940
                // notify latch 
1941
                latch.countDown();
1904
            }
1942
            }
1905
        }
1943
        }
1906
    }
1944
    }
1907
    
1945
    
1908
    /**
1946
    /**
1909
     * Custom thread implementation that 
1947
     * Custom thread implementation that:
1948
     * - Gives a name to the thread
1949
     * - Notifies samplers of thread end
1910
     *
1950
     *
1911
     */
1951
     */
1912
    private static class CleanerThread extends Thread {
1952
    private static class CleanerThread extends Thread {
1953
        private static final String PARALLEL_DOWNLOADER_THREAD_PREFIX = "PARALLEL_DOWNLOAD_THREAD-";
1954
        private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
1955
        // FIXME This list would lead to OOM as it will only be cleaned at end of Load Test, and we create 1 HTTPSamplerBase
1956
        // each time an ASyncSample is created
1913
        private final List<HTTPSamplerBase> samplersToNotify = new ArrayList<HTTPSamplerBase>();
1957
        private final List<HTTPSamplerBase> samplersToNotify = new ArrayList<HTTPSamplerBase>();
1914
        /**
1958
        /**
1915
         * @param runnable Runnable
1959
         * @param runnable Runnable
1916
         */
1960
         */
1917
        public CleanerThread(Runnable runnable) {
1961
        public CleanerThread(Runnable runnable) {
1918
           super(runnable);
1962
           super(runnable, PARALLEL_DOWNLOADER_THREAD_PREFIX+ATOMIC_INTEGER.incrementAndGet());
1919
        }
1963
        }
1920
        
1964
        
1921
        /**
1965
        /**

Return to bug 52073