View | Details | Raw Unified | Return to bug 50306
Collapse All | Expand All

(-)ThreadPoolExecutor.java (-1 / +199 lines)
Lines 16-27 Link Here
16
 */
16
 */
17
package org.apache.tomcat.util.threads;
17
package org.apache.tomcat.util.threads;
18
18
19
import java.util.List;
20
import java.util.Queue;
21
import java.util.Timer;
22
import java.util.TimerTask;
19
import java.util.concurrent.BlockingQueue;
23
import java.util.concurrent.BlockingQueue;
24
import java.util.concurrent.ConcurrentHashMap;
25
import java.util.concurrent.ConcurrentLinkedQueue;
20
import java.util.concurrent.RejectedExecutionException;
26
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionHandler;
27
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.ThreadFactory;
28
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicInteger;
30
import java.util.concurrent.atomic.AtomicInteger;
31
25
/**
32
/**
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
33
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
27
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
34
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
Lines 39-67 Link Here
39
     * This number is always greater or equal to {@link #getActiveCount()}.
46
     * This number is always greater or equal to {@link #getActiveCount()}.
40
     */
47
     */
41
    private final AtomicInteger submittedCount = new AtomicInteger(0);
48
    private final AtomicInteger submittedCount = new AtomicInteger(0);
49
    private static final org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(org.apache.tomcat.util.threads.ThreadPoolExecutor.class);
50
    private final AtomicInteger hungCount = new AtomicInteger(0);
51
    private static long THREAD_MONITOR_THRESHOLD_IN_SECONDS = 10 * 60; //Default 10 min.
52
    private static long THREAD_MONITOR_INTERVAL_IN_SECONDS = 120; //Default 120 seconds
53
    //Don't keep references to any actual running Thread objects, except in this Map (which is automatically cleaned in afterExecute() ).
54
    //That way, Threads can be GC'ed, even though the Monitor still thinks they are hung (caused by a long monitor interval).
55
    private ConcurrentHashMap<Runnable, MonitoredThread> activeThreads = null;
56
    private Queue<CompletedHungThread> completedHungThreadsQueue = null;
57
    private Timer timer = null;
42
    
58
    
59
    //This is only modified in the static initializer. If modified elsewhere,
60
    //it should be changed to volatile or Atomic
61
    private static boolean monitoringEnabled = true; //Default?
62
63
    static {
64
        //Get properties (if configured), and check for sensible values
65
66
        Integer thresholdTmp = Integer.getInteger("org.apache.catalina.threadmonitor.threshold"); //In seconds
67
        if (thresholdTmp != null) {
68
            int threshold = thresholdTmp.intValue();
69
            if (threshold >= 60) { //SO_TIMEOUT is 20 sec in Tomcat, the threshold must at least be larger than that
70
                THREAD_MONITOR_THRESHOLD_IN_SECONDS = threshold;
71
            }
72
        }
73
74
        Integer intervalTmp = Integer.getInteger("org.apache.catalina.threadmonitor.interval"); //In seconds
75
        if (intervalTmp != null) {
76
            int interval = intervalTmp.intValue();
77
            if (interval >= 60 && interval <= 600) {
78
                THREAD_MONITOR_INTERVAL_IN_SECONDS = interval;
79
            } else if (interval <= 0) {
80
                monitoringEnabled = false;
81
            }
82
        }
83
    }
