View | Details | Raw Unified | Return to bug 49730
Collapse All | Expand All

(-)java/org/apache/tomcat/util/threads/TaskQueue.java (-2 / +2 lines)
Lines 19-26 Link Here
19
import java.util.Collection;
19
import java.util.Collection;
20
import java.util.concurrent.LinkedBlockingQueue;
20
import java.util.concurrent.LinkedBlockingQueue;
21
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionException;
22
import java.util.concurrent.ThreadPoolExecutor;
23
import java.util.concurrent.TimeUnit;
22
import java.util.concurrent.TimeUnit;
23
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
24
/**
24
/**
25
 * A task queue specifically designed to run with a thread pool executor.
25
 * A task queue specifically designed to run with a thread pool executor.
26
 * The task queue is optimised to properly utilize threads within 
26
 * The task queue is optimised to properly utilize threads within 
Lines 65-71 Link Here
65
        //we are maxed out on threads, simply queue the object
65
        //we are maxed out on threads, simply queue the object
66
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
66
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
67
        //we have idle threads, just add it to the queue
67
        //we have idle threads, just add it to the queue
68
        if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
68
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
69
        //if we have fewer threads than the maximum, force creation of a new thread
69
        //if we have fewer threads than the maximum, force creation of a new thread
70
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
70
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
71
        //if we reached here, we need to add it to the queue
71
        //if we reached here, we need to add it to the queue
(-)java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (-11 / +15 lines)
Lines 24-30 Link Here
24
import java.util.concurrent.atomic.AtomicInteger;
24
import java.util.concurrent.atomic.AtomicInteger;
25
/**
25
/**
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
27
 * getActiveCount method, to be used to properly handle the work queue
27
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
28
 * If a RejectedExecutionHandler is not specified a default one will be configured
28
 * If a RejectedExecutionHandler is not specified a default one will be configured
29
 * and that one will always throw a RejectedExecutionException
29
 * and that one will always throw a RejectedExecutionException
30
 * @author fhanik
30
 * @author fhanik
Lines 32-38 Link Here
32
 */
32
 */
33
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
33
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
34
    
34
    
35
    private final AtomicInteger activeCount = new AtomicInteger(0);
35
	/**
36
	 * The number of tasks submitted but not yet finished. This includes tasks
37
	 * in the queue and tasks that have been handed to a worker thread but the
38
	 * latter did not start executing the task yet.
39
	 * This number is always greater than or equal to {@link #getActiveCount()}.
40
	 */
41
    private final AtomicInteger submittedCount = new AtomicInteger(0);
36
    
42
    
37
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
43
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
38
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
44
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
Lines 53-70 Link Here
53
59
54
    @Override
60
    @Override
55
    protected void afterExecute(Runnable r, Throwable t) {
61
    protected void afterExecute(Runnable r, Throwable t) {
56
        activeCount.decrementAndGet();
62
        submittedCount.decrementAndGet();
57
    }
63
    }
58
64
59
    @Override
65
    public int getSubmittedCount() {
60
    protected void beforeExecute(Thread t, Runnable r) {
66
        return submittedCount.get();
61
        activeCount.incrementAndGet();
62
    }
67
    }
63
64
    @Override
65
    public int getActiveCount() {
66
        return activeCount.get();
67
    }
68
    
68
    
69
    /**
69
    /**
70
     * {@inheritDoc}
70
     * {@inheritDoc}
Lines 88-93 Link Here
88
     * @throws NullPointerException if command or unit is null
88
     * @throws NullPointerException if command or unit is null
89
     */
89
     */
90
    public void execute(Runnable command, long timeout, TimeUnit unit) {
90
    public void execute(Runnable command, long timeout, TimeUnit unit) {
91
		submittedCount.incrementAndGet();
91
        try {
92
        try {
92
            super.execute(command);
93
            super.execute(command);
93
        } catch (RejectedExecutionException rx) {
94
        } catch (RejectedExecutionException rx) {
Lines 95-107 Link Here
95
                final TaskQueue queue = (TaskQueue)super.getQueue();
96
                final TaskQueue queue = (TaskQueue)super.getQueue();
96
                try {
97
                try {
97
                    if (!queue.force(command, timeout, unit)) {
98
                    if (!queue.force(command, timeout, unit)) {
99
                    	submittedCount.decrementAndGet();
98
                        throw new RejectedExecutionException("Queue capacity is full.");
100
                        throw new RejectedExecutionException("Queue capacity is full.");
99
                    }
101
                    }
100
                } catch (InterruptedException x) {
102
                } catch (InterruptedException x) {
103
                	submittedCount.decrementAndGet();
101
                    Thread.interrupted();
104
                    Thread.interrupted();
102
                    throw new RejectedExecutionException(x);
105
                    throw new RejectedExecutionException(x);
103
                }
106
                }
104
            } else {
107
            } else {
108
            	submittedCount.decrementAndGet();
105
                throw rx;
109
                throw rx;
106
            }
110
            }
107
            
111
            

Return to bug 49730