View | Details | Raw Unified | Return to bug 47886
Collapse All | Expand All

(-)src/core/org/apache/jmeter/engine/StandardJMeterEngine.java (-48 / +29 lines)
Lines 33-38 Link Here
33
import java.util.List;
33
import java.util.List;
34
import java.util.Map;
34
import java.util.Map;
35
import java.util.Properties;
35
import java.util.Properties;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.Future;
38
import java.util.concurrent.ScheduledExecutorService;
39
import java.util.concurrent.TimeUnit;
36
40
37
import org.apache.jmeter.JMeter;
41
import org.apache.jmeter.JMeter;
38
import org.apache.jmeter.testbeans.TestBean;
42
import org.apache.jmeter.testbeans.TestBean;
Lines 93-99 Link Here
93
    private static final List<TestListener> testList = new ArrayList<TestListener>();
97
    private static final List<TestListener> testList = new ArrayList<TestListener>();
94
98
95
    /** JMeterThread => its JVM thread */
99
    /** JMeterThread => its JVM thread */
96
    private final Map<JMeterThread, Thread> allThreads;
100
    private final Map<JMeterThread, Future<?>> allThreads;
97
101
98
    /** flag to show that groups are still being created, i.e test plan is not complete */
102
    /** flag to show that groups are still being created, i.e test plan is not complete */
99
    private volatile boolean startingGroups;
103
    private volatile boolean startingGroups;
Lines 106-111 Link Here
106
110
107
    private HashTree test;
111
    private HashTree test;
108
112
113
    private ScheduledExecutorService mThreadPool;
114
    
109
    private volatile SearchByClass testListenersSave;
115
    private volatile SearchByClass testListenersSave;
110
116
111
    private final String host;
117
    private final String host;
Lines 152-160 Link Here
152
            thrd.stop();
158
            thrd.stop();
153
            thrd.interrupt();
159
            thrd.interrupt();
154
            if (now) {
160
            if (now) {
155
                Thread t = engine.allThreads.get(thrd);
161
                Future<?> t = engine.allThreads.get(thrd);
156
                if (t != null) {
162
                if (t != null) {
157
                    t.interrupt();
163
                    t.cancel(true);
158
                }
164
                }
159
            }
165
            }
160
            return true;
166
            return true;
Lines 170-176 Link Here
170
176
171
    public StandardJMeterEngine(String host) {
177
    public StandardJMeterEngine(String host) {
172
        this.host = host;
178
        this.host = host;
173
        this.allThreads = Collections.synchronizedMap(new HashMap<JMeterThread, Thread>());
179
        this.allThreads = Collections.synchronizedMap(new HashMap<JMeterThread, Future<?>>());
174
        // Hack to allow external control
180
        // Hack to allow external control
175
        engine = this;
181
        engine = this;
176
    }
182
    }
Lines 425-430 Link Here
425
        // and the listeners, and the timer
431
        // and the listeners, and the timer
426
        Iterator<ThreadGroup> iter = searcher.getSearchResults().iterator();
432
        Iterator<ThreadGroup> iter = searcher.getSearchResults().iterator();
427
433
434
		int threadPoolSize = JMeterUtils.getPropDefault("threadPoolSize", 1000);
435
        log.debug("thread pool size is " + threadPoolSize);
436
		mThreadPool = Executors.newScheduledThreadPool(threadPoolSize);
437
428
        /*
438
        /*
429
         * Here's where the test really starts. Run a Full GC now: it's no harm
439
         * Here's where the test really starts. Run a Full GC now: it's no harm
430
         * at all (just delays test start by a tiny amount) and hitting one too
440
         * at all (just delays test start by a tiny amount) and hitting one too
Lines 468-489 Link Here
468
                jmeterThread.setThreadNum(i);
478
                jmeterThread.setThreadNum(i);
469
                jmeterThread.setThreadGroup(group);
479
                jmeterThread.setThreadGroup(group);
470
                jmeterThread.setInitialContext(JMeterContextService.getContext());
480
                jmeterThread.setInitialContext(JMeterContextService.getContext());
471
                jmeterThread.setInitialDelay((int) (perThreadDelay * i));
472
                final String threadName = groupName + " " + (groupCount) + "-" + (i + 1);
481
                final String threadName = groupName + " " + (groupCount) + "-" + (i + 1);
473
                jmeterThread.setThreadName(threadName);
482
                jmeterThread.setThreadName(threadName);
474
483
475
                scheduleThread(jmeterThread, group);
476
477
                // Set up variables for stop handling
484
                // Set up variables for stop handling
478
                jmeterThread.setEngine(this);
485
                jmeterThread.setEngine(this);
479
                jmeterThread.setOnErrorStopTest(onErrorStopTest);
486
                jmeterThread.setOnErrorStopTest(onErrorStopTest);
480
                jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
487
                jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
481
                jmeterThread.setOnErrorStopThread(onErrorStopThread);
488
                jmeterThread.setOnErrorStopThread(onErrorStopThread);
482
489
                
483
                Thread newThread = new Thread(jmeterThread);
490
                jmeterThread.initRun();
484
                newThread.setName(threadName);
491
                Future<?> future = scheduleThread(jmeterThread, group, (int)perThreadDelay*i);
485
                allThreads.put(jmeterThread, newThread);
492
                allThreads.put(jmeterThread, future);
486
                newThread.start();
487
            } // end of thread startup for this thread group
493
            } // end of thread startup for this thread group
488
            if (serialized && iter.hasNext()) {
494
            if (serialized && iter.hasNext()) {
489
                log.info("Waiting for thread group: "+groupName+" to finish before starting next group");
495
                log.info("Waiting for thread group: "+groupName+" to finish before starting next group");
Lines 506-560 Link Here
506
     * @param thread
512
     * @param thread
507
     * @param group
513
     * @param group
508
     */