84
43
    /**
     * Creates a pool that uses the supplied rejection handler, then sets up
     * the hung-thread monitor for this pool via
     * {@code createThreadPoolMonitor(corePoolSize)}.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        createThreadPoolMonitor(corePoolSize);
    }
46
89
47
    /**
     * Creates a pool that uses the supplied thread factory and rejection
     * handler, then sets up the hung-thread monitor for this pool via
     * {@code createThreadPoolMonitor(corePoolSize)}.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        createThreadPoolMonitor(corePoolSize);
    }
51
95
52
    /**
     * Creates a pool that uses the supplied thread factory and a new
     * {@code RejectHandler}. Delegates to the (threadFactory, handler)
     * constructor, which also sets up the hung-thread monitor.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
    }
55
100
56
    /**
     * Creates a pool that uses a new {@code RejectHandler}. Delegates to the
     * (handler) constructor, which also sets up the hung-thread monitor.
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
    }
59
105
106
    private void createThreadPoolMonitor(int corePoolSize) {
107
        if (monitoringEnabled) {
108
            activeThreads = new ConcurrentHashMap<Runnable, MonitoredThread>(corePoolSize * 2); //TODO is initialSize ok?
109
            completedHungThreadsQueue = new ConcurrentLinkedQueue<CompletedHungThread>();
110
            timer = new Timer("ThreadPool monitor timer", true); //daemon
111
            MonitorTask monitorTask = new MonitorTask();
112
            timer.schedule(monitorTask, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000); //delay, period
113
114
            log.debug("Monitoring threads with threshold = " + THREAD_MONITOR_THRESHOLD_IN_SECONDS + " sec, interval = " + THREAD_MONITOR_INTERVAL_IN_SECONDS + " sec");
115
        }
116
    }
117
60
    @Override
118
    @Override
119
    protected void beforeExecute(Thread thread, Runnable r) {
120
        super.beforeExecute(thread, r);
121
        if (monitoringEnabled) {
122
            //Save the thread/runnable
123
            //Keeping a reference to the thread object here does not prevent GC'ing,
124
            //as the reference is removed from the Map in afterExecute()
125
            activeThreads.put(r, new MonitoredThread(thread));
126
        }
127
    }
128
129
    /*
130
     * This method is invoked by the thread that executed the task
131
     */
132
    @Override
61
    protected void afterExecute(Runnable r, Throwable t) {
133
    protected void afterExecute(Runnable r, Throwable t) {
134
        super.afterExecute(r,t);
62
        submittedCount.decrementAndGet();
135
        submittedCount.decrementAndGet();
136
        if (monitoringEnabled) {
137
            MonitoredThread mt = activeThreads.remove(r);
138
            if (mt.isMarkedAsHung()) {
139
                completedHungThreadsQueue.add(new CompletedHungThread(mt.getThread().getName(), mt.getActiveTimeInMillis()));
63
    }
140
    }
141
        }
142
    }
64
143
144
    /**
     * Bookkeeping record for a worker thread that is currently executing a
     * task: the thread itself, when it started, and whether the monitor has
     * flagged it as possibly hung.
     */
    class MonitoredThread {

        //We need a reference to the thread to get a stack trace from the TimerTask
        private final Thread thread;
        // Monotonic start stamp: nanoTime() is unaffected by wall-clock
        // adjustments, unlike currentTimeMillis(), so elapsed time cannot go
        // negative or jump when the system clock is changed.
        private final long startNanos = System.nanoTime();
        // volatile: written by the monitor timer thread, read by the worker thread
        private volatile boolean isHung = false;

        /**
         * @param thread the worker thread being monitored
         */
        public MonitoredThread(Thread thread) {
            this.thread = thread;
        }

        /**
         * @return the monitored worker thread
         */
        public Thread getThread() {
            return this.thread;
        }

        /**
         * @return milliseconds elapsed since this record was created
         */
        public long getActiveTimeInMillis() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        }

        /** Flags the thread as possibly hung. */
        public void markAsHung() {
            this.isHung = true;
        }

        /**
         * @return true once {@link #markAsHung()} has been called
         */
        public boolean isMarkedAsHung() {
            return this.isHung;
        }
    }
171
172
    /**
     * Immutable snapshot of a thread that was reported as hung and has since
     * finished its task. Only the name and duration are kept, so no reference
     * to the Thread object itself is retained.
     */
    class CompletedHungThread {

        // final: instances are handed from the worker thread to the monitor
        // timer thread through a queue and are never modified afterwards.
        private final String name;
        private final long totalActiveTime;

        /**
         * @param name            name of the thread that completed
         * @param totalActiveTime how long the thread was active, in milliseconds
         */
        public CompletedHungThread(String name, long totalActiveTime) {
            this.name = name;
            this.totalActiveTime = totalActiveTime;
        }

        /**
         * @return name of the thread that completed
         */
        public String getName() {
            return this.name;
        }

        /**
         * @return how long the thread was active, in milliseconds
         */
        public long getTotalActiveTime() {
            return this.totalActiveTime;
        }
    }
