View | Details | Raw Unified | Return to bug 44199
Collapse All | Expand All

(-)java/org/apache/tomcat/util/net/BacklogMeasuringServerSocketFactory.java (+172 lines)
Line 0 Link Here
1
/*
2
 *  Licensed to the Apache Software Foundation (ASF) under one or more
3
 *  contributor license agreements.  See the NOTICE file distributed with
4
 *  this work for additional information regarding copyright ownership.
5
 *  The ASF licenses this file to You under the Apache License, Version 2.0
6
 *  (the "License"); you may not use this file except in compliance with
7
 *  the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 *  Unless required by applicable law or agreed to in writing, software
12
 *  distributed under the License is distributed on an "AS IS" BASIS,
13
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 *  See the License for the specific language governing permissions and
15
 *  limitations under the License.
16
 */
17
18
19
package org.apache.tomcat.util.net;
20
21
import java.io.*;
22
import java.net.*;
23
import java.util.concurrent.BlockingQueue;
24
import java.util.concurrent.LinkedBlockingQueue;
25
import java.util.concurrent.atomic.AtomicLong;
26
import javax.management.MBeanRegistration;
27
import javax.management.MBeanServer;
28
import javax.management.ObjectName;
29
import org.apache.juli.logging.Log;
30
import org.apache.juli.logging.LogFactory;
31
import org.apache.tomcat.util.modeler.Registry;
32
33
34
/**
35
 * Backlog measuring server socket factory.
36
 *
37
 * @author skibaa@gmail.com
38
 * @author Andrew Skiba
39
 */
40
41
public class BacklogMeasuringServerSocketFactory extends ServerSocketFactory
42
    implements MBeanRegistration
43
{
44
    protected static Log log = LogFactory.getLog(BacklogMeasuringServerSocketFactory.class);
45
    private static AtomicLong queuesCommonSize = new AtomicLong(0);
46
47
    static class ServerSocketProxy extends ServerSocket {
48
        public static final int DEFAULT_QUEUE_SIZE = 50;
49
50
        BlockingQueue<Socket> acceptQueue;
51
        Thread acceptThread;
52
53
        class AcceptRunnable implements Runnable {
54
            public void run() {
55
                try {
56
                    while (!isClosed()) {
57
                        acceptQueue.put(superAccept());
58
                        queuesCommonSize.incrementAndGet();
59
                    }
60
                } catch (InterruptedException ex) {
61
                    log.warn("unexpected exception", ex);
62
                } catch (IOException ex) {
63
                    log.info("stopping accepting connections", ex);
64
                }
65
            }
66
        }
67
68
        private Socket superAccept () throws IOException {
69
            return super.accept();
70
        }
71
        void init (int queueSize) {
72
            acceptQueue = new LinkedBlockingQueue (queueSize);
73
            acceptThread = new Thread(new AcceptRunnable(), "Accept-"+toString());
74
            acceptThread.start();                    
75
        }
76
        
77
        public ServerSocketProxy(int port) throws IOException {
78
            super(port);
79
            init(DEFAULT_QUEUE_SIZE);
80
        }
81
        
82
        public ServerSocketProxy(int port, int backlog) throws IOException {
83
            super(port, backlog);
84
            init(backlog);
85
        }
86
        
87
        public ServerSocketProxy(int port, int backlog, InetAddress bindAddr) throws IOException {
88
            super(port, backlog, bindAddr);
89
            init(backlog);
90
        }
91
92
        @Override
93
        public Socket accept() throws IOException {
94
            try {
95
                Socket res = acceptQueue.take();
96
                queuesCommonSize.decrementAndGet();
97
                return res;
98
            } catch (InterruptedException ex) {
99
                throw new SocketException ("unexpected InterruptedException");
100
            }
101
        }
102
    }
103
    
104
    public BacklogMeasuringServerSocketFactory () {
105
        try {
106
            Registry.getRegistry(null, null).registerComponent(this,
107
                    new ObjectName("measure:type=Backlog,obj="+hashCode()), null);
108
        } catch (Exception ex) {
109
            log.error("MBean was not registered", ex);
110
        }
111
    }
112
113
    public ServerSocket createSocket (int port)
114
    throws IOException {
115
        return new ServerSocketProxy(port);
116
    }
117
118
    public ServerSocket createSocket (int port, int backlog)
119
    throws IOException {
120
        return new ServerSocketProxy (port, backlog);
121
    }
122
123
    public ServerSocket createSocket (int port, int backlog,
124
        InetAddress ifAddress)
125
    throws IOException {
126
        return new ServerSocketProxy (port, backlog, ifAddress);
127
    }
128
 
129
    public Socket acceptSocket(ServerSocket socket)
130
 	throws IOException {
131
        return socket.accept();
132
    }
133
 
134
    public void handshake(Socket sock)
135
 	throws IOException {
136
 	// NOOP
137
    }
138
    
139
    public long size () {
140
        return queuesCommonSize.longValue();
141
    }
142
143
    
144
    protected String domain;
145
    protected ObjectName oname;
146
    protected MBeanServer mserver;
147
148
    public ObjectName getObjectName() {
149
        return oname;
150
    }
151
152
    public String getDomain() {
153
        return domain;
154
    }
155
156
157
    public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception {
158
        this.oname = name;
159
        this.mserver = server;
160
        domain = name.getDomain();
161
        return name;
162
    }
163
164
    public void postRegister(Boolean registrationDone) {
165
    }
166
167
    public void preDeregister() throws Exception {
168
    }
169
170
    public void postDeregister() {
171
    }    
172
 }

Return to bug 44199