/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */


package org.apache.tomcat.util.net;

import java.io.*;
import java.net.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.modeler.Registry;


/** |
35 |
* Backlog measuring server socket factory. |
36 |
* |
37 |
* @author skibaa@gmail.com |
38 |
* @author Andrew Skiba |
39 |
*/ |
40 |
|
41 |
public class BacklogMeasuringServerSocketFactory extends ServerSocketFactory |
42 |
implements MBeanRegistration |
43 |
{ |
44 |
protected static Log log = LogFactory.getLog(BacklogMeasuringServerSocketFactory.class); |
45 |
private static AtomicLong queuesCommonSize = new AtomicLong(0); |
46 |
|
47 |
static class ServerSocketProxy extends ServerSocket { |
48 |
public static final int DEFAULT_QUEUE_SIZE = 50; |
49 |
|
50 |
BlockingQueue<Socket> acceptQueue; |
51 |
Thread acceptThread; |
52 |
|
53 |
class AcceptRunnable implements Runnable { |
54 |
public void run() { |
55 |
try { |
56 |
while (!isClosed()) { |
57 |
acceptQueue.put(superAccept()); |
58 |
queuesCommonSize.incrementAndGet(); |
59 |
} |
60 |
} catch (InterruptedException ex) { |
61 |
log.warn("unexpected exception", ex); |
62 |
} catch (IOException ex) { |
63 |
log.info("stopping accepting connections", ex); |
64 |
} |
65 |
} |
66 |
} |
67 |
|
68 |
private Socket superAccept () throws IOException { |
69 |
return super.accept(); |
70 |
} |
71 |
void init (int queueSize) { |
72 |
acceptQueue = new LinkedBlockingQueue (queueSize); |
73 |
acceptThread = new Thread(new AcceptRunnable(), "Accept-"+toString()); |
74 |
acceptThread.start(); |
75 |
} |
76 |
|
77 |
public ServerSocketProxy(int port) throws IOException { |
78 |
super(port); |
79 |
init(DEFAULT_QUEUE_SIZE); |
80 |
} |
81 |
|
82 |
public ServerSocketProxy(int port, int backlog) throws IOException { |
83 |
super(port, backlog); |
84 |
init(backlog); |
85 |
} |
86 |
|
87 |
public ServerSocketProxy(int port, int backlog, InetAddress bindAddr) throws IOException { |
88 |
super(port, backlog, bindAddr); |
89 |
init(backlog); |
90 |
} |
91 |
|
92 |
@Override |
93 |
public Socket accept() throws IOException { |
94 |
try { |
95 |
Socket res = acceptQueue.take(); |
96 |
queuesCommonSize.decrementAndGet(); |
97 |
return res; |
98 |
} catch (InterruptedException ex) { |
99 |
throw new SocketException ("unexpected InterruptedException"); |
100 |
} |
101 |
} |
102 |
} |
103 |
|
104 |
public BacklogMeasuringServerSocketFactory () { |
105 |
try { |
106 |
Registry.getRegistry(null, null).registerComponent(this, |
107 |
new ObjectName("measure:type=Backlog,obj="+hashCode()), null); |
108 |
} catch (Exception ex) { |
109 |
log.error("MBean was not registered", ex); |
110 |
} |
111 |
} |
112 |
|
113 |
public ServerSocket createSocket (int port) |
114 |
throws IOException { |
115 |
return new ServerSocketProxy(port); |
116 |
} |
117 |
|
118 |
public ServerSocket createSocket (int port, int backlog) |
119 |
throws IOException { |
120 |
return new ServerSocketProxy (port, backlog); |
121 |
} |
122 |
|
123 |
public ServerSocket createSocket (int port, int backlog, |
124 |
InetAddress ifAddress) |
125 |
throws IOException { |
126 |
return new ServerSocketProxy (port, backlog, ifAddress); |
127 |
} |
128 |
|
129 |
public Socket acceptSocket(ServerSocket socket) |
130 |
throws IOException { |
131 |
return socket.accept(); |
132 |
} |
133 |
|
134 |
public void handshake(Socket sock) |
135 |
throws IOException { |
136 |
// NOOP |
137 |
} |
138 |
|
139 |
public long size () { |
140 |
return queuesCommonSize.longValue(); |
141 |
} |
142 |
|
143 |
|
144 |
protected String domain; |
145 |
protected ObjectName oname; |
146 |
protected MBeanServer mserver; |
147 |
|
148 |
public ObjectName getObjectName() { |
149 |
return oname; |
150 |
} |
151 |
|
152 |
public String getDomain() { |
153 |
return domain; |
154 |
} |
155 |
|
156 |
|
157 |
public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { |
158 |
this.oname = name; |
159 |
this.mserver = server; |
160 |
domain = name.getDomain(); |
161 |
return name; |
162 |
} |
163 |
|
164 |
public void postRegister(Boolean registrationDone) { |
165 |
} |
166 |
|
167 |
public void preDeregister() throws Exception { |
168 |
} |
169 |
|
170 |
public void postDeregister() { |
171 |
} |
172 |
} |