Lines 24-30
Link Here
|
24 |
import java.util.concurrent.atomic.AtomicInteger; |
24 |
import java.util.concurrent.atomic.AtomicInteger; |
25 |
/** |
25 |
/** |
26 |
* Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient |
26 |
* Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient |
27 |
* getActiveCount method, to be used to properly handle the work queue |
27 |
* {@link #getSubmittedCount()} method, to be used to properly handle the work queue. |
28 |
* If a RejectedExecutionHandler is not specified a default one will be configured |
28 |
* If a RejectedExecutionHandler is not specified a default one will be configured |
29 |
* and that one will always throw a RejectedExecutionException |
29 |
* and that one will always throw a RejectedExecutionException |
30 |
* @author fhanik |
30 |
* @author fhanik |
Lines 32-38
Link Here
|
32 |
*/ |
32 |
*/ |
33 |
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { |
33 |
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { |
34 |
|
34 |
|
35 |
private final AtomicInteger activeCount = new AtomicInteger(0); |
35 |
/** |
|
|
36 |
* The number of tasks submitted but not yet finished. This includes tasks |
37 |
* in the queue and tasks that have been handed to a worker thread which |
38 |
* has not started executing them yet. |
39 |
* This number is always greater than or equal to {@link #getActiveCount()}. |
40 |
*/ |
41 |
private final AtomicInteger submittedCount = new AtomicInteger(0); |
36 |
|
42 |
|
37 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { |
43 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { |
38 |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); |
44 |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); |
Lines 53-70
Link Here
|
53 |
|
59 |
|
54 |
@Override |
60 |
@Override |
55 |
protected void afterExecute(Runnable r, Throwable t) { |
61 |
protected void afterExecute(Runnable r, Throwable t) { |
56 |
activeCount.decrementAndGet(); |
62 |
submittedCount.decrementAndGet(); |
57 |
} |
63 |
} |
58 |
|
64 |
|
59 |
@Override |
65 |
public int getSubmittedCount() { |
60 |
protected void beforeExecute(Thread t, Runnable r) { |
66 |
return submittedCount.get(); |
61 |
activeCount.incrementAndGet(); |
|
|
62 |
} |
67 |
} |
63 |
|
|
|
64 |
@Override |
65 |
public int getActiveCount() { |
66 |
return activeCount.get(); |
67 |
} |
68 |
|
68 |
|
69 |
/** |
69 |
/** |
70 |
* {@inheritDoc} |
70 |
* {@inheritDoc} |
Lines 88-93
Link Here
|
88 |
* @throws NullPointerException if command or unit is null |
88 |
* @throws NullPointerException if command or unit is null |
89 |
*/ |
89 |
*/ |
90 |
public void execute(Runnable command, long timeout, TimeUnit unit) { |
90 |
public void execute(Runnable command, long timeout, TimeUnit unit) { |
|
|
91 |
submittedCount.incrementAndGet(); |
91 |
try { |
92 |
try { |
92 |
super.execute(command); |
93 |
super.execute(command); |
93 |
} catch (RejectedExecutionException rx) { |
94 |
} catch (RejectedExecutionException rx) { |
Lines 95-107
Link Here
|
95 |
final TaskQueue queue = (TaskQueue)super.getQueue(); |
96 |
final TaskQueue queue = (TaskQueue)super.getQueue(); |
96 |
try { |
97 |
try { |
97 |
if (!queue.force(command, timeout, unit)) { |
98 |
if (!queue.force(command, timeout, unit)) { |
|
|
99 |
submittedCount.decrementAndGet(); |
98 |
throw new RejectedExecutionException("Queue capacity is full."); |
100 |
throw new RejectedExecutionException("Queue capacity is full."); |
99 |
} |
101 |
} |
100 |
} catch (InterruptedException x) { |
102 |
} catch (InterruptedException x) { |
|
|
103 |
submittedCount.decrementAndGet(); |
101 |
Thread.interrupted(); |
104 |
Thread.interrupted(); |
102 |
throw new RejectedExecutionException(x); |
105 |
throw new RejectedExecutionException(x); |
103 |
} |
106 |
} |
104 |
} else { |
107 |
} else { |
|
|
108 |
submittedCount.decrementAndGet(); |
105 |
throw rx; |
109 |
throw rx; |
106 |
} |
110 |
} |
107 |
|
111 |
|