View | Details | Raw Unified | Return to bug 49159
Collapse All | Expand All

(-)conf/server.xml (+1 lines)
Lines 28-33 Link Here
28
  <!-- Prevent memory leaks due to use of particular java/javax APIs-->
28
  <!-- Prevent memory leaks due to use of particular java/javax APIs-->
29
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
29
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
30
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
30
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
31
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />
31
32
32
  <!-- Global JNDI resources
33
  <!-- Global JNDI resources
33
       Documentation at /docs/jndi-resources-howto.html
34
       Documentation at /docs/jndi-resources-howto.html
(-)java/org/apache/catalina/core/ThreadLocalLeakPreventionListener.java (+183 lines)
Line 0 Link Here
1
package org.apache.catalina.core;
2
3
import java.util.concurrent.Executor;
4
5
import org.apache.catalina.Container;
6
import org.apache.catalina.ContainerEvent;
7
import org.apache.catalina.ContainerListener;
8
import org.apache.catalina.Context;
9
import org.apache.catalina.Engine;
10
import org.apache.catalina.Host;
11
import org.apache.catalina.Lifecycle;
12
import org.apache.catalina.LifecycleEvent;
13
import org.apache.catalina.LifecycleListener;
14
import org.apache.catalina.Server;
15
import org.apache.catalina.Service;
16
import org.apache.catalina.connector.Connector;
17
import org.apache.coyote.ProtocolHandler;
18
import org.apache.coyote.ajp.AjpAprProtocol;
19
import org.apache.coyote.ajp.AjpProtocol;
20
import org.apache.coyote.http11.AbstractHttp11Protocol;
21
import org.apache.coyote.http11.Http11AprProtocol;
22
import org.apache.juli.logging.Log;
23
import org.apache.juli.logging.LogFactory;
24
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
25
26
/**
27
 * A {@link LifecycleListener} that shuts down idle threads in Executor pools
28
 * when a {@link Context} is being stopped to avoid thread-local related memory
29
 * leaks.<br/>
30
 * Note : active threads will be renewed one by one when they come back to the
31
 * pool after executing their task, see
32
 * {@link org.apache.tomcat.util.threads.ThreadPoolExecutor}.afterExecute().<br/>
33
 * 
34
 * This listener must be declared in server.xml to be active.
35
 * 
36
 * @author slaurent
37
 * 
38
 */
