View | Details | Raw Unified | Return to bug 49159
Collapse All | Expand All

(-)conf/server.xml (+1 lines)
Lines 28-33 Link Here
28
  <!-- Prevent memory leaks due to use of particular java/javax APIs-->
28
  <!-- Prevent memory leaks due to use of particular java/javax APIs-->
29
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
29
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
30
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
30
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
31
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />
31
32
32
  <!-- Global JNDI resources
33
  <!-- Global JNDI resources
33
       Documentation at /docs/jndi-resources-howto.html
34
       Documentation at /docs/jndi-resources-howto.html
(-)java/org/apache/catalina/core/StandardContext.java (+6 lines)
Lines 35-40 Link Here
35
import java.util.Set;
35
import java.util.Set;
36
import java.util.Stack;
36
import java.util.Stack;
37
import java.util.TreeMap;
37
import java.util.TreeMap;
38
import java.util.concurrent.atomic.AtomicLong;
38
39
39
import javax.management.ListenerNotFoundException;
40
import javax.management.ListenerNotFoundException;
40
import javax.management.MBeanNotificationInfo;
41
import javax.management.MBeanNotificationInfo;
Lines 125-130 Link Here
125
126
126
    private static final Log log = LogFactory.getLog(StandardContext.class);
127
    private static final Log log = LogFactory.getLog(StandardContext.class);
127
128
129
    //TODO : move this to the Service instance level to avoid renewing Threads 
130
    //of other Services, they did not serve any request for the context that was stopped
131
    public static AtomicLong lastContextStoppedTime = new AtomicLong(0L);
128
132
129
    // ----------------------------------------------------------- Constructors
133
    // ----------------------------------------------------------- Constructors
130
134
Lines 5008-5013 Link Here
5008
        //reset the instance manager
5012
        //reset the instance manager
5009
        instanceManager = null;
5013
        instanceManager = null;
5010
5014
5015
        lastContextStoppedTime.set(System.currentTimeMillis());
5016
        
5011
        if (log.isDebugEnabled())
5017
        if (log.isDebugEnabled())
5012
            log.debug("Stopping complete");
5018
            log.debug("Stopping complete");
5013
5019
(-)java/org/apache/catalina/core/ThreadLocalLeakPreventionListener.java (+223 lines)
Line 0 Link Here
1
package org.apache.catalina.core;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.Executor;
5
import java.util.concurrent.ThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
8
import org.apache.catalina.Container;
9
import org.apache.catalina.ContainerEvent;
10
import org.apache.catalina.ContainerListener;
11
import org.apache.catalina.Context;
12
import org.apache.catalina.Engine;
13
import org.apache.catalina.Host;
14
import org.apache.catalina.Lifecycle;
15
import org.apache.catalina.LifecycleEvent;
16
import org.apache.catalina.LifecycleListener;
17
import org.apache.catalina.Server;
18
import org.apache.catalina.Service;
19
import org.apache.catalina.connector.Connector;
20
import org.apache.coyote.ProtocolHandler;
21
import org.apache.coyote.ajp.AjpAprProtocol;
22
import org.apache.coyote.ajp.AjpProtocol;
23
import org.apache.coyote.http11.AbstractHttp11Protocol;
24
import org.apache.coyote.http11.Http11AprProtocol;
25
import org.apache.juli.logging.Log;
26
import org.apache.juli.logging.LogFactory;
27
import org.apache.tomcat.util.threads.TaskQueue;
28
29
/**
30
 * A {@link LifecycleListener} that shuts down idle threads in Executor pools
31
 * when a {@link Context} is being stopped to avoid thread-local related memory
32
 * leaks.<br/>
33
 * Note : active threads will be renewed one by one when they come back to the
34
 * pool after executing their task, see
35
 * {@link org.apache.tomcat.util.threads.ThreadPoolExecutor}.afterExecute().<br/>
36
 * 
37
 * This listener must be declared in server.xml to be active.
38
 * 
39
 * @author slaurent
40
 * 
41
 */
