# This patch file was generated by NetBeans IDE
# Following Index: paths are relative to: C:\Java\apache-tomcat-7.0.5-src\java\org\apache\tomcat\util\threads
# This patch can be applied using context Tools: Patch action on respective folder.
# It uses platform neutral UTF-8 encoding and \n newlines.
# NOTE(review): line breaks, blank context lines and generic type arguments were reconstructed from a whitespace-collapsed copy; verify the hunks against the 7.0.5 base file before applying.
# Above lines and this line are ignored by the patching process.
Index: ThreadPoolExecutor.java
--- ThreadPoolExecutor.java Base (BASE)
+++ ThreadPoolExecutor.java Locally Modified (Based On LOCAL)
@@ -16,12 +16,19 @@
  */
 package org.apache.tomcat.util.threads;
 
+import java.util.List;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
  * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
@@ -39,29 +46,259 @@
      * This number is always greater or equal to {@link #getActiveCount()}.
      */
     private final AtomicInteger submittedCount = new AtomicInteger(0);
+    private static final org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(org.apache.tomcat.util.threads.ThreadPoolExecutor.class);
+    private final AtomicInteger hungCount = new AtomicInteger(0);
+    private static long THREAD_MONITOR_THRESHOLD_IN_SECONDS = 10 * 60; //Default 10 min.
+    private static long THREAD_MONITOR_INTERVAL_IN_SECONDS = 120; //Default 120 seconds
+    //Don't keep references to any actual running Thread objects, except in this Map (which is automatically cleaned in afterExecute() ).
+    //That way, Threads can be GC'ed, even though the Monitor still thinks they are hung (caused by a long monitor interval).
+    private ConcurrentHashMap<Runnable, MonitoredThread> activeThreads = null;
+    private Queue<CompletedHungThread> completedHungThreadsQueue = null;
+    private Timer timer = null;
+    //This is only modified in the static initializer. If modified elsewhere,
+    //it should be changed to volatile or Atomic
+    private static boolean monitoringEnabled = true; //Default?
+
+    static {
+        //Get properties (if configured), and check for sensible values
+
+        Integer thresholdTmp = Integer.getInteger("org.apache.catalina.threadmonitor.threshold"); //In seconds
+        if (thresholdTmp != null) {
+            int threshold = thresholdTmp.intValue();
+            if (threshold >= 60) { //SO_TIMEOUT is 20 sec in Tomcat, the threshold must at least be larger than that
+                THREAD_MONITOR_THRESHOLD_IN_SECONDS = threshold;
+            }
+        }
+
+        Integer intervalTmp = Integer.getInteger("org.apache.catalina.threadmonitor.interval"); //In seconds
+        if (intervalTmp != null) {
+            int interval = intervalTmp.intValue();
+            if (interval >= 60 && interval <= 600) {
+                THREAD_MONITOR_INTERVAL_IN_SECONDS = interval;
+            } else if (interval <= 0) {
+                monitoringEnabled = false;
+            }
+        }
+    }
 
     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+        createThreadPoolMonitor(corePoolSize);
     }
 
     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
             RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+        createThreadPoolMonitor(corePoolSize);
     }
 
     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
+        createThreadPoolMonitor(corePoolSize);
     }
 
     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
+        createThreadPoolMonitor(corePoolSize);
     }
 
+    /**
+     * Creates the data structures and the daemon timer used to detect hung threads.
+     * Does nothing when monitoring has been disabled through the system properties.
+     *
+     * @param corePoolSize core pool size, used to size the map of active threads
+     */
+    private void createThreadPoolMonitor(int corePoolSize) {
+        if (monitoringEnabled) {
+            activeThreads = new ConcurrentHashMap<Runnable, MonitoredThread>(corePoolSize * 2); //TODO is initialSize ok?
+            completedHungThreadsQueue = new ConcurrentLinkedQueue<CompletedHungThread>();
+            timer = new Timer("ThreadPool monitor timer", true); //daemon
+            MonitorTask monitorTask = new MonitorTask();
+            timer.schedule(monitorTask, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000, THREAD_MONITOR_INTERVAL_IN_SECONDS * 1000); //delay, period
+
+            log.debug("Monitoring threads with threshold = " + THREAD_MONITOR_THRESHOLD_IN_SECONDS + " sec, interval = " + THREAD_MONITOR_INTERVAL_IN_SECONDS + " sec");
+        }
+    }
+
+    /**
+     * Registers the worker thread that is about to run the task so the monitor can time it.
+     */
+    @Override
+    protected void beforeExecute(Thread thread, Runnable r) {
+        super.beforeExecute(thread, r);
+        if (monitoringEnabled) {
+            //Save the thread/runnable
+            //Keeping a reference to the thread object here does not prevent GC'ing,
+            //as the reference is removed from the Map in afterExecute()
+            activeThreads.put(r, new MonitoredThread(thread));
+        }
+    }
+
+    /*
+     * This method is invoked by the thread that executed the task
+     */
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r,t);
         submittedCount.decrementAndGet();
+        if (monitoringEnabled) {
+            MonitoredThread mt = activeThreads.remove(r);
+            //mt is null if the same Runnable instance was registered twice and already removed;
+            //markAsDone() is atomic, so a thread is queued as completed only if it was really reported as hung
+            if (mt != null && mt.markAsDone()) {
+                completedHungThreadsQueue.add(new CompletedHungThread(mt.getThread().getName(), mt.getActiveTimeInMillis()));
+            }
+        }
     }
 