39
public class ThreadLocalLeakPreventionListener implements LifecycleListener, ContainerListener {
40
    private static final Log log = LogFactory.getLog(ThreadLocalLeakPreventionListener.class);
41
42
    /**
43
     * Listens for {@link LifecycleEvent} for the start of the {@link Server} to
44
     * initialize itself and then for after_stop events of each {@link Context}.
45
     */
46
    @Override
47
    public void lifecycleEvent(LifecycleEvent event) {
48
        try {
49
            Lifecycle lifecycle = event.getLifecycle();
50
            if (Lifecycle.AFTER_START_EVENT.equals(event.getType()) && lifecycle instanceof Server) {
51
                // when the server starts, we register ourself as listener for
52
                // all context
53
                // as well as container event listener so that we know when new
54
                // Context are deployed
55
                Server server = (Server) lifecycle;
56
                registerListenersForServer(server);
57
            }
58
59
            if (Lifecycle.AFTER_STOP_EVENT.equals(event.getType()) && lifecycle instanceof Context) {
60
                stopIdleThreads((Context) lifecycle);
61
            }
62
        } catch (Exception e) {
63
            log.error("Exception processing event " + event, e);
64
        }
65
    }
66
67
    @Override
68
    public void containerEvent(ContainerEvent event) {
69
        try {
70
            String type = event.getType();
71
            if (Container.ADD_CHILD_EVENT.equals(type)) {
72
                processContainerAddChild(event.getContainer(), (Container) event.getData());
73
            } else if (Container.REMOVE_CHILD_EVENT.equals(type)) {
74
                processContainerRemoveChild(event.getContainer(), (Container) event.getData());
75
            }
76
        } catch (Exception e) {
77
            log.error("Exception processing event " + event, e);
78
        }
79
80
    }
81
82
    private void registerListenersForServer(Server server) {
83
        for (Service service : server.findServices()) {
84
            Engine engine = (Engine) service.getContainer();
85
            engine.addContainerListener(this);
86
            registerListenersForEngine(engine);
87
        }
88
89
    }
90
91
    private void registerListenersForEngine(Engine engine) {
92
        for (Container hostContainer : engine.findChildren()) {
93
            Host host = (Host) hostContainer;
94
            host.addContainerListener(this);
95
            registerListenersForHost(host);
96
        }
97
    }
98
99
    private void registerListenersForHost(Host host) {
100
        for (Container contextContainer : host.findChildren()) {
101
            Context context = (Context) contextContainer;
102
            registerContextListener(context);
103
        }
104
    }
105
106
    private void registerContextListener(Context context) {
107
        context.addLifecycleListener(this);
108
    }
109
110
    protected void processContainerAddChild(Container parent, Container child) {
111
        if (log.isDebugEnabled())
112
            log.debug("Process addChild[parent=" + parent + ",child=" + child + "]");
113
114
        try {
115
            if (child instanceof Context) {
116
                registerContextListener((Context) child);
117
            } else if (child instanceof Engine) {
118
                registerListenersForEngine((Engine) child);
119
            } else if (child instanceof Host) {
120
                registerListenersForHost((Host) child);
121
            }
122
        } catch (Throwable t) {
123
            log.error("processContainerAddChild: Throwable", t);
124
        }
125
126
    }
127
128
    protected void processContainerRemoveChild(Container parent, Container child) {
129
130
        if (log.isDebugEnabled())
131
            log.debug("Process removeChild[parent=" + parent + ",child=" + child + "]");
132
133
        try {
134
            if (child instanceof Context) {
135
                Context context = (Context) child;
136
                context.removeLifecycleListener(this);
137
            } else if (child instanceof Host) {
138
                Host host = (Host) child;
139
                host.removeContainerListener(this);
140
            } else if (child instanceof Engine) {
141
                Engine engine = (Engine) child;
142
                engine.removeContainerListener(this);
143
            }
144
        } catch (Throwable t) {
145
            log.error("processContainerRemoveChild: Throwable", t);
146
        }
147
148
    }
149
150
    /**
151
     * Updates each ThreadPoolExecutor with the current time, which is the time
152
     * when a context is being stopped.
153
     * 
154
     * @param context
155
     *            the context being stopped, used to discover all the Connectors
156
     *            of its parent Service.
157
     */
158
    private void stopIdleThreads(Context context) {
159
        Engine engine = (Engine) context.getParent().getParent();
160
        Service service = engine.getService();
161
        Connector[] connectors = service.findConnectors();
162
        if (connectors != null) {
163
            for (Connector connector : connectors) {
164
                ProtocolHandler handler = connector.getProtocolHandler();
165
                Executor executor = null;
166
                if (handler instanceof AbstractHttp11Protocol) {
167
                    executor = ((AbstractHttp11Protocol) handler).getExecutor();
168
                } else if (handler instanceof AjpProtocol) {
169
                    executor = ((AjpProtocol) handler).getExecutor();
170
                } else if (handler instanceof AjpAprProtocol) {
171
                    executor = ((AjpAprProtocol) handler).getExecutor();
172
                } else if (handler instanceof Http11AprProtocol) {
173
                    executor = ((Http11AprProtocol) handler).getExecutor();
174
                }
175
176
                if (executor instanceof ThreadPoolExecutor) {
177
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
178
                    threadPoolExecutor.contextStopping();
179
                }
180
            }
181
        }
182
    }
183
}
(-)java/org/apache/tomcat/util/threads/TaskQueue.java (+44 lines)
Lines 34-39 Link Here
34
    private static final long serialVersionUID = 1L;
34
    private static final long serialVersionUID = 1L;
35
35
36
    private ThreadPoolExecutor parent = null;
36
    private ThreadPoolExecutor parent = null;
37
    
38
    // no need to be volatile, the one times when we change and read it occur in
39
    // a single thread (the one that did stop a context and fired listeners)
40
    private Integer forcedRemainingCapacity = null;