42
public class ThreadLocalLeakPreventionListener implements LifecycleListener, ContainerListener {
43
    private static final Log log = LogFactory.getLog(ThreadLocalLeakPreventionListener.class);
44
45
    /**
46
     * Listens for {@link LifecycleEvent} for the start of the {@link Server} to
47
     * initialize itself and then for after_stop events of each {@link Context}.
48
     */
49
    @Override
50
    public void lifecycleEvent(LifecycleEvent event) {
51
        try {
52
            Lifecycle lifecycle = event.getLifecycle();
53
            if (Lifecycle.AFTER_START_EVENT.equals(event.getType()) && lifecycle instanceof Server) {
54
                // when the server starts, we register ourself as listener for
55
                // all context
56
                // as well as container event listener so that we know when new
57
                // Context are deployed
58
                Server server = (Server) lifecycle;
59
                registerListenersForServer(server);
60
            }
61
62
            if (Lifecycle.AFTER_STOP_EVENT.equals(event.getType()) && lifecycle instanceof Context) {
63
                stopIdleThreads((Context) lifecycle);
64
            }
65
        } catch (Exception e) {
66
            log.error("Exception processing event " + event, e);
67
        }
68
    }
69
70
    @Override
71
    public void containerEvent(ContainerEvent event) {
72
        try {
73
            String type = event.getType();
74
            if (Container.ADD_CHILD_EVENT.equals(type)) {
75
                processContainerAddChild(event.getContainer(), (Container) event.getData());
76
            } else if (Container.REMOVE_CHILD_EVENT.equals(type)) {
77
                processContainerRemoveChild(event.getContainer(), (Container) event.getData());
78
            }
79
        } catch (Exception e) {
80
            log.error("Exception processing event " + event, e);
81
        }
82
83
    }
84
85
    private void registerListenersForServer(Server server) {
86
        for (Service service : server.findServices()) {
87
            Engine engine = (Engine) service.getContainer();
88
            engine.addContainerListener(this);
89
            registerListenersForEngine(engine);
90
        }
91
92
    }
93
94
    private void registerListenersForEngine(Engine engine) {
95
        for (Container hostContainer : engine.findChildren()) {
96
            Host host = (Host) hostContainer;
97
            host.addContainerListener(this);
98
            registerListenersForHost(host);
99
        }
100
    }
101
102
    private void registerListenersForHost(Host host) {
103
        for (Container contextContainer : host.findChildren()) {
104
            Context context = (Context) contextContainer;
105
            registerContextListener(context);
106
        }
107
    }
108
109
    private void registerContextListener(Context context) {
110
        context.addLifecycleListener(this);
111
    }
112
113
    protected void processContainerAddChild(Container parent, Container child) {
114
        if (log.isDebugEnabled())
115
            log.debug("Process addChild[parent=" + parent + ",child=" + child + "]");
116
117
        try {
118
            if (child instanceof Context) {
119
                registerContextListener((Context) child);
120
            } else if (child instanceof Engine) {
121
                registerListenersForEngine((Engine) child);
122
            } else if (child instanceof Host) {
123
                registerListenersForHost((Host) child);
124
            }
125
        } catch (Throwable t) {
126
            log.error("processContainerAddChild: Throwable", t);
127
        }
128
129
    }
130
131
    protected void processContainerRemoveChild(Container parent, Container child) {
132
133
        if (log.isDebugEnabled())
134
            log.debug("Process removeChild[parent=" + parent + ",child=" + child + "]");
135
136
        try {
137
            if (child instanceof Context) {
138
                Context context = (Context) child;
139
                context.removeLifecycleListener(this);
140
            } else if (child instanceof Host) {
141
                Host host = (Host) child;
142
                host.removeContainerListener(this);
143
            } else if (child instanceof Engine) {
144
                Engine engine = (Engine) child;
145
                engine.removeContainerListener(this);
146
            }
147
        } catch (Throwable t) {
148
            log.error("processContainerRemoveChild: Throwable", t);
149
        }
150
151
    }
152
153
    /**
154
     * Temporarily sets the corePoolSize of each {@link ThreadPoolExecutor} of
155
     * each Connector of the given Context so that their idle threads are
156
     * stopped. The corePoolSize is reverted after a small delay, so that the
157
     * net effect is that idle threads are renewed.
158
     * 
159
     * @param context
160
     *            the context being stopped, used to discover all the Connectors
161
     *            of its parent Service.
162
     */
163
    private void stopIdleThreads(Context context) {
164
        Engine engine = (Engine) context.getParent().getParent();
165
        Service service = engine.getService();
166
        Connector[] connectors = service.findConnectors();
167
        if (connectors != null) {
168
            for (Connector connector : connectors) {
169
                ProtocolHandler handler = connector.getProtocolHandler();
170
                Executor executor = null;
171
                if (handler instanceof AbstractHttp11Protocol) {
172
                    executor = ((AbstractHttp11Protocol) handler).getExecutor();
173
                } else if (handler instanceof AjpProtocol) {
174
                    executor = ((AjpProtocol) handler).getExecutor();
175
                } else if (handler instanceof AjpAprProtocol) {
176
                    executor = ((AjpAprProtocol) handler).getExecutor();
177
                } else if (handler instanceof Http11AprProtocol) {
178
                    executor = ((Http11AprProtocol) handler).getExecutor();
179
                }
180
                if (log.isDebugEnabled()) {
181
                    log.debug("stopping idle threads for Executor " + executor);
182
                }
183
                if (executor instanceof ThreadPoolExecutor) {
184
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
185
                    BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
186
187
                    // save the current pool parameters to restore them later
188
                    int savedCorePoolSize = threadPoolExecutor.getCorePoolSize();
189
                    long savedKeepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
190
191
                    if (queue instanceof TaskQueue) {
192
                        // note by slaurent : quite oddly
193
                        // threadPoolExecutor.setCorePoolSize checks that
194
                        // queue.remainingCapacity()==0. I did not understand
195
                        // why, but to get the intended effect of waking up idle
196
                        // threads, I temporarily fake this condition.
197
                        ((TaskQueue) queue).setForcedRemainingCapacity(0);
198
                    }
199
                    // setCorePoolSize(0) wakes idle threads, but the next thing
200
                    // they do is to ask the queue for another task. Since we
201
                    // want them to exit, we temporarily force the timeout of
202
                    // the queue to 0
203
                    threadPoolExecutor.setKeepAliveTime(0, TimeUnit.MILLISECONDS);
204
                    threadPoolExecutor.setCorePoolSize(0);
205
206
                    // wait a little to allow idle threads to shutdown
207
                    try {
208
                        Thread.sleep(1000L);
209
                    } catch (InterruptedException e) {
210
                        // ignore
211
                    }
212
213
                    //ok, restore the state of the queue and pool
214
                    if (queue instanceof TaskQueue) {
215
                        ((TaskQueue) queue).setForcedRemainingCapacity(null);
216
                    }
217
                    threadPoolExecutor.setKeepAliveTime(savedKeepAliveTime, TimeUnit.MILLISECONDS);
218
                    threadPoolExecutor.setCorePoolSize(savedCorePoolSize);
219
                }
220
            }
221
        }
222
    }
223
}
(-)java/org/apache/tomcat/util/threads/TaskQueue.java (+14 lines)
Lines 31-36 Link Here
31
 */