514
     */
509
    private void scheduleThread(JMeterThread thread, ThreadGroup group) {
515
    private Future<?> scheduleThread(JMeterThread thread, ThreadGroup group, int delay) {
510
        // if true the Scheduler is enabled
516
        // if true the Scheduler is enabled
511
        if (group.getScheduler()) {
517
    	return mThreadPool.schedule(thread, delay, TimeUnit.MILLISECONDS);
512
            long now = System.currentTimeMillis();
513
            // set the start time for the Thread
514
            if (group.getDelay() > 0) {// Duration is in seconds
515
                thread.setStartTime(group.getDelay() * 1000 + now);
516
            } else {
517
                long start = group.getStartTime();
518
                if (start < now) {
519
                    start = now; // Force a sensible start time
520
                }
521
                thread.setStartTime(start);
522
            }
523
524
            // set the endtime for the Thread
525
            if (group.getDuration() > 0) {// Duration is in seconds
526
                thread.setEndTime(group.getDuration() * 1000 + (thread.getStartTime()));
527
            } else {
528
                thread.setEndTime(group.getEndTime());
529
            }
530
531
            // Enables the scheduler
532
            thread.setScheduled(true);
533
        }
534
    }
518
    }
535
519
536
    private boolean verifyThreadsStopped() {
520
    private boolean verifyThreadsStopped() {
537
        boolean stoppedAll = true;
521
        boolean stoppedAll = true;
538
        List<Thread> threadsToCheck = new ArrayList<Thread>(allThreads.size());
522
        List<Future<?>> threadsToCheck = new ArrayList<Future<?>>(allThreads.size());
539
        synchronized (allThreads) { // Protect iterator
523
        synchronized (allThreads) { // Protect iterator
540
            Iterator<JMeterThread> iter = allThreads.keySet().iterator();
524
            Iterator<JMeterThread> iter = allThreads.keySet().iterator();
541
            while (iter.hasNext()) {
525
            while (iter.hasNext()) {
542
                Thread t = allThreads.get(iter.next());
526
                Future<?> t = allThreads.get(iter.next());
543
                if (t != null) {
527
                if (t != null) {
544
                    threadsToCheck.add(t); // Do work later to reduce time in synch block.
528
                    threadsToCheck.add(t); // Do work later to reduce time in synch block.
545
                }
529
                }
546
            }
530
            }
547
        }
531
        }
548
        for(int i=0; i < threadsToCheck.size(); i++) {
532
        for(int i=0; i < threadsToCheck.size(); i++) {
549
            Thread t = threadsToCheck.get(i);
533
        	Future<?> t = threadsToCheck.get(i);
550
            if (t.isAlive()) {
534
            if (t.isCancelled() || t.isDone()) {
551
                try {
535
               	t.cancel(true);
552
                    t.join(WAIT_TO_DIE);
536
                if (t.isCancelled() || t.isDone()) {
553
                } catch (InterruptedException e) {
554
                }
555
                if (t.isAlive()) {
556
                    stoppedAll = false;
537
                    stoppedAll = false;
557
                    log.warn("Thread won't exit: " + t.getName());
538
                    log.warn("Thread won't exit: " + t);
558
                }
539
                }
559
            }
540
            }
560
        }
541
        }
Lines 568-575 Link Here
568
                JMeterThread item = iter.next();
549
                JMeterThread item = iter.next();
569
                item.stop(); // set stop flag
550
                item.stop(); // set stop flag
570
                item.interrupt(); // interrupt sampler if possible
551
                item.interrupt(); // interrupt sampler if possible
571
                Thread t = allThreads.get(item);
552
                Future<?> t = allThreads.get(item);
572
                t.interrupt(); // also interrupt JVM thread
553
                t.cancel(true);// also interrupt JVM thread
573
            }
554
            }
