Index: java/org/apache/tomcat/util/net/BacklogMeasuringServerSocketFactory.java =================================================================== --- java/org/apache/tomcat/util/net/BacklogMeasuringServerSocketFactory.java (revision 0) +++ java/org/apache/tomcat/util/net/BacklogMeasuringServerSocketFactory.java (revision 0) @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.tomcat.util.net; + +import java.io.*; +import java.net.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.MBeanRegistration; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.modeler.Registry; + + +/** + * Backlog measuring server socket factory. + * + * @author skibaa@gmail.com + * @author Andrew Skiba + */ + +public class BacklogMeasuringServerSocketFactory extends ServerSocketFactory + implements MBeanRegistration +{ + protected static Log log = LogFactory.getLog(BacklogMeasuringServerSocketFactory.class); + private static AtomicLong queuesCommonSize = new AtomicLong(0); + + static class ServerSocketProxy extends ServerSocket { + public static final int DEFAULT_QUEUE_SIZE = 50; + + BlockingQueue acceptQueue; + Thread acceptThread; + + class AcceptRunnable implements Runnable { + public void run() { + try { + while (!isClosed()) { + acceptQueue.put(superAccept()); + queuesCommonSize.incrementAndGet(); + } + } catch (InterruptedException ex) { + log.warn("unexpected exception", ex); + } catch (IOException ex) { + log.info("stopping accepting connections", ex); + } + } + } + + private Socket superAccept () throws IOException { + return super.accept(); + } + void init (int queueSize) { + acceptQueue = new LinkedBlockingQueue (queueSize); + acceptThread = new Thread(new AcceptRunnable(), "Accept-"+toString()); + acceptThread.start(); + } + + public ServerSocketProxy(int port) throws IOException { + super(port); + init(DEFAULT_QUEUE_SIZE); + } + + public ServerSocketProxy(int port, int backlog) throws IOException { + super(port, backlog); + init(backlog); + } + + public ServerSocketProxy(int port, int backlog, InetAddress bindAddr) throws IOException { + super(port, backlog, bindAddr); + init(backlog); + } + + @Override + public Socket accept() throws IOException { + try { + Socket res = acceptQueue.take(); + queuesCommonSize.decrementAndGet(); + return res; + } catch (InterruptedException ex) { + throw new SocketException ("unexpected InterruptedException"); + } + } + } + + public BacklogMeasuringServerSocketFactory () { + try { + Registry.getRegistry(null, null).registerComponent(this, + new ObjectName("measure:type=Backlog,obj="+hashCode()), null); + } catch (Exception ex) { + log.error("MBean was not registered", ex); + } + } + + public ServerSocket createSocket (int port) + throws IOException { + return new ServerSocketProxy(port); + } + + public ServerSocket createSocket (int port, int backlog) + throws IOException { + return new ServerSocketProxy (port, backlog); + } + + public ServerSocket createSocket (int port, int backlog, + InetAddress ifAddress) + throws IOException { + return new ServerSocketProxy (port, backlog, ifAddress); + } + + public Socket acceptSocket(ServerSocket socket) + throws IOException { + return socket.accept(); + } + + public void handshake(Socket sock) + throws IOException { + // NOOP + } + + public long size () { + return queuesCommonSize.longValue(); + } + + + protected String domain; + protected ObjectName oname; + protected MBeanServer mserver; + + public ObjectName getObjectName() { + return oname; + } + + public String getDomain() { + return domain; + } + + + public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { + this.oname = name; + this.mserver = server; + domain = name.getDomain(); + return name; + } + + public void postRegister(Boolean registrationDone) { + } + + public void preDeregister() throws Exception { + } + + public void postDeregister() { + } + }