View | Details | Raw Unified | Return to bug 49159
Collapse All | Expand All

(-)java/org/apache/catalina/connector/mbeans-descriptors.xml (+4 lines)
Lines 97-102 Link Here
97
          description="The number of request processing threads that will be created"
97
          description="The number of request processing threads that will be created"
98
                 type="int"/>
98
                 type="int"/>
99
99
100
    <attribute   name="maxQueueSize"
101
          description="The maximum number of requests accepted but waiting in a queue to be processed"
102
                 type="int"/>
103
100
    <!-- Common -->
104
    <!-- Common -->
101
    <attribute   name="port"
105
    <attribute   name="port"
102
          description="The port number on which we listen for ajp13 requests"
106
          description="The port number on which we listen for ajp13 requests"
(-)java/org/apache/catalina/core/StandardContext.java (+6 lines)
Lines 35-40 Link Here
35
import java.util.Set;
35
import java.util.Set;
36
import java.util.Stack;
36
import java.util.Stack;
37
import java.util.TreeMap;
37
import java.util.TreeMap;
38
import java.util.concurrent.atomic.AtomicLong;
38
39
39
import javax.management.ListenerNotFoundException;
40
import javax.management.ListenerNotFoundException;
40
import javax.management.MBeanNotificationInfo;
41
import javax.management.MBeanNotificationInfo;
Lines 124-129 Link Here
124
125
125
    private static final Log log = LogFactory.getLog(StandardContext.class);
126
    private static final Log log = LogFactory.getLog(StandardContext.class);
126
127
128
    //TODO : move this to the Service instance level to avoid renewing Threads 
129
    //of other Services, which did not serve any request for the context that was stopped
130
    public static AtomicLong lastContextStoppedTime = new AtomicLong(0L);
127
131
128
    // ----------------------------------------------------------- Constructors
132
    // ----------------------------------------------------------- Constructors
129
133
Lines 4994-4999 Link Here
4994
        //reset the instance manager
4998
        //reset the instance manager
4995
        instanceManager = null;
4999
        instanceManager = null;
4996
5000
5001
        lastContextStoppedTime.set(System.currentTimeMillis());
5002
        
4997
        if (log.isDebugEnabled())
5003
        if (log.isDebugEnabled())
4998
            log.debug("Stopping complete");
5004
            log.debug("Stopping complete");
4999
5005
(-)java/org/apache/tomcat/util/net/AbstractEndpoint.java (-2 / +12 lines)
Lines 268-275 Link Here
268
            return maxThreads;
268
            return maxThreads;
269
        }
269
        }
270
    }
270
    }
271
    
272
    private int maxQueueSize = 100;
271
273
272
    /**
274
    public int getMaxQueueSize() {
275
		return maxQueueSize;
276
	}
277
	public void setMaxQueueSize(int maxQueueSize) {
278
		this.maxQueueSize = maxQueueSize;
279
	}
280
281
282
	/**
273
     * Max keep alive requests 
283
     * Max keep alive requests 
274
     */
284
     */
275
    private int maxKeepAliveRequests=100; // as in Apache HTTPD server
285
    private int maxKeepAliveRequests=100; // as in Apache HTTPD server