37
41
38
    public TaskQueue() {
42
    public TaskQueue() {
39
        super();
43
        super();
Lines 74-77 Link Here
74
        //if we reached here, we need to add it to the queue
78
        //if we reached here, we need to add it to the queue
75
        return super.offer(o);
79
        return super.offer(o);
76
    }
80
    }
81
82
83
    @Override
84
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
85
        Runnable runnable = super.poll(timeout, unit);
86
        if (runnable == null && parent != null) {
87
            // the poll timed out, it gives an opportunity to stop the current
88
            // thread if needed to avoid memory leaks.
89
            parent.stopCurrentThreadIfNeeded();
90
        }
91
        return runnable;
92
    }
93
    
94
95
    @Override
96
    public Runnable take() throws InterruptedException {
97
        if (parent != null && parent.currentThreadShouldBeStopped()) {
98
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
99
            //yes, this may return null (in case of timeout) which normally does not occur with take()
100
            //but the ThreadPoolExecutor implementation allows this
101
        }
102
        return super.take();
103
    }
104
105
    @Override
106
    public int remainingCapacity() {
107
        if(forcedRemainingCapacity != null) {
108
            // ThreadPoolExecutor.setCorePoolSize checks that
109
            // remainingCapacity==0 to allow to interrupt idle threads
110
            // I don't see why, but this hack allows to conform to this
111
            // "requirement"
112
            return forcedRemainingCapacity.intValue();
113
        }
114
        return super.remainingCapacity();
115
    }
116
117
    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
118
        this.forcedRemainingCapacity = forcedRemainingCapacity;
119
    }
120
77
}
121
}
(-)java/org/apache/tomcat/util/threads/TaskThread.java (+47 lines)
Line 0 Link Here
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 * 
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 * 
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.apache.tomcat.util.threads;
18
19
/**
20
 * A Thread implementation that records the time at which it was created.
21
 * 
22
 * @author slaurent
23
 * 
24
 */
25
public class TaskThread extends Thread {
26
27
	private final long creationTime;
28
29
	public TaskThread(ThreadGroup group, Runnable target, String name) {
30
		super(group, target, name);
31
		this.creationTime = System.currentTimeMillis();
32
	}
33
34
	public TaskThread(ThreadGroup group, Runnable target, String name,
35
			long stackSize) {
36
		super(group, target, name, stackSize);
37
		this.creationTime = System.currentTimeMillis();
38
	}
39
40
	/**
41
	 * @return the time (in ms) at which this thread was created
42
	 */
43
	public final long getCreationTime() {
44
		return creationTime;
45
	}
46
47
}
(-)java/org/apache/tomcat/util/threads/TaskThreadFactory.java (-1 / +1 lines)
Lines 39-45 Link Here
39
39
40
    @Override
40
    @Override
41
    public Thread newThread(Runnable r) {
41
    public Thread newThread(Runnable r) {
42
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
42
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
43
        t.setDaemon(daemon);
43
        t.setDaemon(daemon);
44
        t.setPriority(threadPriority);
44
        t.setPriority(threadPriority);
45
        return t;
45
        return t;
(-)java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (-2 / +95 lines)
Lines 16-27 Link Here
16
 */
16
 */
17
package org.apache.tomcat.util.threads;
17
package org.apache.tomcat.util.threads;
18
18
19
import java.lang.Thread.UncaughtExceptionHandler;
19
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionException;
21
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.RejectedExecutionHandler;
22
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.ThreadFactory;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicInteger;
26
import java.util.concurrent.atomic.AtomicLong;
27
28
import org.apache.juli.logging.Log;
29
import org.apache.juli.logging.LogFactory;
30
25
/**
31
/**
26
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
32
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
27
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
33
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
Lines 31-37 Link Here
31
 *
37
 *
32
 */
38
 */
33
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
39
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
34
    
40
    private static final Log log = LogFactory.getLog(ThreadPoolExecutor.class);
41
35
    /**
42
    /**
36
     * The number of tasks submitted but not yet finished. This includes tasks
43
     * The number of tasks submitted but not yet finished. This includes tasks
37
     * in the queue and tasks that have been handed to a worker thread but the
44
     * in the queue and tasks that have been handed to a worker thread but the
Lines 39-45 Link Here
39
     * This number is always greater or equal to {@link #getActiveCount()}.
46
     * This number is always greater or equal to {@link #getActiveCount()}.
40
     */
47
     */
41
    private final AtomicInteger submittedCount = new AtomicInteger(0);
48
    private final AtomicInteger submittedCount = new AtomicInteger(0);
42
    
49
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
50
51
    /**
52
     * Most recent time in ms when a thread decided to kill itself to avoid
53
     * potential memory leaks. Useful to throttle the rate of renewals of
54
     * threads.
55
     */
56
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
57
58
    // TODO make it configurable
59
    private final long threadRenewalRateMs = 1000L;
60
43
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
61
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
44
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
62
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
45
    }
63
    }
