### Eclipse Workspace Patch 1.0 #P Tomcat Trunk Index: java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java =================================================================== --- java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java (revision 1147078) +++ java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java (working copy) @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.text.DecimalFormat; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -41,14 +44,84 @@ private static final Log log = LogFactory.getLog(GzipInterceptor.class); public static final int DEFAULT_BUFFER_SIZE = 2048; - + + private static final byte COMPRESSED = 1; + + private static final byte UNCOMPRESSED = 0; + + private final DecimalFormat df = new DecimalFormat("#0.00"); + + private int compressionMinSize = 2048; + + private int interval = 10000; + + private final AtomicInteger count = new AtomicInteger(); + + private final AtomicInteger countCompressedTX = new AtomicInteger(); + + private final AtomicInteger countUncompressedTX = new AtomicInteger(); + + private final AtomicInteger countDecompressedRX = new AtomicInteger(); + + private final AtomicInteger countUncompressedRX = new AtomicInteger(); + + private final AtomicLong sizeTX = new AtomicLong(); + + private final AtomicLong compressedSizeTX = new AtomicLong(); + + private final AtomicLong uncompressedSizeTX = new AtomicLong(); + + private final AtomicLong sizeRX = new AtomicLong(); + + private final AtomicLong decompressedSizeRX = new AtomicLong(); + + private final AtomicLong uncompressedSizeRX = new AtomicLong(); + @Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { - byte[] data = compress(msg.getMessage().getBytes()); + boolean stats = (interval > 0); + + byte[] data = msg.getMessage().getBytes(); + + if (stats) { + sizeTX.addAndGet(data.length); + } + + boolean compressed = false; + if (data.length > compressionMinSize) { + data = compress(data); + compressed = true; + if (stats) { + countCompressedTX.incrementAndGet(); + compressedSizeTX.addAndGet(data.length); + } + if (log.isDebugEnabled()) + log.debug("Send message compressed. Size: " + msg.getMessage().getLength() + ", compressed size: " + data.length); + } else { + if (stats) { + countUncompressedTX.incrementAndGet(); + uncompressedSizeTX.addAndGet(data.length); + } + if (log.isDebugEnabled()) + log.debug("Send message uncompressed. Size: " + msg.getMessage().getLength()); + } + msg.getMessage().trim(msg.getMessage().getLength()); + + // Append compressed flag + if (compressed) { + msg.getMessage().append(COMPRESSED); + } else { + msg.getMessage().append(UNCOMPRESSED); + } + + // Append data msg.getMessage().append(data,0,data.length); getNext().sendMessage(destination, msg, payload); + + if (stats && count.incrementAndGet() % interval == 0) + report(); } catch ( IOException x ) { log.error("Unable to compress byte contents"); throw new ChannelException(x); @@ -58,15 +131,94 @@ @Override public void messageReceived(ChannelMessage msg) { try { - byte[] data = decompress(msg.getMessage().getBytes()); + boolean stats = (interval > 0); + + byte[] temp = msg.getMessage().getBytes(); + + // Read compressed flag + boolean compressed = (temp[0] == COMPRESSED); + + byte[] data = new byte[temp.length - 1]; + System.arraycopy(temp, 1, data, 0, data.length); + + if (stats) { + sizeRX.addAndGet(data.length); + } + + if (compressed) { + data = decompress(data); + if (stats) { + decompressedSizeRX.addAndGet(data.length); + countDecompressedRX.incrementAndGet(); + } + if (log.isDebugEnabled()) + log.debug("Message received compressed. Compressed size: " + msg.getMessage().getLength() + ", size: " + data.length); + } else { + if (stats) { + uncompressedSizeRX.addAndGet(data.length); + countUncompressedRX.incrementAndGet(); + } + if (log.isDebugEnabled()) + log.debug("Message received uncompressed. Size: " + msg.getMessage().getLength()); + } + msg.getMessage().trim(msg.getMessage().getLength()); msg.getMessage().append(data,0,data.length); getPrevious().messageReceived(msg); + + if (stats && count.incrementAndGet() % interval == 0) + report(); } catch ( IOException x ) { log.error("Unable to decompress byte contents",x); } } - + + /** + * Gets the compression min size + * + * @return + */ + public int getCompressionMinSize() { + return compressionMinSize; + } + + /** + * Sets the compression min size. Default is 2048. + * + * @param compressionMinSize + */ + public void setCompressionMinSize(int compressionMinSize) { + if (log.isInfoEnabled()) + log.info("Setting compressionMinSize to " + compressionMinSize); + this.compressionMinSize = compressionMinSize; + } + + /** + * Gets the interval when logging stats + * + * @return + */ + public int getInterval() { + return interval; + } + + /** + * Sets the interval when logging stats. Default is every 10000. message. 0 + * means no logging and no recording of stats. + * + * @param interval + */ + public void setInterval(int interval) { + if (log.isInfoEnabled()) + log.info("Setting interval to " + interval); + this.interval = interval; + } + + /** + * @param data Uncompressed data + * @return Compressed data + * @throws IOException + */ public static byte[] compress(byte[] data) throws IOException { ByteArrayOutputStream bout = new ByteArrayOutputStream(); GZIPOutputStream gout = new GZIPOutputStream(bout); @@ -75,7 +227,7 @@ gout.close(); return bout.toByteArray(); } - + /** * @param data Data to decompress * @return Decompressed data @@ -94,4 +246,31 @@ } return bout.toByteArray(); } + + private void report() { + if (log.isInfoEnabled()) { + StringBuilder buf = new StringBuilder(); + + // Amount of bytes which had been compressed + long bytesCompressed = sizeTX.get() - uncompressedSizeTX.get(); + // Amount of bytes which had been decompressed + long bytesDecompressed = sizeRX.get() - uncompressedSizeRX.get(); + + buf.append(getClass().getSimpleName()).append(" Report\n"); + buf.append("\tTx size: ").append(df.format(sizeTX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tTx compressed Msg: ").append(countCompressedTX.get()).append(" messages\n"); + buf.append("\tTx compressed Msg size: ").append(df.format(compressedSizeTX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tTx uncompressed Msg: ").append(countUncompressedTX.get()).append(" messages\n"); + buf.append("\tTx uncompressed Msg size: ").append(df.format(uncompressedSizeTX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tcompression ratio: ").append(df.format((bytesCompressed - compressedSizeTX.get()) * 100d / bytesCompressed)).append("%\n"); + + buf.append("\tRx size: ").append(df.format(sizeRX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tRx decompressed Msg: ").append(countDecompressedRX.get()).append(" messages\n"); + buf.append("\tRx decompressed Msg size: ").append(df.format(decompressedSizeRX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tRx uncompressed Msg: ").append(countUncompressedRX.get()).append(" messages\n"); + buf.append("\tRx uncompressed Msg size: ").append(df.format(uncompressedSizeRX.get() / (1024d * 1024d))).append(" MB\n"); + buf.append("\tdecompression ratio: ").append(df.format((decompressedSizeRX.get() - bytesDecompressed) * 100d / bytesDecompressed)).append("%"); + log.info(buf); + } + } }