Lines 372-378 Link Here
372
382
373
    public void createExecutor() {
383
    public void createExecutor() {
374
        internalExecutor = true;
384
        internalExecutor = true;
375
        TaskQueue taskqueue = new TaskQueue();
385
        TaskQueue taskqueue = new TaskQueue(maxQueueSize);
376
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
386
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
377
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
387
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
378
        taskqueue.setParent( (ThreadPoolExecutor) executor);
388
        taskqueue.setParent( (ThreadPoolExecutor) executor);
(-)java/org/apache/tomcat/util/threads/TaskQueue.java (-11 / +5 lines)
Lines 16-23 Link Here
16
 */
16
 */
17
package org.apache.tomcat.util.threads;
17
package org.apache.tomcat.util.threads;
18
18
19
import java.util.Collection;
19
import java.util.concurrent.ArrayBlockingQueue;
20
import java.util.concurrent.LinkedBlockingQueue;
21
import java.util.concurrent.RejectedExecutionException;
20
import java.util.concurrent.RejectedExecutionException;
22
import java.util.concurrent.ThreadPoolExecutor;
21
import java.util.concurrent.ThreadPoolExecutor;
23
import java.util.concurrent.TimeUnit;
22
import java.util.concurrent.TimeUnit;
Lines 29-49 Link Here
29
 * @author fhanik
28
 * @author fhanik
30
 *
29
 *
31
 */
30
 */
32
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
31
public class TaskQueue extends ArrayBlockingQueue<Runnable> {
33
    private ThreadPoolExecutor parent = null;
32
    private ThreadPoolExecutor parent = null;
34
33
35
    public TaskQueue() {
36
        super();
37
    }
38
34
39
    public TaskQueue(int capacity) {
35
    public TaskQueue(int capacity) {
40
        super(capacity);
36
    	//use a fair queue to allow more deterministic renewal of threads after a 
37
    	//context is stopped (memory leak protection)
38
        super(capacity, true);
41
    }
39
    }
42
40
43
    public TaskQueue(Collection<? extends Runnable> c) {
44
        super(c);
45
    }
46
47
    public void setParent(ThreadPoolExecutor tp) {
41
    public void setParent(ThreadPoolExecutor tp) {
48
        parent = tp;
42
        parent = tp;
49
    }
43
    }
(-)java/org/apache/tomcat/util/threads/TaskThread.java (+47 lines)
Line 0 Link Here
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 * 
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 * 
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.apache.tomcat.util.threads;
18
19
/**
20
 * A Thread implementation that records the time at which it was created.
21
 * 
22
 * @author slaurent
23
 * 
24
 */
25
public class TaskThread extends Thread {
26
27
	private final long creationTime;
28
29
	public TaskThread(ThreadGroup group, Runnable target, String name) {
30
		super(group, target, name);
31
		this.creationTime = System.currentTimeMillis();
32
	}
33
34
	public TaskThread(ThreadGroup group, Runnable target, String name,
35
			long stackSize) {
36
		super(group, target, name, stackSize);
37
		this.creationTime = System.currentTimeMillis();
38
	}
39
40
	/**
41
	 * @return the time (in ms) at which this thread was created
42
	 */
43
	public final long getCreationTime() {
44
		return creationTime;
45
	}
46
47
}
(-)java/org/apache/tomcat/util/threads/TaskThreadFactory.java (-1 / +1 lines)
Lines 38-44 Link Here
38
    }
38
    }
39
39
40
    public Thread newThread(Runnable r) {
40
    public Thread newThread(Runnable r) {
41
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
41
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
42
        t.setDaemon(daemon);
42
        t.setDaemon(daemon);
43
        t.setPriority(threadPriority);
43
        t.setPriority(threadPriority);
44
        return t;
44
        return t;
(-)java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (-1 / +42 lines)
Lines 16-27 Link Here
16
 */
16
 */
17
package org.apache.tomcat.util.threads;
17
package org.apache.tomcat.util.threads;
18
18
19
import java.lang.Thread.UncaughtExceptionHandler;
19
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicInteger;
26
import java.util.concurrent.atomic.AtomicLong;
27
28
import org.apache.catalina.core.StandardContext;
29
import org.apache.juli.logging.Log;
30
import org.apache.juli.logging.LogFactory;
25
/**
31
/**
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
32
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
27
 * getActiveCount method, to be used to properly handle the work queue
33
 * getActiveCount method, to be used to properly handle the work queue
Lines 31-39 Link Here
31
 *
37
 *
32
 */
38
 */
33
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
39
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
34
    
40
	private static final Log log = LogFactory.getLog(ThreadPoolExecutor.class);
41
	
35
    private final AtomicInteger activeCount = new AtomicInteger(0);
42
    private final AtomicInteger activeCount = new AtomicInteger(0);
36
    
43
    
44
    /**
45
     * Most recent time in ms when a thread decided to kill itself to avoid potential memory leaks.
46
     * Useful to throttle the rate of renewals of threads. 
47
     */
48
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
49
    
50
    //TODO make it configurable
51
    private final long threadRenewalRateMs = 1000L;
52
    
37
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
53
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
38
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
54
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
39
    }
55
    }
Lines 54-59 Link Here
54
    @Override
70
    @Override
55
    protected void afterExecute(Runnable r, Throwable t) {
71
    protected void afterExecute(Runnable r, Throwable t) {
56
        activeCount.decrementAndGet();
72
        activeCount.decrementAndGet();
73
74
        if (t == null) {
75
            if (Thread.currentThread() instanceof TaskThread) {
76
                TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
77
                if (currentTaskThread.getCreationTime() < StandardContext.lastContextStoppedTime.longValue()) {
78
                    long lastTime = lastTimeThreadKilledItself.longValue();
79
                    if (lastTime + threadRenewalRateMs < System.currentTimeMillis()) {
80
                        if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis())) {
81
                            //OK, it's really time to dispose of this thread 
82
                            
83
                            final String msg = "Stopping thread " + currentTaskThread.getName()
84
                                    + " to avoid potential memory leaks after a context was stopped.";
85
                            currentTaskThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
86
                                @Override
87
                                public void uncaughtException(Thread t, Throwable e) {
88
                                    // yes, swallow the exception
89
                                    log.info(msg);
90
                                }
91
                            });
92
                            throw new RuntimeException(msg);
93
                        }
94
                    }
95
                }
96
            }
97
        }
57
    }
98
    }
58
99
59
    @Override
100
    @Override
(-)webapps/docs/config/executor.xml (+3 lines)
Lines 98-103 Link Here
98
    <attribute name="minSpareThreads" required="false">
98
    <attribute name="minSpareThreads" required="false">
99
      <p>(int) The minimum number of threads always kept alive, default is <code>25</code></p>
99
      <p>(int) The minimum number of threads always kept alive, default is <code>25</code></p>
100
    </attribute>
100
    </attribute>
101
    <attribute name="maxQueueSize" required="false">
102
      <p>(int) Maximum number of tasks for the pending task queue, default is <code>Integer.MAX_VALUE</code></p>
103
    </attribute>
101
    <attribute name="maxIdleTime" required="false">
104
    <attribute name="maxIdleTime" required="false">
102
      <p>(int) The number of milliseconds before an idle thread shuts down, unless the number of active threads is less than
105
      <p>(int) The number of milliseconds before an idle thread shuts down, unless the number of active threads is less than
103
         or equal to minSpareThreads. Default value is <code>60000</code>(1 minute)</p>
106
         or equal to minSpareThreads. Default value is <code>60000</code>(1 minute)</p>

Return to bug 49159