+    /**
+     * Tracks one task that is currently running on a worker thread.
+     * The state only moves RUNNING -> HUNG -> DONE or RUNNING -> DONE, and every
+     * transition is atomic so the monitor and the worker thread cannot disagree.
+     */
+    static final class MonitoredThread {
+
+        private static final int RUNNING = 0;
+        private static final int HUNG = 1;
+        private static final int DONE = 2;
+
+        //We need a reference to the thread to get a stack trace from the TimerTask
+        private final Thread thread;
+        private final long start = System.currentTimeMillis();
+        private final AtomicInteger state = new AtomicInteger(RUNNING);
+
+        public MonitoredThread(Thread thread) {
+            this.thread = thread;
+        }
+
+        public Thread getThread() {
+            return this.thread;
+        }
+
+        public long getActiveTimeInMillis() {
+            return System.currentTimeMillis() - start;
+        }
+
+        /**
+         * @return true if this call changed the state from running to hung
+         */
+        public boolean markAsHung() {
+            return state.compareAndSet(RUNNING, HUNG);
+        }
+
+        /**
+         * @return true if the thread had been reported as hung before it completed
+         */
+        public boolean markAsDone() {
+            return state.getAndSet(DONE) == HUNG;
+        }
+
+        public boolean isMarkedAsHung() {
+            return state.get() == HUNG;
+        }
+    }
+
+    /**
+     * Immutable record of a thread that was reported as hung and has since completed.
+     */
+    static final class CompletedHungThread {
+
+        private final String name;
+        private final long totalActiveTime;
+
+        public CompletedHungThread(String name, long totalActiveTime) {
+            this.name = name;
+            this.totalActiveTime = totalActiveTime;
+        }
+
+        public String getName() {
+            return this.name;
+        }
+
+        public long getTotalActiveTime() {
+            return this.totalActiveTime;
+        }
+    }
+
+    /**
+     * Periodic task that reports newly hung threads and hung threads that have completed.
+     */
+    class MonitorTask extends TimerTask {
+
+        public MonitorTask() {
+        }
+
+        @Override
+        public void run() {
+            try {
+                //Check monitored threads
+                for (MonitoredThread mt : activeThreads.values()) {
+                    long activeTime = mt.getActiveTimeInMillis();
+
+                    if (activeTime >= THREAD_MONITOR_THRESHOLD_IN_SECONDS * 1000 && mt.markAsHung()) {
+                        int numHungThreads = hungCount.incrementAndGet();
+                        notifyHungThreadDetected(mt.getThread().getName(), mt.getThread().getStackTrace(), activeTime, numHungThreads);
+                    }
+                }
+                //Check if any threads previously reported as hung, have finished.
+                CompletedHungThread cht = completedHungThreadsQueue.poll();
+                while (cht != null) {
+                    int numHungThreads = hungCount.decrementAndGet();
+                    notifyHungThreadCompleted(cht.getName(), cht.getTotalActiveTime(), numHungThreads);
+                    cht = completedHungThreadsQueue.poll();
+                }
+            } catch (RuntimeException e) {
+                //An exception escaping run() would terminate the Timer thread and silently stop all monitoring
+                log.warn("Thread pool monitor run failed", e);
+            }
+        }
+    }
+
+    //The following methods could be implemented using a Listener instead:
+    //HungThreadListener
+    //"onHungThreadDetected"
+    //"onHungThreadCompleted"
+    private void notifyHungThreadDetected(String threadName, StackTraceElement[] trace, long activeTime, int numHungThreads) {
+        //TODO Add JMX notification ?
+        String header = "Thread \"" + threadName + "\" has been active for " + activeTime + " milliseconds and may be hung. There is/are " + numHungThreads + " thread(s) in total in this thread pool that may be hung.";
+        header += "\n" + getStackTraceAsString(trace);
+        log.warn(header);
+    }
+
+    private void notifyHungThreadCompleted(String threadName, long activeTime, int numHungThreads) {
+        //TODO Add JMX notification ?
+        String msg = "Thread \"" + threadName + "\" was previously reported to be hung but has completed. It was active for approximately " + activeTime + " milliseconds. There is/are " + numHungThreads + " thread(s) in total in this thread pool that still may be hung.";
+        log.warn(msg); //Since the "hung thread notification" is warn, this should also be warn
+    }
+
+    //TODO Utility method, should be moved to utility class
+    public static String getStackTraceAsString(StackTraceElement[] trace) {
+        StringBuilder buf = new StringBuilder();
+        for (int i = 0; i < trace.length; i++) {
+            buf.append("\tat ").append(trace[i]).append("\n");
+        }
+        return buf.toString();
+    }
+
+    public void shutdownMonitorTimer(){
+        if(timer != null){
+            timer.cancel();
+        }
+    }
+
+    @Override
+    public void shutdown(){
+        super.shutdown();
+        shutdownMonitorTimer();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow(){
+        List<Runnable> runnables = super.shutdownNow();
+        shutdownMonitorTimer();
+        return runnables;
+    }
+
     public int getSubmittedCount() {
         return submittedCount.get();
     }
@@ -71,7 +308,7 @@
      */
     @Override
     public void execute(Runnable command) {
-        execute(command,0,TimeUnit.MILLISECONDS);
+        execute(command, 0, TimeUnit.MILLISECONDS);
     }
 
     /**