574
        }
555
        }
575
    }
556
    }
(-)src/core/org/apache/jmeter/threads/JMeterContextService.java (+8 lines)
Lines 55-60 Link Here
55
    public static JMeterContext getContext() {
55
    public static JMeterContext getContext() {
56
        return threadContext.get();
56
        return threadContext.get();
57
    }
57
    }
58
    public static void setContext(JMeterContext context) {
59
    	clearContext();
60
    	threadContext.set(context);
61
    }
58
62
59
    /**
63
    /**
60
     * Method is called by the JMeterEngine class when a test run is started.
64
     * Method is called by the JMeterEngine class when a test run is started.
Lines 125-128 Link Here
125
    public static synchronized void clearTotalThreads() {
129
    public static synchronized void clearTotalThreads() {
126
        totalThreads = 0;
130
        totalThreads = 0;
127
    }
131
    }
132
133
	public static void clearContext() {
134
    	threadContext.remove();
135
	}
128
}
136
}
(-)src/core/org/apache/jmeter/threads/JMeterThread.java (-112 / +11 lines)
Lines 90-106 Link Here
90
     */
90
     */
91
    private String threadName;
91
    private String threadName;
92
92
93
    private int initialDelay = 0;
94
95
    private int threadNum = 0;
93
    private int threadNum = 0;
96
94
97
    private long startTime = 0;
98
99
    private long endTime = 0;
100
101
    private boolean scheduler = false;
102
    // based on this scheduler is enabled or disabled
103
104
    // Gives access to parent thread threadGroup
95
    // Gives access to parent thread threadGroup
105
    private ThreadGroup threadGroup;
96
    private ThreadGroup threadGroup;
106
97
Lines 119-124 Link Here
119
    
110
    
120
    private volatile Sampler currentSampler;
111
    private volatile Sampler currentSampler;
121
112
113
	private JMeterContext mThreadContext;
114
122
    public JMeterThread(HashTree test, JMeterThreadMonitor monitor, ListenerNotifier note) {
115
    public JMeterThread(HashTree test, JMeterThreadMonitor monitor, ListenerNotifier note) {
123
        this.monitor = monitor;
116
        this.monitor = monitor;
124
        threadVars = new JMeterVariables();
117
        threadVars = new JMeterVariables();
Lines 136-210 Link Here
136
        threadVars.putAll(context.getVariables());
129
        threadVars.putAll(context.getVariables());
137
    }
130
    }
138
131
139
    /**
140
     * Enable the scheduler for this JMeterThread.
141
     */
142
    public void setScheduled(boolean sche) {
143
        this.scheduler = sche;
144
    }
145
146
    /**
147
     * Set the StartTime for this Thread.
148
     *
149
     * @param stime the StartTime value.
150
     */
151
    public void setStartTime(long stime) {
152
        startTime = stime;
153
    }
154
155
    /**
156
     * Get the start time value.
157
     *
158
     * @return the start time value.
159
     */
160
    public long getStartTime() {
161
        return startTime;
162
    }
163
164
    /**
165
     * Set the EndTime for this Thread.
166
     *
167
     * @param etime
168
     *            the EndTime value.
169
     */
170
    public void setEndTime(long etime) {
171
        endTime = etime;
172
    }
173
174
    /**
175
     * Get the end time value.
176
     *
177
     * @return the end time value.
178
     */
179
    public long getEndTime() {
180
        return endTime;
181
    }
182
183
    /**
184
     * Check the scheduled time is completed.
185
     *
186
     */
187
    private void stopScheduler() {
188
        long delay = System.currentTimeMillis() - endTime;
189
        if ((delay >= 0)) {
190
            running = false;
191
        }
192
    }
193
194
    /**
195
     * Wait until the scheduled start time if necessary
196
     *
197
     */
198
    private void startScheduler() {
199
        long delay = (startTime - System.currentTimeMillis());
200
        if (delay > 0) {
201
            try {
202
                Thread.sleep(delay);
203
            } catch (Exception e) {
204
            }
205
        }
206
    }
207
208
    public void setThreadName(String threadName) {
132
    public void setThreadName(String threadName) {
209
        this.threadName = threadName;
133
        this.threadName = threadName;
210
    }
134
    }
