Lines 49-54
Link Here
|
49 |
|
49 |
|
50 |
protected String name; |
50 |
protected String name; |
51 |
|
51 |
|
|
|
52 |
/** |
53 |
* Number of tasks submitted and not yet completed. |
54 |
*/ |
55 |
protected AtomicInteger submittedTasksCount; |
56 |
|
52 |
private LifecycleSupport lifecycle = new LifecycleSupport(this); |
57 |
private LifecycleSupport lifecycle = new LifecycleSupport(this); |
53 |
// ---------------------------------------------- Constructors |
58 |
// ---------------------------------------------- Constructors |
54 |
public StandardThreadExecutor() { |
59 |
public StandardThreadExecutor() { |
Lines 63-70
Link Here
|
63 |
TaskQueue taskqueue = new TaskQueue(); |
68 |
TaskQueue taskqueue = new TaskQueue(); |
64 |
TaskThreadFactory tf = new TaskThreadFactory(namePrefix); |
69 |
TaskThreadFactory tf = new TaskThreadFactory(namePrefix); |
65 |
lifecycle.fireLifecycleEvent(START_EVENT, null); |
70 |
lifecycle.fireLifecycleEvent(START_EVENT, null); |
66 |
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); |
71 |
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf) { |
|
|
72 |
@Override |
73 |
protected void afterExecute(Runnable r, Throwable t) { |
74 |
AtomicInteger atomic = submittedTasksCount; |
75 |
if(atomic!=null) { |
76 |
atomic.decrementAndGet(); |
77 |
} |
78 |
} |
79 |
}; |
67 |
taskqueue.setParent( (ThreadPoolExecutor) executor); |
80 |
taskqueue.setParent( (ThreadPoolExecutor) executor); |
|
|
81 |
submittedTasksCount = new AtomicInteger(); |
68 |
lifecycle.fireLifecycleEvent(AFTER_START_EVENT, null); |
82 |
lifecycle.fireLifecycleEvent(AFTER_START_EVENT, null); |
69 |
} |
83 |
} |
70 |
|
84 |
|
Lines 73-88
Link Here
|
73 |
lifecycle.fireLifecycleEvent(STOP_EVENT, null); |
87 |
lifecycle.fireLifecycleEvent(STOP_EVENT, null); |
74 |
if ( executor != null ) executor.shutdown(); |
88 |
if ( executor != null ) executor.shutdown(); |
75 |
executor = null; |
89 |
executor = null; |
|
|
90 |
submittedTasksCount = null; |
76 |
lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null); |
91 |
lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null); |
77 |
} |
92 |
} |
78 |
|
93 |
|
79 |
public void execute(Runnable command) { |
94 |
public void execute(Runnable command) { |
80 |
if ( executor != null ) { |
95 |
if ( executor != null ) { |
|
|
96 |
submittedTasksCount.incrementAndGet(); |
81 |
try { |
97 |
try { |
82 |
executor.execute(command); |
98 |
executor.execute(command); |
83 |
} catch (RejectedExecutionException rx) { |
99 |
} catch (RejectedExecutionException rx) { |
84 |
//there could have been contention around the queue, so try to force the task onto it |
100 |
//there could have been contention around the queue, so try to force the task onto it |
85 |
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException(); |
101 |
if ( !( (TaskQueue) executor.getQueue()).force(command) ) { |
|
|
102 |
submittedTasksCount.decrementAndGet(); |
103 |
throw new RejectedExecutionException(); |
104 |
} |
86 |
} |
105 |
} |
87 |
} else throw new IllegalStateException("StandardThreadPool not started."); |
106 |
} else throw new IllegalStateException("StandardThreadPool not started."); |
88 |
} |
107 |
} |
Lines 234-246
Link Here
|
234 |
public boolean offer(Runnable o) { |
253 |
public boolean offer(Runnable o) { |
235 |
//no parent executor is set, so we can't do any checks |
254 |
//no parent executor is set, so we can't do any checks |
236 |
if (parent==null) return super.offer(o); |
255 |
if (parent==null) return super.offer(o); |
|
|
256 |
int poolSize = parent.getPoolSize(); |
237 |
//we are maxed out on threads, simply queue the object |
257 |
//we are maxed out on threads, simply queue the object |
238 |
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); |
258 |
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); |
239 |
//we have idle threads, just add it to the queue |
259 |
//we have idle threads, just add it to the queue |
240 |
//this is an approximation, so it could use some tuning |
260 |
//note that we don't use getActiveCount(), see BZ 49730 |
241 |
if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); |
261 |
AtomicInteger submittedTasksCount = StandardThreadExecutor.this.submittedTasksCount; |
|
|
262 |
if(submittedTasksCount!=null) { |
263 |
if (submittedTasksCount.get()<=poolSize) return super.offer(o); |
264 |
} |
242 |
//if we have fewer threads than the maximum, force creation of a new thread |
265 |
//if we have fewer threads than the maximum, force creation of a new thread |
243 |
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; |
266 |
if (poolSize<parent.getMaximumPoolSize()) return false; |
244 |
//if we reached here, we need to add it to the queue |
267 |
//if we reached here, we need to add it to the queue |
245 |
return super.offer(o); |
268 |
return super.offer(o); |
246 |
} |
269 |
} |