190
191
    class MonitorTask extends TimerTask {
192
193
        public MonitorTask() {
194
        }
195
196
        @Override
197
        public void run() {
198
            //Check monitored threads
199
            for (MonitoredThread mt : activeThreads.values()) {
200
                long activeTime = mt.getActiveTimeInMillis();
201
202
                if (!mt.isMarkedAsHung() && activeTime >= THREAD_MONITOR_THRESHOLD_IN_SECONDS * 1000) {
203
                    int numHungThreads = hungCount.incrementAndGet();
204
                    mt.markAsHung();
205
                    notifyHungThreadDetected(mt.getThread().getName(), mt.getThread().getStackTrace(), activeTime, numHungThreads);
206
                }
207
            }
208
            //Check if any threads previously reported as hung, have finished.
209
            CompletedHungThread cht = completedHungThreadsQueue.poll();
210
            while (cht != null) {
211
                int numHungThreads = hungCount.decrementAndGet();
212
                notifyHungThreadCompleted(cht.getName(), cht.getTotalActiveTime(), numHungThreads);
213
                cht = completedHungThreadsQueue.poll();
214
            }
215
        }
216
    }
217
218
    //The following methods could be implemented using a Listener instead:
219
    //HungThreadListener
220
    //"onHungThreadDetected"
221
    //"onHungThreadCompleted"
222
    private void notifyHungThreadDetected(String threadName, StackTraceElement[] trace, long activeTime, int numHungThreads) {
223
        //TODO Add JMX notification ?
224
        String header = "Thread \"" + threadName + "\" has been active for " + activeTime + " milliseconds and may be hung. There is/are " + numHungThreads + " thread(s) in total in this thread pool that may be hung.";
225
        header += "\n" + getStackTraceAsString(trace);
226
        log.warn(header);
227
    }
228
229
    private void notifyHungThreadCompleted(String threadName, long activeTime, int numHungThreads) {
230
        //TODO Add JMX notification ?
231
        String msg = "Thread \"" + threadName + "\" was previously reported to be hung but has completed. It was active for approximately " + activeTime + " milliseconds. There is/are " + numHungThreads + " thread(s) in total in the server that still may be hung.";
232
        log.warn(msg); //Since the "hung thread notification" is warn, this should also be warn
233
    }
234
235
    //TODO Utility method, should be moved to utility class
236
    public static String getStackTraceAsString(StackTraceElement[] trace) {
237
        StringBuilder buf = new StringBuilder();
238
        for (int i = 0; i < trace.length; i++) {
239
            buf.append("\tat ").append(trace[i]).append("\n");
240
        }
241
        return buf.toString();
242
    }
243
244
    public void shutdownMonitorTimer(){
245
        if(timer != null){
246
            timer.cancel();
247
        }
248
    }
249
250
    @Override
251
    public void shutdown(){
252
        super.shutdown();
253
        shutdownMonitorTimer();
254
    }
255
256
    @Override
257
    public List<Runnable> shutdownNow(){
258
        List<Runnable> runnables = super.shutdownNow();
259
        shutdownMonitorTimer();
260
        return runnables;
261
    }
262
65
    public int getSubmittedCount() {
263
    public int getSubmittedCount() {
66
        return submittedCount.get();
264
        return submittedCount.get();
67
    }
265
    }
Lines 71-77 Link Here
71
     */
269
     */
72
    @Override
270
    @Override
73
    public void execute(Runnable command) {
271
    public void execute(Runnable command) {
74
        execute(command,0,TimeUnit.MILLISECONDS);
272
        execute(command, 0, TimeUnit.MILLISECONDS);
75
    }
273
    }
76
    
274
    
77
    /**
275
    /**

Return to bug 50306