31
 */
32
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
32
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
33
    private ThreadPoolExecutor parent = null;
33
    private ThreadPoolExecutor parent = null;
34
    private Integer forcedRemainingCapacity = null;
34
35
35
    public TaskQueue() {
36
    public TaskQueue() {
36
        super();
37
        super();
Lines 71-74 Link Here
71
        //if we reached here, we need to add it to the queue
72
        //if we reached here, we need to add it to the queue
72
        return super.offer(o);
73
        return super.offer(o);
73
    }
74
    }
75
76
    @Override
77
    public int remainingCapacity() {
78
        if(forcedRemainingCapacity != null) {
79
            return forcedRemainingCapacity.intValue();
80
        }
81
        return super.remainingCapacity();
82
    }
83
84
    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
85
        this.forcedRemainingCapacity = forcedRemainingCapacity;
86
    }
87
    
74
}
88
}
(-)java/org/apache/tomcat/util/threads/TaskThread.java (+47 lines)
Line 0 Link Here
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 * 
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 * 
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.apache.tomcat.util.threads;
18
19
/**
20
 * A Thread implementation that records the time at which it was created.
21
 * 
22
 * @author slaurent
23
 * 
24
 */
25
public class TaskThread extends Thread {
26
27
	private final long creationTime;
28
29
	public TaskThread(ThreadGroup group, Runnable target, String name) {
30
		super(group, target, name);
31
		this.creationTime = System.currentTimeMillis();
32
	}
33
34
	public TaskThread(ThreadGroup group, Runnable target, String name,
35
			long stackSize) {
36
		super(group, target, name, stackSize);
37
		this.creationTime = System.currentTimeMillis();
38
	}
39
40
	/**
41
	 * @return the time (in ms) at which this thread was created
42
	 */
43
	public final long getCreationTime() {
44
		return creationTime;
45
	}
46
47
}
(-)java/org/apache/tomcat/util/threads/TaskThreadFactory.java (-1 / +1 lines)
Lines 38-44 Link Here
38
    }
