ASF Bugzilla – Attachment 26451 Details for
Bug 50306
Detect stuck threads
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
Patch for Tomcat's ThreadPoolExecutor
stuck_thread_detection.patch (text/plain), 11.05 KB, created by
TomLu
on 2010-12-29 05:02:04 UTC
(
hide
)
Description:
Patch for Tomcat's ThreadPoolExecutor
Filename:
MIME Type:
Creator:
TomLu
Created:
2010-12-29 05:02:04 UTC
Size:
11.05 KB
patch
obsolete
># This patch file was generated by NetBeans IDE ># Following Index: paths are relative to: C:\Java\apache-tomcat-7.0.5-src\java\org\apache\tomcat\util\threads ># This patch can be applied using context Tools: Patch action on respective folder. ># It uses platform neutral UTF-8 encoding and \n newlines. ># Above lines and this line are ignored by the patching process. >Index: ThreadPoolExecutor.java >--- ThreadPoolExecutor.java Base (BASE) >+++ ThreadPoolExecutor.java Locally Modified (Based On LOCAL) >@@ -16,12 +16,19 @@ > */ > package org.apache.tomcat.util.threads; > >+import java.util.List; >+import java.util.Queue; >+import java.util.Timer; >+import java.util.TimerTask; > import java.util.concurrent.BlockingQueue; >+import java.util.concurrent.ConcurrentHashMap; >+import java.util.concurrent.ConcurrentLinkedQueue; > import java.util.concurrent.RejectedExecutionException; > import java.util.concurrent.RejectedExecutionHandler; > import java.util.concurrent.ThreadFactory; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicInteger; >+ > /** > * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient > * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. >@@ -39,29 +46,220 @@ > * This number is always greater or equal to {@link #getActiveCount()}. > */ > private final AtomicInteger submittedCount = new AtomicInteger(0); >+ private static final org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(org.apache.tomcat.util.threads.ThreadPoolExecutor.class); >+ private final AtomicInteger hungCount = new AtomicInteger(0); >+ private static long THREAD_MONITOR_THRESHOLD_IN_SECONDS = 10 * 60; //Default 10 min. >+ private static long THREAD_MONITOR_INTERVAL_IN_SECONDS = 120; //Default 120 seconds >+ //Don't keep references to any actual running Thread objects, except in this Map (which is automatically cleaned in afterExecute() ).
>+ //That way, Threads can be GC'ed, even though the Monitor still thinks they are hung (caused by a long monitor interval). >+ private ConcurrentHashMap<Runnable, MonitoredThread> activeThreads = null; >+ private Queue<CompletedHungThread> completedHungThreadsQueue = null; >+ private Timer timer = null; > >+ //This is only modified in the static initializer. If modified elsewhere, >+ //it should be changed to volatile or Atomic >+ private static boolean monitoringEnabled = true; //Default? >+ >+ static { >+ //Get properties (if configured), and check for sensible values >+ >+ Integer thresholdTmp = Integer.getInteger("org.apache.catalina.threadmonitor.threshold"); //In seconds >+ if (thresholdTmp != null) { >+ int threshold = thresholdTmp.intValue(); >+ if (threshold >= 60) { //SO_TIMEOUT is 20 sec in Tomcat, the threshold must at least be larger than that >+ THREAD_MONITOR_THRESHOLD_IN_SECONDS = threshold; >+ } >+ } >+ >+ Integer intervalTmp = Integer.getInteger("org.apache.catalina.threadmonitor.interval"); //In seconds >+ if (intervalTmp != null) { >+ int interval = intervalTmp.intValue(); >+ if (interval >= 60 && interval <= 600) { >+ THREAD_MONITOR_INTERVAL_IN_SECONDS = interval; >+ } else if (interval <= 0) { >+ monitoringEnabled = false; >+ } >+ } >+ } >+ > public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { > super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); >+ createThreadPoolMonitor(corePoolSize); > } > > public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, > RejectedExecutionHandler handler) { > super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); >+ createThreadPoolMonitor(corePoolSize); > } > > public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { > super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler()); >+ createThreadPoolMonitor(corePoolSize); > } > > public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { > super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler()); >+ createThreadPoolMonitor(corePoolSize); > } > >+ private void createThreadPoolMonitor(int corePoolSize) { >+ if (monitoringEnabled) { >+ activeThreads = new ConcurrentHashMap<Runnable, MonitoredThread>(corePoolSize * 2); //TODO is initialSize ok? >+ completedHungThreadsQueue = new ConcurrentLinkedQueue<CompletedHungThread>(); >+ timer = new Timer("ThreadPool monitor timer", true); //daemon >+ MonitorTask monitorTask = new MonitorTask(); >+ timer.schedule(monitorTask, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000); //delay, period >+ >+ log.debug("Monitoring threads with threshold = " + THREAD_MONITOR_THRESHOLD_IN_SECONDS + " sec, interval = " + THREAD_MONITOR_INTERVAL_IN_SECONDS + " sec"); >+ } >+ } >+ > @Override >+ protected void beforeExecute(Thread thread, Runnable r) { >+ super.beforeExecute(thread, r); >+ if (monitoringEnabled) { >+ //Save the thread/runnable >+ //Keeping a reference to the thread object here does not prevent GC'ing, >+ //as the reference is removed from the Map in afterExecute() >+ activeThreads.put(r, new MonitoredThread(thread)); >+ } >+ } >+ >+ /* >+ * This method is invoked by the thread that executed the task >+ */ >+ @Override > protected void afterExecute(Runnable r, Throwable t) { >+ super.afterExecute(r,t); > submittedCount.decrementAndGet(); >+ if (monitoringEnabled) { >+ MonitoredThread mt = activeThreads.remove(r); >+ if (mt != null && mt.isMarkedAsHung()) { //mt is null if a concurrent run of the same Runnable instance already removed this 
entry >+ completedHungThreadsQueue.add(new
CompletedHungThread(mt.getThread().getName(), mt.getActiveTimeInMillis())); > } >+ } >+ } > >+ class MonitoredThread { >+ >+ //We need a reference to the thread to get a stack trace from the TimerTask >+ private Thread thread; >+ private long start = System.currentTimeMillis(); >+ private volatile boolean isHung = false; >+ >+ public MonitoredThread(Thread thread) { >+ this.thread = thread; >+ } >+ >+ public Thread getThread() { >+ return this.thread; >+ } >+ >+ public long getActiveTimeInMillis() { >+ return System.currentTimeMillis() - start; >+ } >+ >+ public void markAsHung() { >+ this.isHung = true; >+ } >+ >+ public boolean isMarkedAsHung() { >+ return this.isHung; >+ } >+ } >+ >+ class CompletedHungThread { >+ >+ private String name; >+ private long totalActiveTime; >+ >+ public CompletedHungThread(String name, long totalActiveTime) { >+ this.name = name; >+ this.totalActiveTime = totalActiveTime; >+ } >+ >+ public String getName() { >+ return this.name; >+ } >+ >+ public long getTotalActiveTime() { >+ return this.totalActiveTime; >+ } >+ } >+ >+ class MonitorTask extends TimerTask { >+ >+ public MonitorTask() { >+ } >+ >+ @Override >+ public void run() { >+ //Check monitored threads >+ for (MonitoredThread mt : activeThreads.values()) { >+ long activeTime = mt.getActiveTimeInMillis(); >+ >+ if (!mt.isMarkedAsHung() && activeTime >= THREAD_MONITOR_THRESHOLD_IN_SECONDS * 1000) { >+ int numHungThreads = hungCount.incrementAndGet(); >+ mt.markAsHung(); >+ notifyHungThreadDetected(mt.getThread().getName(), mt.getThread().getStackTrace(), activeTime, numHungThreads); >+ } >+ } >+ //Check if any threads previously reported as hung, have finished.
>+ CompletedHungThread cht = completedHungThreadsQueue.poll(); >+ while (cht != null) { >+ int numHungThreads = hungCount.decrementAndGet(); >+ notifyHungThreadCompleted(cht.getName(), cht.getTotalActiveTime(), numHungThreads); >+ cht = completedHungThreadsQueue.poll(); >+ } >+ } >+ } >+ >+ //The following methods could be implemented using a Listener instead: >+ //HungThreadListener >+ //"onHungThreadDetected" >+ //"onHungThreadCompleted" >+ private void notifyHungThreadDetected(String threadName, StackTraceElement[] trace, long activeTime, int numHungThreads) { >+ //TODO Add JMX notification ? >+ String header = "Thread \"" + threadName + "\" has been active for " + activeTime + " milliseconds and may be hung. There is/are " + numHungThreads + " thread(s) in total in this thread pool that may be hung."; >+ header += "\n" + getStackTraceAsString(trace); >+ log.warn(header); >+ } >+ >+ private void notifyHungThreadCompleted(String threadName, long activeTime, int numHungThreads) { >+ //TODO Add JMX notification ? >+ String msg = "Thread \"" + threadName + "\" was previously reported to be hung but has completed. It was active for approximately " + activeTime + " milliseconds.
There is/are " + numHungThreads + " thread(s) in total in this thread pool that still may be hung."; >+ log.warn(msg); //Since the "hung thread notification" is warn, this should also be warn >+ } >+ >+ //TODO Utility method, should be moved to utility class >+ public static String getStackTraceAsString(StackTraceElement[] trace) { >+ StringBuilder buf = new StringBuilder(); >+ for (int i = 0; i < trace.length; i++) { >+ buf.append("\tat ").append(trace[i]).append("\n"); >+ } >+ return buf.toString(); >+ } >+ >+ public void shutdownMonitorTimer(){ >+ if(timer != null){ >+ timer.cancel(); >+ } >+ } >+ >+ @Override >+ public void shutdown(){ >+ super.shutdown(); >+ shutdownMonitorTimer(); >+ } >+ >+ @Override >+ public List<Runnable> shutdownNow(){ >+ List<Runnable> runnables = super.shutdownNow(); >+ shutdownMonitorTimer(); >+ return runnables; >+ } >+ > public int getSubmittedCount() { > return submittedCount.get(); > } >@@ -71,7 +269,7 @@ > */ > @Override > public void execute(Runnable command) { >- execute(command,0,TimeUnit.MILLISECONDS); >+ execute(command, 0, TimeUnit.MILLISECONDS); > } > > /**
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 50306
:
26451
|
26769
|
28961
|
28966