Lines 60-67 Link Here
60
    @Override
78
    @Override
61
    protected void afterExecute(Runnable r, Throwable t) {
79
    protected void afterExecute(Runnable r, Throwable t) {
62
        submittedCount.decrementAndGet();
80
        submittedCount.decrementAndGet();
81
82
        if (t == null) {
83
            stopCurrentThreadIfNeeded();
84
        }
63
    }
85
    }
64
86
87
    /**
88
     * If the current thread was started before the last time when a context was
89
     * stopped, an exception is thrown so that the current thread is stopped.
90
     */
91
    protected void stopCurrentThreadIfNeeded() {
92
        if (currentThreadShouldBeStopped()) {
93
            long lastTime = lastTimeThreadKilledItself.longValue();
94
            if (lastTime + threadRenewalRateMs < System.currentTimeMillis()) {
95
                if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis() + 1)) {
96
                    // OK, it's really time to dispose of this thread
97
                    
98
                    final String msg = "Stopping thread " + Thread.currentThread().getName()
99
                            + " to avoid potential memory leaks after a context was stopped.";
100
                    Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
101
                        @Override
102
                        public void uncaughtException(Thread t, Throwable e) {
103
                            // yes, swallow the exception
104
                            log.debug(msg);
105
                        }
106
                    });
107
                    throw new RuntimeException(msg);
108
                }
109
            }
110
        }
111
    }
112
    
113
    protected boolean currentThreadShouldBeStopped() {
114
        if (Thread.currentThread() instanceof TaskThread) {
115
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
116
            if (currentTaskThread.getCreationTime() < this.lastContextStoppedTime.longValue()) {
117
                return true;
118
            }
119
        }
120
        return false;
121
    }
122
65
    public int getSubmittedCount() {
123
    public int getSubmittedCount() {
66
        return submittedCount.get();
124
        return submittedCount.get();
67
    }
125
    }
Lines 111-116 Link Here
111
            
169
            
112
        }
170
        }
113
    }
171
    }
172
173
    public void contextStopping() {
174
        this.lastContextStoppedTime.set(System.currentTimeMillis());
175
176
        // save the current pool parameters to restore them later
177
        int savedCorePoolSize = this.getCorePoolSize();
178
        TaskQueue taskQueue = getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
179
        if (taskQueue != null) {
180
            // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
181
            // checks that queue.remainingCapacity()==0. I did not understand
182
            // why, but to get the intended effect of waking up idle threads, I
183
            // temporarily fake this condition.
184
            taskQueue.setForcedRemainingCapacity(0);
185
        }
186
187
        // setCorePoolSize(0) wakes idle threads
188
        this.setCorePoolSize(0);
189
190
        // wait a little so that idle threads wake and poll the queue again,
191
        // this time always with a timeout (queue.poll() instead of queue.take())
192
        // even if we did not wait enough, TaskQueue.take() takes care of timing out
193
        // so that we are sure that all threads of the pool are renewed in a limited
194
        // time, something like (threadKeepAlive + longest request time)
195
        try {
196
            Thread.sleep(200L);
197
        } catch (InterruptedException e) {
198
            //yes, ignore
199
        }
200
        
201
        if (taskQueue != null) {
202
            // ok, restore the state of the queue and pool
203
            taskQueue.setForcedRemainingCapacity(null);
204
        }
205
        this.setCorePoolSize(savedCorePoolSize);
206
    }
114
    
207
    
115
    private static class RejectHandler implements RejectedExecutionHandler {
208
    private static class RejectHandler implements RejectedExecutionHandler {
116
        @Override
209
        @Override

Return to bug 49159