38
    }
39
39
40
    public Thread newThread(Runnable r) {
40
    public Thread newThread(Runnable r) {
41
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
41
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
42
        t.setDaemon(daemon);
42
        t.setDaemon(daemon);
43
        t.setPriority(threadPriority);
43
        t.setPriority(threadPriority);
44
        return t;
44
        return t;
(-)java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (-1 / +42 lines)
Lines 16-27 Link Here
16
 */
16
 */
17
package org.apache.tomcat.util.threads;
17
package org.apache.tomcat.util.threads;
18
18
19
import java.lang.Thread.UncaughtExceptionHandler;
19
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicInteger;
26
import java.util.concurrent.atomic.AtomicLong;
27
28
import org.apache.catalina.core.StandardContext;
29
import org.apache.juli.logging.Log;
30
import org.apache.juli.logging.LogFactory;
25
/**
31
/**
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
32
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
27
 * getActiveCount method, to be used to properly handle the work queue
33
 * getActiveCount method, to be used to properly handle the work queue
Lines 31-39 Link Here
31
 *
37
 *
32
 */
38
 */
33
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
39
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
34
    
40
	private static final Log log = LogFactory.getLog(ThreadPoolExecutor.class);
41
	
35
    private final AtomicInteger activeCount = new AtomicInteger(0);
42
    private final AtomicInteger activeCount = new AtomicInteger(0);
36
    
43
    
44
    /**
45
     * Most recent time in ms when a thread decided to kill itself to avoid potential memory leaks.
46
     * Useful to throttle the rate of renewals of threads. 
47
     */
48
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
49
    
50
    //TODO make it configurable
51
    private final long threadRenewalRateMs = 1000L;
52
    
37
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
53
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
38
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
54
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
39
    }
55
    }
Lines 54-59 Link Here
54
    @Override
70
    @Override
55
    protected void afterExecute(Runnable r, Throwable t) {
71
    protected void afterExecute(Runnable r, Throwable t) {
56
        activeCount.decrementAndGet();
72
        activeCount.decrementAndGet();
73
74
        if (t == null) {
75
            if (Thread.currentThread() instanceof TaskThread) {
76
                TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
77
                if (currentTaskThread.getCreationTime() < StandardContext.lastContextStoppedTime.longValue()) {
78
                    long lastTime = lastTimeThreadKilledItself.longValue();
79
                    if (lastTime + threadRenewalRateMs < System.currentTimeMillis()) {
80
                        if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis())) {
81
                            //OK, it's really time to dispose of this thread 
82
                            
83
                            final String msg = "Stopping thread " + currentTaskThread.getName()
84
                                    + " to avoid potential memory leaks after a context was stopped.";
85
                            currentTaskThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
86
                                @Override
87
                                public void uncaughtException(Thread t, Throwable e) {
88
                                    // yes, swallow the exception
89
                                    log.info(msg);
90
                                }
91
                            });
92
                            throw new RuntimeException(msg);
93
                        }
94
                    }
95
                }
96
            }
97
        }
57
    }
98
    }
58
99
59
    @Override
100
    @Override

Return to bug 49159