Lines 234-242 Link Here
234
158
235
    public void run() {
159
    public void run() {
236
        // threadContext is not thread-safe, so keep within thread
160
        // threadContext is not thread-safe, so keep within thread
237
        JMeterContext threadContext = JMeterContextService.getContext();
161
        threadStarted();
162
    	JMeterContextService.setContext(mThreadContext);
163
    	JMeterContext threadContext = mThreadContext;
238
        try {
164
        try {
239
            initRun(threadContext);
240
            while (running) {
165
            while (running) {
241
                Sampler sam;
166
                Sampler sam;
242
                while (running && (sam = controller.next()) != null) {
167
                while (running && (sam = controller.next()) != null) {
Lines 268-273 Link Here
268
            log.info("Thread finished: " + threadName);
193
            log.info("Thread finished: " + threadName);
269
            threadFinished();
194
            threadFinished();
270
            monitor.threadFinished(this); // Tell the engine we are done
195
            monitor.threadFinished(this); // Tell the engine we are done
196
            JMeterContextService.clearContext();
271
        }
197
        }
272
    }
198
    }
273
199
Lines 378-387 Link Here
378
                    compiler.done(pack); // Finish up
304
                    compiler.done(pack); // Finish up
379
                }
305
                }
380
            }
306
            }
381
            if (scheduler) {
382
                // checks the scheduler to stop the iteration
383
                stopScheduler();
384
            }
385
        } catch (JMeterStopTestException e) {
307
        } catch (JMeterStopTestException e) {
386
            log.info("Stopping Test: " + e.toString());
308
            log.info("Stopping Test: " + e.toString());
387
            stopTest();
309
            stopTest();
Lines 437-444 Link Here
437
     * @param threadContext 
359
     * @param threadContext 
438
     *
360
     *
439
     */
361
     */
440
    private void initRun(JMeterContext threadContext) {
362
    public void initRun() {
441
        threadContext.setVariables(threadVars);
363
    	JMeterContext threadContext = new JMeterContext();
364
    	threadContext.setVariables(threadVars);
442
        threadContext.setThreadNum(getThreadNum());
365
        threadContext.setThreadNum(getThreadNum());
443
        threadContext.getVariables().put(LAST_SAMPLE_OK, "true");
366
        threadContext.getVariables().put(LAST_SAMPLE_OK, "true");
444
        threadContext.setThread(this);
367
        threadContext.setThread(this);
Lines 446-456 Link Here
446
        threadContext.setEngine(engine);
369
        threadContext.setEngine(engine);
447
        testTree.traverse(compiler);
370
        testTree.traverse(compiler);
448
        // listeners = controller.getListeners();
371
        // listeners = controller.getListeners();
449
        if (scheduler) {
450
            // set the scheduler to start
451
            startScheduler();
452
        }
453
        rampUpDelay(); // TODO - how to handle thread stopped here
454
        log.info("Thread started: " + Thread.currentThread().getName());
372
        log.info("Thread started: " + Thread.currentThread().getName());
455
        /*
373
        /*
456
         * Setting SamplingStarted before the contollers are initialised allows
374
         * Setting SamplingStarted before the contollers are initialised allows
Lines 465-471 Link Here
465
        if (!startEarlier) {
383
        if (!startEarlier) {
466
            threadContext.setSamplingStarted(true);
384
            threadContext.setSamplingStarted(true);
467
        }
385
        }
468
        threadStarted();
386
        mThreadContext = threadContext;
469
    }
387
    }
470
388
471
    private void threadStarted() {
389
    private void threadStarted() {
Lines 526-532 Link Here
526
444
527
    /** {@inheritDoc} */
445
    /** {@inheritDoc} */
528
    public boolean interrupt(){
446
    public boolean interrupt(){
529
        Sampler samp = currentSampler; // fetch once
447
    	Sampler samp = currentSampler; // fetch once
530
        if (samp instanceof Interruptible){
448
        if (samp instanceof Interruptible){
531
            log.warn("Interrupting: " + threadName + " sampler: " +samp.getName());
449
            log.warn("Interrupting: " + threadName + " sampler: " +samp.getName());
532
            try {
450
            try {
Lines 674-699 Link Here
674
592
675
    }
593
    }
676
594
677
    public void setInitialDelay(int delay) {
678
        initialDelay = delay;
679
    }
680
681
    /**
595
    /**
682
     * Initial delay if ramp-up period is active for this threadGroup.
683
     */
684
    private void rampUpDelay() {
685
        if (initialDelay > 0) {
686
            long start = System.currentTimeMillis();
687
            try {
688
                Thread.sleep(initialDelay);
689
            } catch (InterruptedException e) {
690
                long actual = System.currentTimeMillis() - start;
691
                log.warn("RampUp delay for "+threadName+" was interrupted. Waited "+actual+" milli-seconds out of "+initialDelay);
692
            }
693
        }
694
    }
695
696
    /**
697
     * Returns the threadNum.
596
     * Returns the threadNum.
698
     */
597
     */
699
    public int getThreadNum() {
598
    public int getThreadNum() {

Return to bug 47886