Index: java/org/apache/coyote/http11/Http11Processor.java =================================================================== --- java/org/apache/coyote/http11/Http11Processor.java (revision 1300373) +++ java/org/apache/coyote/http11/Http11Processor.java (working copy) @@ -25,6 +25,8 @@ import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import javax.servlet.http.HttpServletResponse; + import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; import org.apache.coyote.Adapter; @@ -1135,6 +1137,11 @@ InternalInputBuffer internalBuffer = (InternalInputBuffer) request.getInputBuffer(); internalBuffer.addActiveFilter(savedBody); + } else if (actionCode == ActionCode.ACTION_PROTOCOL_SWITCH) { + inputBuffer.lastActiveFilter = -1; + outputBuffer.lastActiveFilter = -1; + //this is to trick, since it's BIO, let them discover data as they read + request.setAvailable(1); } } @@ -1569,6 +1576,10 @@ (outputFilters[Constants.IDENTITY_FILTER]); contentDelimitation = true; } else { + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + } else if (entityBody && http11) { outputBuffer.addActiveFilter (outputFilters[Constants.CHUNKED_FILTER]); @@ -1614,6 +1625,9 @@ // If we know that the request is bad this early, add the // Connection: close header. 
keepAlive = keepAlive && !statusDropsConnection(statusCode); + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + keepAlive = false; + } else if (!keepAlive) { headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE); } else if (!http11 && !error) { Index: java/org/apache/coyote/http11/Http11AprProcessor.java =================================================================== --- java/org/apache/coyote/http11/Http11AprProcessor.java (revision 1300373) +++ java/org/apache/coyote/http11/Http11AprProcessor.java (working copy) @@ -26,6 +26,8 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import javax.servlet.http.HttpServletResponse; + import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; import org.apache.coyote.Adapter; @@ -1265,6 +1267,9 @@ //no op } else if (actionCode == ActionCode.ACTION_COMET_SETTIMEOUT) { //no op + } else if (actionCode == ActionCode.ACTION_PROTOCOL_SWITCH) { + inputBuffer.lastActiveFilter = -1; + outputBuffer.lastActiveFilter = -1; } } @@ -1708,6 +1713,10 @@ (outputFilters[Constants.IDENTITY_FILTER]); contentDelimitation = true; } else { + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + } else if (entityBody && http11) { outputBuffer.addActiveFilter (outputFilters[Constants.CHUNKED_FILTER]); @@ -1753,6 +1762,9 @@ // If we know that the request is bad this early, add the // Connection: close header. 
keepAlive = keepAlive && !statusDropsConnection(statusCode); + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + keepAlive = false; + } else if (!keepAlive) { headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE); } else if (!http11 && !error) { Index: java/org/apache/coyote/http11/Http11NioProcessor.java =================================================================== --- java/org/apache/coyote/http11/Http11NioProcessor.java (revision 1300373) +++ java/org/apache/coyote/http11/Http11NioProcessor.java (working copy) @@ -25,6 +25,8 @@ import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import javax.servlet.http.HttpServletResponse; + import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; import org.apache.coyote.Adapter; @@ -328,7 +330,6 @@ */ protected String server = null; - // ------------------------------------------------------------- Properties @@ -1254,6 +1255,9 @@ RequestInfo rp = request.getRequestProcessor(); if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling attach.setTimeout(timeout); + } else if (actionCode == ActionCode.ACTION_PROTOCOL_SWITCH) { + inputBuffer.lastActiveFilter = -1; + outputBuffer.lastActiveFilter = -1; } } @@ -1703,6 +1707,10 @@ (outputFilters[Constants.IDENTITY_FILTER]); contentDelimitation = true; } else { + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + } else if (entityBody && http11) { outputBuffer.addActiveFilter (outputFilters[Constants.CHUNKED_FILTER]); @@ -1748,6 +1756,9 @@ // If we know that the request is bad this early, add the // Connection: close header. 
keepAlive = keepAlive && !statusDropsConnection(statusCode); + if (statusCode == HttpServletResponse.SC_SWITCHING_PROTOCOLS) { + keepAlive = false; + } else if (!keepAlive) { headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE); } else if (!http11 && !error) { Index: java/org/apache/coyote/ActionCode.java =================================================================== --- java/org/apache/coyote/ActionCode.java (revision 1300373) +++ java/org/apache/coyote/ActionCode.java (working copy) @@ -161,6 +161,12 @@ */ public static final ActionCode ACTION_COMET_SETTIMEOUT = new ActionCode(25); + + /** + * Clears input and output filters from input and output + */ + public static final ActionCode ACTION_PROTOCOL_SWITCH = new ActionCode(26); + // ----------------------------------------------------------- Constructors int code; Index: java/org/apache/tomcat/util/buf/B2CConverter.java =================================================================== --- java/org/apache/tomcat/util/buf/B2CConverter.java (revision 1300373) +++ java/org/apache/tomcat/util/buf/B2CConverter.java (working copy) @@ -47,6 +47,9 @@ private static final Map encodingToCharsetCache = new HashMap(); + public static final Charset ISO_8859_1; + public static final Charset UTF_8; + static { for (Charset charset: Charset.availableCharsets().values()) { encodingToCharsetCache.put( @@ -56,7 +59,18 @@ alias.toLowerCase(Locale.US), charset); } } + Charset iso88591 = null; + Charset utf8 = null; + try { + iso88591 = getCharset("ISO-8859-1"); + utf8 = getCharset("UTF-8"); + } catch (UnsupportedEncodingException e) { + // Impossible. All JVMs must support these. 
+ e.printStackTrace(); } + ISO_8859_1 = iso88591; + UTF_8 = utf8; + } public static Charset getCharset(String enc) throws UnsupportedEncodingException { Index: java/org/apache/catalina/websocket/LocalStrings.properties =================================================================== --- java/org/apache/catalina/websocket/LocalStrings.properties (revision 0) +++ java/org/apache/catalina/websocket/LocalStrings.properties (revision 0) @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +frame.eos=The end of the stream was reached before the expected number of payload bytes could be read +frame.invalidUtf8=A sequence of bytes was received that did not represent valid UTF-8 +frame.readFailed=Failed to read the first byte of the next WebSocket frame. 
The return value from the read was [{0}] +frame.readEos=The end of the stream was reached when trying to read the first byte of a new WebSocket frame +frame.notMasked=The client frame was not masked but all client frames must be masked + +is.notContinuation=A frame with the OpCode [{0}] was received when a continuation frame was expected +is.unknownOpCode=A frame with the unrecognized OpCode [{0}] was received + +message.bufferTooSmall=The buffer is not big enough to contain the message currently being processed + +outbound.closed=The WebSocket connection has been closed \ No newline at end of file Index: java/org/apache/catalina/websocket/MessageInbound.java =================================================================== --- java/org/apache/catalina/websocket/MessageInbound.java (revision 0) +++ java/org/apache/catalina/websocket/MessageInbound.java (revision 0) @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.websocket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; + +import org.apache.tomcat.util.res.StringManager; + +/** + * Base implementation of the class used to process WebSocket connections based + * on messages. Applications should extend this class to provide application + * specific functionality. Applications that wish to operate on a stream basis + * rather than a message basis should use {@link StreamInbound}. + */ +public abstract class MessageInbound extends StreamInbound { + + private static final StringManager sm = + StringManager.getManager(Constants.Package); + + + // 2MB - like maxPostSize + private int byteBufferMaxSize = 2097152; + private int charBufferMaxSize = 2097152; + + private ByteBuffer bb = ByteBuffer.allocate(8192); + private CharBuffer cb = CharBuffer.allocate(8192); + + + @Override + protected final void onBinaryData(InputStream is) throws IOException { + int read = 0; + while (read > -1) { + bb.position(bb.position() + read); + if (bb.remaining() == 0) { + resizeByteBuffer(); + } + read = is.read(bb.array(), bb.position(), bb.remaining()); + } + bb.flip(); + onBinaryMessage(bb); + bb.clear(); + } + + + @Override + protected final void onTextData(Reader r) throws IOException { + int read = 0; + while (read > -1) { + cb.position(cb.position() + read); + if (cb.remaining() == 0) { + resizeCharBuffer(); + } + read = r.read(cb.array(), cb.position(), cb.remaining()); + } + cb.flip(); + onTextMessage(cb); + cb.clear(); + } + + + private void resizeByteBuffer() throws IOException { + int maxSize = getByteBufferMaxSize(); + if (bb.limit() >= maxSize) { + throw new IOException(sm.getString("message.bufferTooSmall")); + } + + long newSize = bb.limit() * 2L; // long arithmetic: doubling an int limit can overflow + if (newSize > maxSize) { + newSize = maxSize; + } + + // Cast is safe. 
newSize <= maxSize and maxSize is an int + ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize); + bb.rewind(); + newBuffer.put(bb); + bb = newBuffer; + } + + + private void resizeCharBuffer() throws IOException { + int maxSize = getCharBufferMaxSize(); + if (cb.limit() >= maxSize) { + throw new IOException(sm.getString("message.bufferTooSmall")); + } + + long newSize = cb.limit() * 2L; // long arithmetic: doubling an int limit can overflow + if (newSize > maxSize) { + newSize = maxSize; + } + + // Cast is safe. newSize <= maxSize and maxSize is an int + CharBuffer newBuffer = CharBuffer.allocate((int) newSize); + cb.rewind(); + newBuffer.put(cb); + cb = newBuffer; + } + + + /** + * Obtain the current maximum size (in bytes) of the buffer used for binary + * messages. + */ + public final int getByteBufferMaxSize() { + return byteBufferMaxSize; + } + + + /** + * Set the maximum size (in bytes) of the buffer used for binary messages. + */ + public final void setByteBufferMaxSize(int byteBufferMaxSize) { + this.byteBufferMaxSize = byteBufferMaxSize; + } + + + /** + * Obtain the current maximum size (in characters) of the buffer used for + * textual messages. + */ + public final int getCharBufferMaxSize() { + return charBufferMaxSize; + } + + + /** + * Set the maximum size (in characters) of the buffer used for textual + * messages. + */ + public final void setCharBufferMaxSize(int charBufferMaxSize) { + this.charBufferMaxSize = charBufferMaxSize; + } + + + /** + * This method is called when there is a binary WebSocket message available + * to process. The message is presented via a ByteBuffer and may have been + * formed from one or more frames. The number of frames used to transmit the + * message is not made visible to the application. + * + * @param message The WebSocket message + * + * @throws IOException If a problem occurs processing the message. Any + * exception will trigger the closing of the WebSocket + * connection. 
+ */ + protected abstract void onBinaryMessage(ByteBuffer message) + throws IOException; + + + /** + * This method is called when there is a textual WebSocket message available + * to process. The message is presented via a CharBuffer and may have been + * formed from one or more frames. The number of frames used to transmit the + * message is not made visible to the application. + * + * @param message The WebSocket message + * + * @throws IOException If a problem occurs processing the message. Any + * exception will trigger the closing of the WebSocket + * connection. + */ + protected abstract void onTextMessage(CharBuffer message) + throws IOException; +} Index: java/org/apache/catalina/websocket/WsFrame.java =================================================================== --- java/org/apache/catalina/websocket/WsFrame.java (revision 0) +++ java/org/apache/catalina/websocket/WsFrame.java (revision 0) @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.websocket; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CoderResult; + +import org.apache.catalina.CometEvent; +import org.apache.catalina.util.Conversions; +import org.apache.tomcat.util.res.StringManager; + +/** + * Represents a complete WebSocket frame with the exception of the payload for + * non-control frames. + */ +public class WsFrame { + + private static final StringManager sm = + StringManager.getManager(Constants.Package); + + + private final boolean fin; + private final int rsv; + private final byte opCode; + private byte[] mask = new byte[4]; + private long payloadLength; + private ByteBuffer payload; + + /** + * Create the new WebSocket frame, reading data from the Comet event as + * necessary. + * + * @param first First byte of data for this frame + * @param event Comet event associated with the WebSocket connection on + * which the frame has been sent + * + * @throws IOException If a problem occurs processing the frame. Any + * exception will trigger the closing of the WebSocket + * connection. 
+ */ + private WsFrame(byte first, + CometEvent event) throws IOException { + + int b = first & 0xFF; + fin = (b & 0x80) > 0; + rsv = (b & 0x70) >>> 4; + opCode = (byte) (b & 0x0F); + + b = blockingRead(event); + // Client data must be masked + if ((b & 0x80) == 0) { + throw new IOException(sm.getString("frame.notMasked")); + } + + payloadLength = b & 0x7F; + if (payloadLength == 126) { + byte[] extended = new byte[2]; + blockingRead(event, extended); + payloadLength = Conversions.byteArrayToLong(extended); + } else if (payloadLength == 127) { + byte[] extended = new byte[8]; + blockingRead(event, extended); + payloadLength = Conversions.byteArrayToLong(extended); + } + + if (isControl()) { + if (payloadLength > 125) { + throw new IOException(); // control frame payload must not exceed 125 bytes + } + if (!fin) { + throw new IOException(); // control frames must not be fragmented + } + } + + blockingRead(event, mask); + + if (isControl()) { + // Note: Payload limited to <= 125 bytes by test above + payload = ByteBuffer.allocate((int) payloadLength); + blockingRead(event, payload); + + if (opCode == Constants.OPCODE_CLOSE && payloadLength > 2) { + // Check close payload - if present - is valid UTF-8 + CharBuffer cb = CharBuffer.allocate((int) payloadLength); + Utf8Decoder decoder = new Utf8Decoder(); + payload.position(2); + CoderResult cr = decoder.decode(payload, cb, true); + payload.position(0); + if (cr.isError()) { + throw new IOException(sm.getString("frame.invalidUtf8")); + } + } + } else { + payload = null; + } + } + + public boolean getFin() { + return fin; + } + + public int getRsv() { + return rsv; + } + + public byte getOpCode() { + return opCode; + } + + public boolean isControl() { + return (opCode & 0x08) > 0; + } + + public byte[] getMask() { + return mask; + } + + public long getPayLoadLength() { + return payloadLength; + } + + public ByteBuffer getPayLoad() { + return payload; + } + + + /* + * Blocks until a single byte has been read + */ + private int 
blockingRead(CometEvent event) + throws IOException { + int result = 
event.getHttpServletRequest().getInputStream().read(); + if (result == -1) { + throw new IOException(sm.getString("frame.eos")); + } + return result; + } + + + /* + * Blocks until the byte array has been filled. + */ + private void blockingRead(CometEvent event, byte[] bytes) + throws IOException { + int read = 0; + int last = 0; + while (read < bytes.length) { + last = event.getHttpServletRequest().getInputStream().read(bytes, read, bytes.length - read); + if (last == -1) { + throw new IOException(sm.getString("frame.eos")); + } + read += last; + } + } + + + /* + * Intended to read whole payload and blocks until it has. Therefore able to + * unmask the payload data. + */ + private void blockingRead(CometEvent event, ByteBuffer bb) + throws IOException { + int last = 0; + while (bb.hasRemaining()) { + last = event.getHttpServletRequest().getInputStream().read(); + if (last == -1) { + throw new IOException(sm.getString("frame.eos")); + } + bb.put((byte) (last ^ mask[bb.position() % 4])); + } + bb.flip(); + } + + + /** + * Read the next WebSocket frame, reading data from the Comet event as + * necessary. + * + * @param event Comet event associated with the WebSocket connection on + * which the frame has been sent + * + * @param block Should this method block until a frame is presented if no + * data is currently available to process. Note that if a + * single byte is available, this method will block until the + * complete frame (excluding payload for non-control frames) is + * available. + * + * @throws IOException If a problem occurs processing the frame. Any + * exception will trigger the closing of the WebSocket + * connection. 
+ */ + public static WsFrame nextFrame(CometEvent event, + boolean block) throws IOException { + + byte[] first = new byte[1]; + + if (!block) { + if (event.getHttpServletRequest().getInputStream().available() == 0) { + return null; + } + } + + int read = event.getHttpServletRequest().getInputStream().read(first, 0, 1); + if (read == 1) { + return new WsFrame(first[0], event); + } else if (read == 0) { + return null; + } else if (read == -1) { + throw new EOFException(sm.getString("frame.readEos")); + } else { + throw new IOException( + sm.getString("frame.readFailed", Integer.valueOf(read))); + } + } +} Index: java/org/apache/catalina/websocket/WsInputStream.java =================================================================== --- java/org/apache/catalina/websocket/WsInputStream.java (revision 0) +++ java/org/apache/catalina/websocket/WsInputStream.java (revision 0) @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.websocket; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.catalina.CometEvent; +import org.apache.tomcat.util.res.StringManager; + +/** + * This class is used to read WebSocket frames from the underlying socket and + * makes the payload available for reading as an {@link InputStream}. It only + * makes the number of bytes declared in the payload length available for + * reading even if more bytes are available from the socket. + */ +public class WsInputStream extends InputStream { + + private static final StringManager sm = + StringManager.getManager(Constants.Package); + + + private CometEvent event; + private WsOutbound outbound; + + private WsFrame frame; + private long remaining; + private long readThisFragment; + + private String error = null; + + + public WsInputStream(CometEvent event, WsOutbound outbound) { + this.event = event; + this.outbound = outbound; + } + + + /** + * Process the next WebSocket frame. + * + * @param block Should this method block until a frame is presented if no + * data is currently available to process. Note that if a + * single byte is available, this method will block until the + * complete frame (excluding payload for non-control frames) is + * available. + * + * @return The next frame to be processed or null if block is + * false and there is no data to be processed. + * + * @throws IOException If a problem occurs reading the data for the frame. 
+ */ + public WsFrame nextFrame(boolean block) throws IOException { + frame = WsFrame.nextFrame(event, block); + if (frame != null) { + readThisFragment = 0; + remaining = frame.getPayLoadLength(); + } + return frame; + } + + + // ----------------------------------------------------- InputStream methods + + @Override + public int read() throws IOException { + + makePayloadDataAvailable(); + + if (remaining == 0) { + return -1; + } + + // Only count a byte once it has actually been read from the socket + InputStream in = event.getHttpServletRequest().getInputStream(); + int masked = (in.available() > 0) ? in.read() : -1; + if (masked == -1) { + return -1; + } + remaining--; + readThisFragment++; + return masked ^ + (frame.getMask()[(int) ((readThisFragment - 1) % 4)] & 0xFF); + } + + + @Override + public int read(byte b[], int off, int len) throws IOException { + + makePayloadDataAvailable(); + + if (remaining == 0) { + return -1; + } + + if (len > remaining) { + len = (int) remaining; + } + int result = event.getHttpServletRequest().getInputStream().read(b, off, len); + if(result == -1) { + return -1; + } + + for (int i = off; i < off + result; i++) { + b[i] = (byte) (b[i] ^ + frame.getMask()[(int) ((readThisFragment + i - off) % 4)]); + } + remaining -= result; + readThisFragment += result; + return result; + } + + + /* + * Ensures that there is payload data ready to read. + */ + private void makePayloadDataAvailable() throws IOException { + if (error != null) { + throw new IOException(error); + } + while (remaining == 0 && !frame.getFin()) { + // Need more data - process next frame + nextFrame(true); + while (frame.isControl()) { + if (frame.getOpCode() == Constants.OPCODE_PING) { + outbound.pong(frame.getPayLoad()); + } else if (frame.getOpCode() == Constants.OPCODE_PONG) { + // NO-OP. Swallow it. 
+ } else if (frame.getOpCode() == Constants.OPCODE_CLOSE) { + outbound.close(frame); + } else{ + throw new IOException(sm.getString("is.unknownOpCode", + Byte.valueOf(frame.getOpCode()))); + } + nextFrame(true); + } + if (frame.getOpCode() != Constants.OPCODE_CONTINUATION) { + error = sm.getString("is.notContinuation", + Byte.valueOf(frame.getOpCode())); + throw new IOException(error); + } + } + } +} Index: java/org/apache/catalina/websocket/WsOutbound.java =================================================================== --- java/org/apache/catalina/websocket/WsOutbound.java (revision 0) +++ java/org/apache/catalina/websocket/WsOutbound.java (revision 0) @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.websocket; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; + +import org.apache.catalina.CometEvent; +import org.apache.tomcat.util.buf.B2CConverter; +import org.apache.tomcat.util.res.StringManager; + +/** + * Provides the means to write WebSocket messages to the client. 
All methods + * that write to the client (or update a buffer that is later written to the + * client) are synchronized to prevent multiple threads trying to write to the + * client at the same time. + */ +public class WsOutbound { + + private static final StringManager sm = + StringManager.getManager(Constants.Package); + public static final int DEFAULT_BUFFER_SIZE = 8192; + + private CometEvent event; + private ByteBuffer bb; + private CharBuffer cb; + private boolean closed = false; + private Boolean text = null; + private boolean firstFrame = true; + + + public WsOutbound(CometEvent event) { + this(event, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); + } + + + public WsOutbound(CometEvent event, int byteBufferSize, + int charBufferSize) { + this.event = event; + this.bb = ByteBuffer.allocate(byteBufferSize); + this.cb = CharBuffer.allocate(charBufferSize); + } + + + /** + * Adds the data to the buffer for binary data. If a textual message is + * currently in progress that message will be completed and a new binary + * message started. If the buffer for binary data is full, the buffer will + * be flushed and a new binary continuation fragment started. + * + * @param b The byte (only the least significant byte is used) of data to + * send to the client. + * + * @throws IOException If a flush is required and an error occurs writing + * the WebSocket frame to the client + */ + public synchronized void writeBinaryData(int b) throws IOException { + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + + if (bb.position() == bb.capacity()) { + doFlush(false); + } + if (text == null) { + text = Boolean.FALSE; + } else if (text.booleanValue()) { + // Flush the character data + flush(); + text = Boolean.FALSE; + } + bb.put((byte) (b & 0xFF)); + } + + + /** + * Adds the data to the buffer for textual data. If a binary message is + * currently in progress that message will be completed and a new textual + * message started. 
If the buffer for textual data is full, the buffer will + * be flushed and a new textual continuation fragment started. + * + * @param c The character to send to the client. + * + * @throws IOException If a flush is required and an error occurs writing + * the WebSocket frame to the client + */ + public synchronized void writeTextData(char c) throws IOException { + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + + if (cb.position() == cb.capacity()) { + doFlush(false); + } + + if (text == null) { + text = Boolean.TRUE; + } else if (!text.booleanValue()) { + // Flush the binary data + flush(); + text = Boolean.TRUE; + } + cb.append(c); + } + + + /** + * Flush any message (binary or textual) that may be buffered and then send + * a WebSocket binary message as a single frame with the provided buffer as + * the payload of the message. + * + * @param msgBb The buffer containing the payload + * + * @throws IOException If an error occurs writing to the client + */ + public synchronized void writeBinaryMessage(ByteBuffer msgBb) + throws IOException { + + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + + if (text != null) { + // Empty the buffer + flush(); + } + text = Boolean.FALSE; + doWriteBytes(msgBb, true); + } + + + /** + * Flush any message (binary or textual) that may be buffered and then send + * a WebSocket text message as a single frame with the provided buffer as + * the payload of the message. + * + * @param msgCb The buffer containing the payload + * + * @throws IOException If an error occurs writing to the client + */ + public synchronized void writeTextMessage(CharBuffer msgCb) + throws IOException { + + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + + if (text != null) { + // Empty the buffer + flush(); + } + text = Boolean.TRUE; + doWriteText(msgCb, true); + } + + + /** + * Flush any message (binary or textual) that may be buffered. 
+ * + * @throws IOException If an error occurs writing to the client + */ + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + doFlush(true); + } + + + private void doFlush(boolean finalFragment) throws IOException { + if (text == null) { + // No data + return; + } + if (text.booleanValue()) { + cb.flip(); + doWriteText(cb, finalFragment); + } else { + bb.flip(); + doWriteBytes(bb, finalFragment); + } + } + + + /** + * Respond to a client close by sending a close that echoes the status code + * and message. + * + * @param frame The close frame received from a client + * + * @throws IOException If the payload is one byte long or writing to the client fails + */ + protected void close(WsFrame frame) throws IOException { + if (frame.getPayLoadLength() > 0) { + // Must be status (2 bytes) plus optional message + if (frame.getPayLoadLength() == 1) { + throw new IOException(); // one byte cannot hold the two byte status code + } + int status = (frame.getPayLoad().get() & 0xFF) << 8; // high byte; relative get() advances the buffer + status += frame.getPayLoad().get() & 0xFF; + + if (validateCloseStatus(status)) { + // Echo the status (and any message left in the buffer) back to the client + close(status, frame.getPayLoad()); + } else { + // Invalid close code + close(Constants.STATUS_PROTOCOL_ERROR, null); + } + } else { + // No status + close(0, null); + } + } + + + private boolean validateCloseStatus(int status) { + + if (status == Constants.STATUS_CLOSE_NORMAL || + status == Constants.STATUS_SHUTDOWN || + status == Constants.STATUS_PROTOCOL_ERROR || + status == Constants.STATUS_UNEXPECTED_DATA_TYPE || + status == Constants.STATUS_BAD_DATA || + status == Constants.STATUS_POLICY_VIOLATION || + status == Constants.STATUS_MESSAGE_TOO_LARGE || + status == Constants.STATUS_REQUIRED_EXTENSION || + status == Constants.STATUS_UNEXPECTED_CONDITION || + (status > 2999 && status < 5000)) { + // 
Other 1xxx reserved / not permitted + // 2xxx reserved + // 3xxx framework defined + // 4xxx application defined + return true; + } + // <1000 unused + // >4999 
undefined + return false; + } + + + /** + * Send a close message to the client. Does nothing if already closed. + * + * @param status Must be a valid status code or zero to send no code + * @param data Optional message. If message is defined, a valid status + * code must be provided. + * + * @throws IOException If an error occurs writing to the client + */ + public synchronized void close(int status, ByteBuffer data) + throws IOException { + + if (closed) { + return; + } + closed = true; + + event.getHttpServletResponse().getOutputStream().write(0x88); // FIN bit (0x80) plus opCode 0x8 + if (status == 0) { + event.getHttpServletResponse().getOutputStream().write(0); // zero length payload + } else if (data == null || data.position() == data.limit()) { + event.getHttpServletResponse().getOutputStream().write(2); // payload is the status code only + event.getHttpServletResponse().getOutputStream().write(status >>> 8); + event.getHttpServletResponse().getOutputStream().write(status); + } else { + event.getHttpServletResponse().getOutputStream().write(2 + data.limit() - data.position()); + event.getHttpServletResponse().getOutputStream().write(status >>> 8); + event.getHttpServletResponse().getOutputStream().write(status); + event.getHttpServletResponse().getOutputStream().write(data.array(), data.position(), + data.limit() - data.position()); + } + event.getHttpServletResponse().flushBuffer(); + + bb = null; + cb = null; + event = null; + } + + + /** + * Send a pong message to the client + * + * @param data Optional message. 
+ * + * @throws IOException If an error occurs writing to the client + */ + public synchronized void pong(ByteBuffer data) throws IOException { + + if (closed) { + throw new IOException(sm.getString("outbound.closed")); + } + + doFlush(true); + + event.getHttpServletResponse().getOutputStream().write(0x8A); + if (data == null) { + event.getHttpServletResponse().getOutputStream().write(0); + } else { + event.getHttpServletResponse().getOutputStream().write(data.limit() - data.position()); + event.getHttpServletResponse().getOutputStream().write(data.array(), data.position(), + data.limit() - data.position()); + } + + event.getHttpServletResponse().flushBuffer(); + } + + + /** + * Writes the provided bytes as the payload in a new WebSocket frame. + * + * @param buffer The bytes to include in the payload. + * @param finalFragment Do these bytes represent the final fragment of a + * WebSocket message? + * @throws IOException + */ + private void doWriteBytes(ByteBuffer buffer, boolean finalFragment) + throws IOException { + + // Work out the first byte + int first = 0x00; + if (finalFragment) { + first = first + 0x80; + } + if (firstFrame) { + if (text.booleanValue()) { + first = first + 0x1; + } else { + first = first + 0x2; + } + } + // Continuation frame is OpCode 0 + event.getHttpServletResponse().getOutputStream().write(first); + + if (buffer.limit() < 126) { + event.getHttpServletResponse().getOutputStream().write(buffer.limit()); + } else if (buffer.limit() < 65536) { + event.getHttpServletResponse().getOutputStream().write(126); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 8); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() & 0xFF); + } else { + // Will never be more than 2^31-1 + event.getHttpServletResponse().getOutputStream().write(127); + event.getHttpServletResponse().getOutputStream().write(0); + event.getHttpServletResponse().getOutputStream().write(0); + 
event.getHttpServletResponse().getOutputStream().write(0); + event.getHttpServletResponse().getOutputStream().write(0); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 24); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 16); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 8); + event.getHttpServletResponse().getOutputStream().write(buffer.limit() & 0xFF); + + } + + // Write the content + event.getHttpServletResponse().getOutputStream().write(buffer.array(), 0, buffer.limit()); + event.getHttpServletResponse().flushBuffer(); + + // Reset + if (finalFragment) { + text = null; + firstFrame = true; + } else { + firstFrame = false; + } + bb.clear(); + } + + + /* + * Convert the textual message to bytes and then output it. + */ + private void doWriteText(CharBuffer buffer, boolean finalFragment) + throws IOException { + CharsetEncoder encoder = B2CConverter.UTF_8.newEncoder(); + do { + CoderResult cr = encoder.encode(buffer, bb, true); + if (cr.isError()) { + cr.throwException(); + } + bb.flip(); + if (buffer.hasRemaining()) { + doWriteBytes(bb, false); + } else { + doWriteBytes(bb, finalFragment); + } + } while (buffer.hasRemaining()); + + // Reset - bb will be cleared in doWriteBytes() + cb.clear(); + } +} Index: java/org/apache/catalina/websocket/StreamInbound.java =================================================================== --- java/org/apache/catalina/websocket/StreamInbound.java (revision 0) +++ java/org/apache/catalina/websocket/StreamInbound.java (revision 0) @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.charset.MalformedInputException;
+import java.nio.charset.UnmappableCharacterException;
+
+import org.apache.catalina.CometEvent;
+
+/**
+ * Base implementation of the class used to process WebSocket connections based
+ * on streams. Applications should extend this class to provide application
+ * specific functionality. Applications that wish to operate on a message basis
+ * rather than a stream basis should use {@link MessageInbound}.
+ */
+public abstract class StreamInbound {
+
+    // The Comet event the inbound frames are read from
+    private CometEvent event;
+    // The outbound side of the connection, created by setUpgradeOutbound()
+    private WsOutbound outbound;
+    private int outboundByteBufferSize = WsOutbound.DEFAULT_BUFFER_SIZE;
+    private int outboundCharBufferSize = WsOutbound.DEFAULT_BUFFER_SIZE;
+
+
+
+    /**
+     * @return The size of the byte buffer that will be used for the next
+     *         {@link WsOutbound} instance that is created.
+     */
+    public int getOutboundByteBufferSize() {
+        return outboundByteBufferSize;
+    }
+
+
+    /**
+     * This only applies to the {@link WsOutbound} instance returned from
+     * {@link #getWsOutbound()} created by a subsequent call to
+     * {@link #setUpgradeOutbound(CometEvent)}. The current
+     * {@link WsOutbound} instance, if any, is not affected.
+     *
+     * @param outboundByteBufferSize The size of the byte buffer to use for
+     *                               the next {@link WsOutbound} instance
+     */
+    public void setOutboundByteBufferSize(int outboundByteBufferSize) {
+        this.outboundByteBufferSize = outboundByteBufferSize;
+    }
+
+
+    /**
+     * @return The size of the character buffer that will be used for the next
+     *         {@link WsOutbound} instance that is created.
+     */
+    public int getOutboundCharBufferSize() {
+        return outboundCharBufferSize;
+    }
+
+
+    /**
+     * This only applies to the {@link WsOutbound} instance returned from
+     * {@link #getWsOutbound()} created by a subsequent call to
+     * {@link #setUpgradeOutbound(CometEvent)}. The current
+     * {@link WsOutbound} instance, if any, is not affected.
+     *
+     * @param outboundCharBufferSize The size of the character buffer to use
+     *                               for the next {@link WsOutbound} instance
+     */
+    public void setOutboundCharBufferSize(int outboundCharBufferSize) {
+        this.outboundCharBufferSize = outboundCharBufferSize;
+    }
+
+
+    /**
+     * Creates the {@link WsOutbound} instance for this connection using the
+     * currently configured buffer sizes.
+     *
+     * @param event The Comet event the outbound side will write to
+     */
+    public final void setUpgradeOutbound(CometEvent event) {
+        outbound = new WsOutbound(event, outboundByteBufferSize,
+                outboundCharBufferSize);
+    }
+
+
+    /**
+     * Sets the Comet event that {@link #onData()} reads inbound frames from.
+     *
+     * @param event The Comet event for this connection
+     */
+    public final void setUpgradeProcessor(CometEvent event) {
+        this.event = event;
+    }
+
+
+    /**
+     * Obtain the outbound side of this WebSocket connection used for writing
+     * data to the client.
+     */
+    public final WsOutbound getWsOutbound() {
+        return outbound;
+    }
+
+
+    /**
+     * Reads frames from the client and dispatches them by OpCode: binary and
+     * text frames are passed to {@link #onBinaryData(InputStream)} and
+     * {@link #onTextData(Reader)}, a ping is answered with a pong, a pong is
+     * ignored and a close frame is answered with a close. A frame with any
+     * reserved bit set, a frame with an unknown OpCode and any
+     * {@link IOException} cause the connection to be closed.
+     *
+     * @return {@link CometEvent.EventType#END} if the connection has been
+     *         closed, otherwise {@link CometEvent.EventType#READ}
+     *
+     * @throws IOException If an error occurs sending the close message to the
+     *                     client
+     */
+    public final CometEvent.EventType onData() throws IOException {
+        // Must be at the start of a message (which may consist of multiple
+        // frames)
+        WsInputStream wsIs = new WsInputStream(event, getWsOutbound());
+
+        try {
+            // NOTE(review): the boolean passed to nextFrame() presumably
+            // selects a blocking read for the first frame only - confirm
+            // against WsInputStream
+            WsFrame frame = wsIs.nextFrame(true);
+
+            while (frame != null) {
+                // TODO User defined extensions may define values for rsv
+                if (frame.getRsv() > 0) {
+                    closeOutboundConnection(
+                            Constants.STATUS_PROTOCOL_ERROR, null);
+                    return CometEvent.EventType.END;
+                }
+
+                byte opCode = frame.getOpCode();
+
+                if (opCode == Constants.OPCODE_BINARY) {
+                    onBinaryData(wsIs);
+                } else if (opCode == Constants.OPCODE_TEXT) {
+                    // Use the strict decoder so that invalid UTF-8 is
+                    // reported as an exception (handled below)
+                    InputStreamReader r =
+                            new InputStreamReader(wsIs, new Utf8Decoder());
+                    onTextData(r);
+                } else if (opCode == Constants.OPCODE_CLOSE){
+                    closeOutboundConnection(frame);
+                    return CometEvent.EventType.END;
+                } else if (opCode == Constants.OPCODE_PING) {
+                    getWsOutbound().pong(frame.getPayLoad());
+                } else if (opCode == Constants.OPCODE_PONG) {
+                    // NO-OP
+                } else {
+                    // Unknown OpCode
+                    closeOutboundConnection(
+                            Constants.STATUS_PROTOCOL_ERROR, null);
+                    return CometEvent.EventType.END;
+                }
+                frame = wsIs.nextFrame(false);
+            }
+        } catch (MalformedInputException mie) {
+            // Invalid UTF-8
+            closeOutboundConnection(Constants.STATUS_BAD_DATA, null);
+            return CometEvent.EventType.END;
+        } catch (UnmappableCharacterException uce) {
+            // Invalid UTF-8
+            closeOutboundConnection(Constants.STATUS_BAD_DATA, null);
+            return CometEvent.EventType.END;
+        } catch (IOException ioe) {
+            // Given something must have gone to reach this point, this
+            // might not work but try it anyway.
+            closeOutboundConnection(Constants.STATUS_PROTOCOL_ERROR, null);
+            return CometEvent.EventType.END;
+        }
+        return CometEvent.EventType.READ;
+    }
+
+    /**
+     * Sends a close message with the given status to the client and then
+     * calls {@link #onClose(int)} with the same status, even if sending the
+     * close message failed.
+     *
+     * @param status The status code to send to the client
+     * @param data   Optional message to send with the status code
+     *
+     * @throws IOException If an error occurs writing to the client
+     */
+    private void closeOutboundConnection(int status, ByteBuffer data) throws IOException {
+        try {
+            getWsOutbound().close(status, data);
+        } finally {
+            onClose(status);
+        }
+    }
+
+    /**
+     * Responds to a close frame received from the client and then calls
+     * {@link #onClose(int)}, even if sending the response failed.
+     *
+     * @param frame The close frame received from the client
+     *
+     * @throws IOException If an error occurs writing to the client
+     */
+    private void closeOutboundConnection(WsFrame frame) throws IOException {
+        try {
+            getWsOutbound().close(frame);
+        } finally {
+            // NOTE(review): this passes the close OpCode (0x08) rather than
+            // the status code carried by the frame although onClose()
+            // documents its parameter as a status code - confirm intended
+            onClose(Constants.OPCODE_CLOSE);
+        }
+    }
+
+    /**
+     * Notifies this instance that the upgrade has completed by calling
+     * {@link #onOpen(WsOutbound)} with the current outbound connection.
+     */
+    public void onUpgradeComplete() {
+        onOpen(outbound);
+    }
+
+    /**
+     * Intended to be overridden by sub-classes that wish to be notified
+     * when the outbound connection is established. The default implementation
+     * is a NO-OP.
+     *
+     * @param outbound    The outbound WebSocket connection.
+     */
+    protected void onOpen(WsOutbound outbound) {
+        // NO-OP
+    }
+
+    /**
+     * Intended to be overridden by sub-classes that wish to be notified
+     * when the outbound connection is closed. The default implementation
+     * is a NO-OP.
+     *
+     * @param status    The status code of the close reason.
+     */
+    protected void onClose(int status) {
+        // NO-OP
+    }
+
+
+    /**
+     * This method is called when there is a binary WebSocket message available
+     * to process. The message is presented via a stream and may be formed from
+     * one or more frames. The number of frames used to transmit the message is
+     * not made visible to the application.
+     *
+     * @param is    The WebSocket message
+     *
+     * @throws IOException  If a problem occurs processing the message. Any
+     *                      exception will trigger the closing of the WebSocket
+     *                      connection.
+     */
+    protected abstract void onBinaryData(InputStream is) throws IOException;
+
+
+    /**
+     * This method is called when there is a textual WebSocket message available
+     * to process. The message is presented via a reader and may be formed from
+     * one or more frames. The number of frames used to transmit the message is
+     * not made visible to the application.
+     *
+     * @param r     The WebSocket message
+     *
+     * @throws IOException  If a problem occurs processing the message. Any
+     *                      exception will trigger the closing of the WebSocket
+     *                      connection.
+     */
+    protected abstract void onTextData(Reader r) throws IOException;
+}
Index: java/org/apache/catalina/websocket/WebSocketServlet.java
===================================================================
--- java/org/apache/catalina/websocket/WebSocketServlet.java (revision 0)
+++ java/org/apache/catalina/websocket/WebSocketServlet.java (revision 0)
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometEvent;
+import org.apache.catalina.CometEvent.EventSubType;
+import org.apache.catalina.CometEvent.EventType;
+import org.apache.catalina.CometProcessor;
+import org.apache.catalina.connector.CometEventImpl;
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.RequestFacade;
+import org.apache.catalina.connector.Response;
+import org.apache.catalina.util.Base64;
+import org.apache.tomcat.util.buf.B2CConverter;
+
+/**
+ * Provides the base implementation of a Servlet for processing WebSocket
+ * connections as per RFC6455. It is expected that applications will extend this
+ * implementation and provide application specific functionality.
+ */
+public abstract class WebSocketServlet extends HttpServlet implements CometProcessor {
+
+    private static final long serialVersionUID = 1L;
+    // GUID that is appended to the client's key when calculating the value
+    // of the Sec-WebSocket-Accept header
+    private static final byte[] WS_ACCEPT =
+            "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(
+                    B2CConverter.ISO_8859_1);
+
+    // Shared digest. All use must be synchronized on the instance.
+    private MessageDigest sha1Helper;
+
+
+    @Override
+    public void init() throws ServletException {
+        super.init();
+
+        try {
+            sha1Helper = MessageDigest.getInstance("SHA1");
+        } catch (NoSuchAlgorithmException e) {
+            throw new ServletException(e);
+        }
+    }
+
+    /**
+     * Dispatches a Comet event: BEGIN performs the handshake, READ processes
+     * inbound data and END / ERROR close the event. The event is also closed
+     * if processing fails.
+     *
+     * @param event The Comet event to process
+     */
+    @Override
+    public void event(CometEvent event) throws IOException, ServletException {
+        try {
+            if (event.getEventType() == CometEvent.EventType.BEGIN) {
+                doBegin(event);
+            } else if (event.getEventType() == CometEvent.EventType.READ) {
+                doRead(event);
+            } else if (event.getEventType() == CometEvent.EventType.END) {
+                event.close();
+            } else if (event.getEventType() == CometEvent.EventType.ERROR) {
+                event.close();
+            }
+        } catch (IOException x) {
+            // The connection is no longer usable. Closing it is all that
+            // can be done.
+            event.close();
+        } catch (ServletException x) {
+            event.close();
+        }
+    }
+
+    /**
+     * Reads the value of a declared field of the given object using
+     * reflection.
+     *
+     * @param name The name of the field
+     * @param o    The object to read the field from
+     *
+     * @return The value of the field
+     */
+    private Object getField(String name, Object o) throws IllegalAccessException, NoSuchFieldException{
+        Field f = o.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        Object result = f.get(o);
+        f.setAccessible(false);
+        return result;
+    }
+
+    /**
+     * Creates a Comet event for a request that was not received as one. The
+     * internal request and response are obtained from the "request" and
+     * "response" fields of the provided objects via reflection.
+     *
+     * @param req  The request (expected to declare a "request" field)
+     * @param resp The response (expected to declare a "response" field)
+     *
+     * @return A new Comet event wrapping the internal request and response
+     *
+     * @throws ServletException If the internal objects cannot be obtained
+     */
+    protected CometEvent getCometEvent(HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+        try {
+            Request request = (Request)getField("request", req);
+            Response response = (Response)getField("response",resp);
+            return new CometEventImpl(request, response);
+        }catch (Exception x) {
+            throw new ServletException(x);
+        }
+    }
+
+    /**
+     * Performs the handshake and then processes inbound data on the request
+     * thread until the connection is closed.
+     */
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+        CometEvent event = getCometEvent(req,resp);
+        try {
+            doBegin(event);
+            CometEvent.EventType result = CometEvent.EventType.READ;
+            while (result == CometEvent.EventType.READ) {
+                result = doRead(event);
+            }
+        } catch (Exception x) {
+            if (event != null) {
+                event.close();
+            }
+            throw new ServletException(x);
+        }
+    }
+
+    /**
+     * Processes inbound data for the connection associated with the event.
+     * The event is closed if the connection is closed or if the handshake did
+     * not complete.
+     *
+     * @param event The Comet event for the connection
+     *
+     * @return {@link CometEvent.EventType#READ} if more data is expected,
+     *         otherwise {@link CometEvent.EventType#END}
+     */
+    protected CometEvent.EventType doRead(CometEvent event)
+            throws ServletException, IOException {
+        StreamInbound inbound = (StreamInbound) event.getHttpServletRequest()
+                .getAttribute("org.apache.catalina.comet.inbound");
+        if (inbound == null) {
+            // doBegin() only sets the attribute once the handshake has been
+            // accepted. If it is missing the handshake was rejected and
+            // there is no WebSocket connection to read from.
+            event.close();
+            return CometEvent.EventType.END;
+        }
+        final CometEvent.EventType result = inbound.onData();
+        if (result != CometEvent.EventType.READ) {
+            event.close();
+        }
+        return result;
+
+    }
+
+
+    /**
+     * Validates the client handshake and, if it is acceptable, sends the
+     * server handshake, switches the connection to the WebSocket protocol and
+     * stores the {@link StreamInbound} instance for the connection as the
+     * request attribute "org.apache.catalina.comet.inbound". If the handshake
+     * is rejected an error response is sent and no attribute is set.
+     *
+     * @param event The Comet event for the connection
+     */
+    protected void doBegin(CometEvent event)
+            throws ServletException, IOException {
+
+        HttpServletRequest req = event.getHttpServletRequest();
+        HttpServletResponse resp = event.getHttpServletResponse();
+
+        // Information required to send the server handshake message
+        String key;
+        String subProtocol = null;
+        List<String> extensions = Collections.emptyList();
+
+        if (!headerContainsToken(req, "upgrade", "websocket")) {
+            resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+            return;
+        }
+
+        if (!headerContainsToken(req, "connection", "upgrade")) {
+            resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+            return;
+        }
+
+        if (!headerContainsToken(req, "sec-websocket-version", "13")) {
+            // 426 = Upgrade Required. Tell the client which version is
+            // supported.
+            resp.setStatus(426);
+            resp.setHeader("Sec-WebSocket-Version", "13");
+            return;
+        }
+
+        key = req.getHeader("Sec-WebSocket-Key");
+        if (key == null) {
+            resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+            return;
+        }
+
+        String origin = req.getHeader("Origin");
+        if (!verifyOrigin(origin)) {
+            resp.sendError(HttpServletResponse.SC_FORBIDDEN);
+            return;
+        }
+
+        // RFC 6455 uses the same header name, Sec-WebSocket-Protocol, for
+        // the protocols offered by the client and the one selected by the
+        // server
+        List<String> subProtocols = getTokensFromHeader(req,
+                "Sec-WebSocket-Protocol");
+        if (!subProtocols.isEmpty()) {
+            subProtocol = selectSubProtocol(subProtocols);
+
+        }
+
+        // TODO Read client handshake - Sec-WebSocket-Extensions
+
+        // TODO Extensions require the ability to specify something (API TBD)
+        // that can be passed to the Tomcat internals and process extension
+        // data present when the frame is fragmented.
+
+        // If we got this far, all is good. Accept the connection.
+        resp.setHeader("upgrade", "websocket");
+        resp.setHeader("connection", "upgrade");
+        resp.setHeader("Sec-WebSocket-Accept", getWebSocketAccept(key));
+        if (subProtocol != null) {
+            resp.setHeader("Sec-WebSocket-Protocol", subProtocol);
+        }
+        if (!extensions.isEmpty()) {
+            // TODO
+        }
+
+        resp.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
+        resp.flushBuffer();
+
+        //here we need to stop using filters
+        //TODO - stop using filters
+        try {
+            ((Request)getField("request",event)).protocolSwitch();
+        }catch (Exception x) {
+            throw new ServletException(x);
+        }
+
+
+        // Small hack until the Servlet API provides a way to do this.
+        StreamInbound inbound = createWebSocketInbound(subProtocol);
+        inbound.setUpgradeProcessor(event);
+        inbound.setUpgradeOutbound(event);
+        req.setAttribute("org.apache.catalina.comet.inbound", inbound);
+    }
+
+
+    /*
+     * Tests if any of the headers with the given name contains the target as
+     * one of its comma separated tokens. The comparison ignores case.
+     *
+     * This only works for tokens. Quoted strings need more sophisticated
+     * parsing.
+     */
+    private boolean headerContainsToken(HttpServletRequest req,
+            String headerName, String target) {
+        Enumeration<String> headers = req.getHeaders(headerName);
+        while (headers.hasMoreElements()) {
+            String header = headers.nextElement();
+            String[] tokens = header.split(",");
+            for (String token : tokens) {
+                if (target.equalsIgnoreCase(token.trim())) {
+                    return true;
+                }
+            }
+        }
+        // The header is missing or none of its tokens matched
+        return false;
+    }
+
+
+    /*
+     * Collects the comma separated tokens from all the headers with the given
+     * name in the order they appear in the request.
+     *
+     * This only works for tokens. Quoted strings need more sophisticated
+     * parsing.
+     */
+    private List<String> getTokensFromHeader(HttpServletRequest req,
+            String headerName) {
+        List<String> result = new ArrayList<String>();
+
+        Enumeration<String> headers = req.getHeaders(headerName);
+        while (headers.hasMoreElements()) {
+            String header = headers.nextElement();
+            String[] tokens = header.split(",");
+            for (String token : tokens) {
+                result.add(token.trim());
+            }
+        }
+        return result;
+    }
+
+
+    /*
+     * Calculates the value for the Sec-WebSocket-Accept header: the Base64
+     * encoding of the SHA-1 digest of the client's key followed by the
+     * WebSocket GUID.
+     */
+    private String getWebSocketAccept(String key) {
+        synchronized (sha1Helper) {
+            sha1Helper.reset();
+            sha1Helper.update(key.getBytes(B2CConverter.ISO_8859_1));
+            // digest(byte[]) adds the GUID to the digest before completing it
+            return new String(Base64.encode(sha1Helper.digest(WS_ACCEPT)), B2CConverter.ISO_8859_1);
+        }
+    }
+
+
+    /**
+     * Intended to be overridden by sub-classes that wish to verify the origin
+     * of a WebSocket request before processing it.
+     *
+     * @param origin    The value of the origin header from the request which
+     *                  may be {@code null}
+     *
+     * @return  {@code true} to accept the request. {@code false} to
+     *          reject it. This default implementation always returns
+     *          {@code true}.
+     */
+    protected boolean verifyOrigin(String origin) {
+        return true;
+    }
+
+
+    /**
+     * Intended to be overridden by sub-classes that wish to select a
+     * sub-protocol if the client provides a list of supported protocols.
+     *
+     * @param subProtocols  The list of sub-protocols supported by the client
+     *                      in client preference order. The server is under no
+     *                      obligation to respect the declared preference
+     * @return  {@code null} if no sub-protocol is selected or the name of
+     *          the protocol which must be one of the protocols listed by
+     *          the client. This default implementation always returns
+     *          {@code null}.
+     */
+    protected String selectSubProtocol(List<String> subProtocols) {
+        return null;
+    }
+
+
+    /**
+     * Create the instance that will process this inbound connection.
+     * Applications must provide a new instance for each connection.
+     *
+     * @param subProtocol   The sub-protocol agreed between the client and
+     *                      server or {@code null} if none was agreed
+     * @return  The instance that will process the inbound side of the
+     *          connection
+     */
+    protected abstract StreamInbound createWebSocketInbound(String subProtocol);
+}
Index: java/org/apache/catalina/websocket/Constants.java
===================================================================
--- java/org/apache/catalina/websocket/Constants.java (revision 0)
+++ java/org/apache/catalina/websocket/Constants.java (revision 0)
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+/**
+ * Constants for this Java package.
+ */
+public class Constants {
+    // Name of the package these constants belong to
+    public static final String Package = "org.apache.catalina.websocket";
+
+    // OP Codes
+    // Values of the OpCode field of a WebSocket frame
+    public static final byte OPCODE_CONTINUATION = 0x00;
+    public static final byte OPCODE_TEXT = 0x01;
+    public static final byte OPCODE_BINARY = 0x02;
+    public static final byte OPCODE_CLOSE = 0x08;
+    public static final byte OPCODE_PING = 0x09;
+    public static final byte OPCODE_PONG = 0x0A;
+
+    // Status Codes
+    // Definitions as per RFC 6455 (http://tools.ietf.org/html/rfc6455)
+    /**
+     * 1000 indicates a normal closure, meaning whatever purpose the
+     * connection was established for has been fulfilled.
+     */
+    public static final int STATUS_CLOSE_NORMAL = 1000;
+
+    /**
+     * 1001 indicates that an endpoint is "going away", such as a server
+     * going down, or a browser having navigated away from a page.
+     */
+    public static final int STATUS_SHUTDOWN = 1001;
+
+    /**
+     * 1002 indicates that an endpoint is terminating the connection due
+     * to a protocol error.
+     */
+    public static final int STATUS_PROTOCOL_ERROR = 1002;
+
+    /**
+     * 1003 indicates that an endpoint is terminating the connection
+     * because it has received a type of data it cannot accept (e.g. an
+     * endpoint that understands only text data MAY send this if it
+     * receives a binary message).
+     */
+    public static final int STATUS_UNEXPECTED_DATA_TYPE = 1003;
+
+    // 1004 is reserved. The specific meaning might be defined in the future.
+
+    /**
+     * 1005 is a reserved value and MUST NOT be set as a status code in a
+     * Close control frame by an endpoint. It is designated for use in
+     * applications expecting a status code to indicate that no status
+     * code was actually present.
+     */
+    public static final int STATUS_CODE_MISSING = 1005;
+
+    /**
+     * 1006 is a reserved value and MUST NOT be set as a status code in a
+     * Close control frame by an endpoint. It is designated for use in
+     * applications expecting a status code to indicate that the
+     * connection was closed abnormally, e.g. without sending or
+     * receiving a Close control frame.
+     */
+    public static final int STATUS_CLOSED_UNEXPECTEDLY = 1006;
+
+    /**
+     * 1007 indicates that an endpoint is terminating the connection
+     * because it has received data within a message that was not
+     * consistent with the type of the message (e.g., non-UTF-8 [RFC3629]
+     * data within a text message).
+     */
+    public static final int STATUS_BAD_DATA = 1007;
+
+    /**
+     * 1008 indicates that an endpoint is terminating the connection
+     * because it has received a message that violates its policy. This
+     * is a generic status code that can be returned when there is no
+     * other more suitable status code (e.g. 1003 or 1009), or if there
+     * is a need to hide specific details about the policy.
+     */
+    public static final int STATUS_POLICY_VIOLATION = 1008;
+
+    /**
+     * 1009 indicates that an endpoint is terminating the connection
+     * because it has received a message which is too big for it to
+     * process.
+     */
+    public static final int STATUS_MESSAGE_TOO_LARGE = 1009;
+
+    /**
+     * 1010 indicates that an endpoint (client) is terminating the
+     * connection because it has expected the server to negotiate one or
+     * more extension, but the server didn't return them in the response
+     * message of the WebSocket handshake. The list of extensions which
+     * are needed SHOULD appear in the /reason/ part of the Close frame.
+     * Note that this status code is not used by the server, because it
+     * can fail the WebSocket handshake instead.
+     */
+    public static final int STATUS_REQUIRED_EXTENSION = 1010;
+
+    /**
+     * 1011 indicates that a server is terminating the connection because it
+     * encountered an unexpected condition that prevented it from fulfilling the
+     * request.
+     */
+    public static final int STATUS_UNEXPECTED_CONDITION = 1011;
+}
Index: java/org/apache/catalina/websocket/Utf8Decoder.java
===================================================================
--- java/org/apache/catalina/websocket/Utf8Decoder.java (revision 0)
+++ java/org/apache/catalina/websocket/Utf8Decoder.java (revision 0)
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+
+import org.apache.tomcat.util.buf.B2CConverter;
+
+/**
+ * Decodes UTF-8 encoded bytes to characters. Extracted from Apache Harmony
+ * and modified to reject code points from U+D800 to U+DFFF as per RFC3629.
+ * The standard Java decoder does not reject these.
+ */
+public class Utf8Decoder extends CharsetDecoder {
+
+    // The next table contains information about UTF-8 charset and
+    // correspondence of 1st byte to the length of sequence
+    // For information please visit http://www.ietf.org/rfc/rfc3629.txt
+    //
+    // Please note, o means 0, actually.
+    // -------------------------------------------------------------------
+    // 0         1         2         3          Value
+    // -------------------------------------------------------------------
+    // oxxxxxxx                                 00000000 00000000 0xxxxxxx
+    // 11oyyyyy  1oxxxxxx                       00000000 00000yyy yyxxxxxx
+    // 111ozzzz  1oyyyyyy  1oxxxxxx             00000000 zzzzyyyy yyxxxxxx
+    // 1111ouuu  1ouuzzzz  1oyyyyyy  1oxxxxxx   000uuuuu zzzzyyyy yyxxxxxx
+
+    // Number of continuation bytes that follow a lead byte, indexed by the
+    // lower seven bits of the lead byte. -1 marks a byte that is not valid as
+    // the first byte of a sequence.
+    private static final int remainingBytes[] = {
+            // 1owwwwww
+            -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+            -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+            -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+            -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+            // 11oyyyyy
+            1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+            1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+            // 111ozzzz
+            2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+            // 1111ouuu
+            3, 3, 3, 3, 3, 3, 3, 3,
+            // > 11110111
+            -1, -1, -1, -1, -1, -1, -1, -1 };
+
+    // Value contributed by the marker bits of a sequence, indexed by the
+    // number of continuation bytes. It is subtracted once all of the bytes of
+    // a sequence have been accumulated.
+    private static final int remainingNumbers[] = {
+            0, // 0 1 2 3
+            4224, // (01o00000b << 6)+(1o000000b)
+            401536, // (011o0000b << 12)+(1o000000b << 6)+(1o000000b)
+            29892736 // (0111o000b << 18)+(1o000000b << 12)+(1o000000b << 6)+(1o000000b)
+    };
+
+    // Smallest code point that may be encoded with a given number of
+    // continuation bytes. Smaller values are over-long encodings.
+    private static final int lowerEncodingLimit[] = { -1, 0x80, 0x800, 0x10000 };
+
+    public Utf8Decoder() {
+        super(B2CConverter.UTF_8, 1.0f, 1.0f);
+    }
+
+    @Override
+    protected CoderResult decodeLoop(ByteBuffer in, CharBuffer out) {
+        // Use direct array access when both buffers provide it
+        if (in.hasArray() && out.hasArray()) {
+            return decodeHasArray(in, out);
+        }
+        return decodeNotHasArray(in, out);
+    }
+
+    /**
+     * Decodes using the relative get / put methods of the buffers. Whenever
+     * this method returns, the position of the input buffer is set to the
+     * first byte of the first sequence that has not been decoded.
+     *
+     * @param in  The bytes to decode
+     * @param out The buffer to write the decoded characters to
+     *
+     * @return The result of the decoding
+     */
+    private CoderResult decodeNotHasArray(ByteBuffer in, CharBuffer out) {
+        int outRemaining = out.remaining();
+        // Always the index of the first byte of the current sequence
+        int pos = in.position();
+        int limit = in.limit();
+        try {
+            while (pos < limit) {
+                if (outRemaining == 0) {
+                    return CoderResult.OVERFLOW;
+                }
+
+                int jchar = in.get();
+                // Number of continuation bytes in the current sequence
+                int tail = 0;
+                if (jchar < 0) {
+                    jchar = jchar & 0x7F;
+                    tail = remainingBytes[jchar];
+                    if (tail == -1) {
+                        return CoderResult.malformedForLength(1);
+                    }
+                    if (limit - pos < 1 + tail) {
+                        return CoderResult.UNDERFLOW;
+                    }
+
+                    int nextByte;
+                    for (int i = 0; i < tail; i++) {
+                        nextByte = in.get() & 0xFF;
+                        if ((nextByte & 0xC0) != 0x80) {
+                            return CoderResult
+                                    .malformedForLength(1 + i);
+                        }
+                        jchar = (jchar << 6) + nextByte;
+                    }
+                    jchar -= remainingNumbers[tail];
+                    if (jchar < lowerEncodingLimit[tail]) {
+                        // Should have been encoded in a fewer octets
+                        return CoderResult.malformedForLength(1);
+                    }
+                    // Surrogates must not be encoded in UTF-8. This is the
+                    // same test that decodeHasArray() applies.
+                    if (jchar >= 0xD800 && jchar <= 0xDFFF) {
+                        return CoderResult.unmappableForLength(3);
+                    }
+                    pos += tail;
+                }
+                if (jchar <= 0xffff) {
+                    out.put((char) jchar);
+                    outRemaining--;
+                } else {
+                    if (outRemaining < 2) {
+                        // Rewind to the start of the sequence so that all
+                        // of it is decoded once there is room for the pair
+                        pos -= tail;
+                        return CoderResult.OVERFLOW;
+                    }
+                    out.put((char) ((jchar >> 0xA) + 0xD7C0));
+                    out.put((char) ((jchar & 0x3FF) + 0xDC00));
+                    outRemaining -= 2;
+                }
+                pos++;
+            }
+            return CoderResult.UNDERFLOW;
+        } finally {
+            in.position(pos);
+        }
+    }
+
+    /**
+     * Decodes using direct access to the arrays that back the buffers.
+     * Whenever this method returns, the position of the input buffer is set
+     * to the first byte of the first sequence that has not been decoded and
+     * the position of the output buffer is set after the last character
+     * written.
+     *
+     * @param in  The bytes to decode
+     * @param out The buffer to write the decoded characters to
+     *
+     * @return The result of the decoding
+     */
+    private CoderResult decodeHasArray(ByteBuffer in, CharBuffer out) {
+        int outRemaining = out.remaining();
+        int pos = in.position();
+        int limit = in.limit();
+        final byte[] bArr = in.array();
+        final char[] cArr = out.array();
+        final int inIndexLimit = limit + in.arrayOffset();
+
+        int inIndex = pos + in.arrayOffset();
+        int outIndex = out.position() + out.arrayOffset();
+
+        // if someone would change the limit in process,
+        // he would face consequences
+        for (; inIndex < inIndexLimit && outRemaining > 0; inIndex++) {
+            int jchar = bArr[inIndex];
+            // Number of continuation bytes in the current sequence
+            int tail = 0;
+            if (jchar < 0) {
+                jchar = jchar & 0x7F;
+                tail = remainingBytes[jchar];
+
+                if (tail == -1) {
+                    in.position(inIndex - in.arrayOffset());
+                    out.position(outIndex - out.arrayOffset());
+                    return CoderResult.malformedForLength(1);
+                }
+                if (inIndexLimit - inIndex < 1 + tail) {
+                    // Incomplete sequence. Wait for more input.
+                    break;
+                }
+
+                for (int i = 0; i < tail; i++) {
+                    int nextByte = bArr[inIndex + i + 1] & 0xFF;
+                    if ((nextByte & 0xC0) != 0x80) {
+                        in.position(inIndex - in.arrayOffset());
+                        out.position(outIndex - out.arrayOffset());
+                        return CoderResult.malformedForLength(1 + i);
+                    }
+                    jchar = (jchar << 6) + nextByte;
+                }
+                jchar -= remainingNumbers[tail];
+                if (jchar < lowerEncodingLimit[tail]) {
+                    // Should have been encoded in fewer octets
+                    in.position(inIndex - in.arrayOffset());
+                    out.position(outIndex - out.arrayOffset());
+                    return CoderResult.malformedForLength(1);
+                }
+                // Note: This is the additional test added
+                // As for the other errors, the positions have to be updated
+                // so that the characters decoded so far are not lost and the
+                // error is reported for the correct bytes.
+                if (jchar >= 0xD800 && jchar <= 0xDFFF) {
+                    in.position(inIndex - in.arrayOffset());
+                    out.position(outIndex - out.arrayOffset());
+                    return CoderResult.unmappableForLength(3);
+                }
+                inIndex += tail;
+            }
+            if (jchar <= 0xffff) {
+                cArr[outIndex++] = (char) jchar;
+                outRemaining--;
+            } else {
+                if (outRemaining < 2) {
+                    // Rewind to the start of the sequence so that all of it
+                    // is decoded once there is room for the pair and record
+                    // the characters that have already been written
+                    in.position(inIndex - tail - in.arrayOffset());
+                    out.position(outIndex - out.arrayOffset());
+                    return CoderResult.OVERFLOW;
+                }
+                cArr[outIndex++] = (char) ((jchar >> 0xA) + 0xD7C0);
+                cArr[outIndex++] = (char) ((jchar & 0x3FF) + 0xDC00);
+                outRemaining -= 2;
+            }
+        }
+        in.position(inIndex - in.arrayOffset());
+        out.position(outIndex - out.arrayOffset());
+        return (outRemaining == 0 && inIndex < inIndexLimit) ?
+                CoderResult.OVERFLOW :
+                CoderResult.UNDERFLOW;
+    }
+}
Index: java/org/apache/catalina/connector/Request.java
===================================================================
--- java/org/apache/catalina/connector/Request.java (revision 1300373)
+++ java/org/apache/catalina/connector/Request.java (working copy)
@@ -2393,6 +2393,10 @@
         coyoteRequest.action(ActionCode.ACTION_COMET_SETTIMEOUT,new Long(timeout));
     }

+    public void protocolSwitch() {
+        coyoteRequest.action(ActionCode.ACTION_PROTOCOL_SWITCH, null);
+    }
+

     // ------------------------------------------------------ Protected Methods

Index: java/org/apache/catalina/connector/CometEventImpl.java
===================================================================
--- java/org/apache/catalina/connector/CometEventImpl.java (revision 1300373)
+++ java/org/apache/catalina/connector/CometEventImpl.java (working copy)
@@ -30,6 +30,7 @@

 public class CometEventImpl implements CometEvent {

+    public static final int PROTOCOL_SWITCH_ID = -10;

     /**
      * The string manager for this package.
/**
 * Static helpers for turning raw byte sequences into primitive values.
 */
public class Conversions {

    private Conversions() {
        // Utility class. Hide default constructor.
    }

    /**
     * Interprets the given bytes as an unsigned big-endian integer: the
     * last element is the least significant byte.
     * <p>
     * When exactly eight bytes are supplied and the top bit of the first
     * byte is set, the value wraps into the negative range of
     * {@code long}.
     *
     * @param input zero to eight bytes, most significant first
     * @return the decoded value; {@code 0} for an empty array
     * @throws IOException if more than eight bytes are supplied
     */
    public static long byteArrayToLong(byte[] input) throws IOException {
        if (input.length > 8) {
            throw new IOException("Unable to convert " + input.length
                    + " bytes to a long; at most 8 bytes are supported");
        }

        long result = 0;
        for (byte b : input) {
            // Mask with a long constant so the arithmetic is 64-bit; an
            // int shift would wrap its distance at 32 and sign-extend
            // bit 31.
            result = (result << 8) | (b & 0xFFL);
        }

        return result;
    }
}
/**
 * Headings a snake can be given in the multiplayer snake example.
 */
public enum Direction {

    /** No heading selected. */
    NONE,

    /** Towards smaller y coordinates. */
    NORTH,

    /** Towards larger y coordinates. */
    SOUTH,

    /** Towards larger x coordinates. */
    EAST,

    /** Towards smaller x coordinates. */
    WEST
}
+ */ +package websocket.snake; + +import java.io.IOException; +import java.nio.CharBuffer; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; + +import org.apache.catalina.websocket.WsOutbound; + +public class Snake { + + private static final int DEFAULT_LENGTH = 5; + + private final int id; + private final WsOutbound outbound; + + private Direction direction; + private int length = DEFAULT_LENGTH; + private Location head; + private Deque tail = new ArrayDeque(); + private String hexColor; + + public Snake(int id, WsOutbound outbound) { + this.id = id; + this.outbound = outbound; + this.hexColor = SnakeWebSocketServlet.getRandomHexColor(); + resetState(); + } + + private void resetState() { + this.direction = Direction.NONE; + this.head = SnakeWebSocketServlet.getRandomLocation(); + this.tail.clear(); + this.length = DEFAULT_LENGTH; + } + + private synchronized void kill() { + resetState(); + try { + CharBuffer response = CharBuffer.wrap("{'type': 'dead'}"); + outbound.writeTextMessage(response); + } catch (IOException ioe) { + // Ignore + } + } + + private synchronized void reward() { + length++; + try { + CharBuffer response = CharBuffer.wrap("{'type': 'kill'}"); + outbound.writeTextMessage(response); + } catch (IOException ioe) { + // Ignore + } + } + + public synchronized void update(Collection snakes) { + Location nextLocation = head.getAdjacentLocation(direction); + if (nextLocation.x >= SnakeWebSocketServlet.PLAYFIELD_WIDTH) { + nextLocation.x = 0; + } + if (nextLocation.y >= SnakeWebSocketServlet.PLAYFIELD_HEIGHT) { + nextLocation.y = 0; + } + if (nextLocation.x < 0) { + nextLocation.x = SnakeWebSocketServlet.PLAYFIELD_WIDTH; + } + if (nextLocation.y < 0) { + nextLocation.y = SnakeWebSocketServlet.PLAYFIELD_HEIGHT; + } + if (direction != Direction.NONE) { + tail.addFirst(head); + if (tail.size() > length) { + tail.removeLast(); + } + head = nextLocation; + } + + for (Snake snake : snakes) { + if 
(snake.getTail().contains(head)) { + kill(); + if (id != snake.id) { + snake.reward(); + } + } + } + } + + public synchronized Collection getTail() { + return tail; + } + + public synchronized void setDirection(Direction direction) { + this.direction = direction; + } + + public synchronized String getLocationsJson() { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("{x: %d, y: %d}", + Integer.valueOf(head.x), Integer.valueOf(head.y))); + for (Location location : tail) { + sb.append(','); + sb.append(String.format("{x: %d, y: %d}", + Integer.valueOf(location.x), Integer.valueOf(location.y))); + } + return String.format("{'id':%d,'body':[%s]}", + Integer.valueOf(id), sb.toString()); + } + + public int getId() { + return id; + } + + public String getHexColor() { + return hexColor; + } +} Index: webapps/examples/WEB-INF/classes/websocket/snake/Location.java =================================================================== --- webapps/examples/WEB-INF/classes/websocket/snake/Location.java (revision 0) +++ webapps/examples/WEB-INF/classes/websocket/snake/Location.java (revision 0) @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package websocket.snake; + +public class Location { + + public int x; + public int y; + + public Location(int x, int y) { + this.x = x; + this.y = y; + } + + public Location getAdjacentLocation(Direction direction) { + switch (direction) { + case NORTH: + return new Location(x, y - SnakeWebSocketServlet.GRID_SIZE); + case SOUTH: + return new Location(x, y + SnakeWebSocketServlet.GRID_SIZE); + case EAST: + return new Location(x + SnakeWebSocketServlet.GRID_SIZE, y); + case WEST: + return new Location(x - SnakeWebSocketServlet.GRID_SIZE, y); + case NONE: + // fall through + default: + return this; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Location location = (Location) o; + + if (x != location.x) return false; + if (y != location.y) return false; + + return true; + } + + @Override + public int hashCode() { + int result = x; + result = 31 * result + y; + return result; + } +} Index: webapps/examples/WEB-INF/classes/websocket/snake/SnakeWebSocketServlet.java =================================================================== --- webapps/examples/WEB-INF/classes/websocket/snake/SnakeWebSocketServlet.java (revision 0) +++ webapps/examples/WEB-INF/classes/websocket/snake/SnakeWebSocketServlet.java (revision 0) @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package websocket.snake; + +import java.awt.Color; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.ServletException; + +import org.apache.catalina.websocket.MessageInbound; +import org.apache.catalina.websocket.StreamInbound; +import org.apache.catalina.websocket.WebSocketServlet; +import org.apache.catalina.websocket.WsOutbound; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +/** + * Example web socket servlet for simple multiplayer snake. 
+ */ +public class SnakeWebSocketServlet extends WebSocketServlet { + + private static final long serialVersionUID = 1L; + + private static final Log log = + LogFactory.getLog(SnakeWebSocketServlet.class); + + public static final int PLAYFIELD_WIDTH = 640; + public static final int PLAYFIELD_HEIGHT = 480; + public static final int GRID_SIZE = 10; + + private static final long TICK_DELAY = 100; + + private static final Random random = new Random(); + + private final Timer gameTimer = + new Timer(SnakeWebSocketServlet.class.getSimpleName() + " Timer"); + + private final AtomicInteger connectionIds = new AtomicInteger(0); + private final ConcurrentHashMap snakes = + new ConcurrentHashMap(); + private final ConcurrentHashMap connections = + new ConcurrentHashMap(); + + @Override + public void init() throws ServletException { + super.init(); + gameTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + tick(); + } catch (RuntimeException e) { + log.error("Caught to prevent timer from shutting down", e); + } + } + }, TICK_DELAY, TICK_DELAY); + } + + private void tick() { + StringBuilder sb = new StringBuilder(); + for (Iterator iterator = getSnakes().iterator(); + iterator.hasNext();) { + Snake snake = iterator.next(); + snake.update(getSnakes()); + sb.append(snake.getLocationsJson()); + if (iterator.hasNext()) { + sb.append(','); + } + } + broadcast(String.format("{'type': 'update', 'data' : [%s]}", + sb.toString())); + } + + private void broadcast(String message) { + for (SnakeMessageInbound connection : getConnections()) { + try { + CharBuffer response = CharBuffer.wrap(message); + connection.getWsOutbound().writeTextMessage(response); + } catch (IOException ignore) { + // Ignore + } + } + } + + private Collection getConnections() { + return Collections.unmodifiableCollection(connections.values()); + } + + private Collection getSnakes() { + return Collections.unmodifiableCollection(snakes.values()); + } + + public static String 
getRandomHexColor() { + float hue = random.nextFloat(); + // sat between 0.1 and 0.3 + float saturation = (random.nextInt(2000) + 1000) / 10000f; + float luminance = 0.9f; + Color color = Color.getHSBColor(hue, saturation, luminance); + return '#' + Integer.toHexString( + (color.getRGB() & 0xffffff) | 0x1000000).substring(1); + } + + public static Location getRandomLocation() { + int x = roundByGridSize( + random.nextInt(SnakeWebSocketServlet.PLAYFIELD_WIDTH)); + int y = roundByGridSize( + random.nextInt(SnakeWebSocketServlet.PLAYFIELD_HEIGHT)); + return new Location(x, y); + } + + private static int roundByGridSize(int value) { + value = value + (SnakeWebSocketServlet.GRID_SIZE / 2); + value = value / SnakeWebSocketServlet.GRID_SIZE; + value = value * SnakeWebSocketServlet.GRID_SIZE; + return value; + } + + @Override + public void destroy() { + super.destroy(); + if (gameTimer != null) { + gameTimer.cancel(); + } + } + + @Override + protected StreamInbound createWebSocketInbound(String subProtocol) { + return new SnakeMessageInbound(connectionIds.incrementAndGet()); + } + + private final class SnakeMessageInbound extends MessageInbound { + + private final int id; + private Snake snake; + + private SnakeMessageInbound(int id) { + this.id = id; + } + + @Override + protected void onOpen(WsOutbound outbound) { + this.snake = new Snake(id, outbound); + snakes.put(Integer.valueOf(id), snake); + connections.put(Integer.valueOf(id), this); + StringBuilder sb = new StringBuilder(); + for (Iterator iterator = getSnakes().iterator(); + iterator.hasNext();) { + Snake snake = iterator.next(); + sb.append(String.format("{id: %d, color: '%s'}", + Integer.valueOf(snake.getId()), snake.getHexColor())); + if (iterator.hasNext()) { + sb.append(','); + } + } + broadcast(String.format("{'type': 'join','data':[%s]}", + sb.toString())); + } + + @Override + protected void onClose(int status) { + connections.remove(Integer.valueOf(id)); + snakes.remove(Integer.valueOf(id)); + 
broadcast(String.format("{'type': 'leave', 'id': %d}", + Integer.valueOf(id))); + } + + @Override + protected void onBinaryMessage(ByteBuffer message) throws IOException { + throw new UnsupportedOperationException( + "Binary message not supported."); + } + + @Override + protected void onTextMessage(CharBuffer charBuffer) throws IOException { + String message = charBuffer.toString(); + if ("west".equals(message)) { + snake.setDirection(Direction.WEST); + } else if ("north".equals(message)) { + snake.setDirection(Direction.NORTH); + } else if ("east".equals(message)) { + snake.setDirection(Direction.EAST); + } else if ("south".equals(message)) { + snake.setDirection(Direction.SOUTH); + } + } + } +} Index: webapps/examples/WEB-INF/classes/websocket/EchoMessage.java =================================================================== --- webapps/examples/WEB-INF/classes/websocket/EchoMessage.java (revision 0) +++ webapps/examples/WEB-INF/classes/websocket/EchoMessage.java (revision 0) @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package websocket; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; + +import javax.servlet.ServletException; + +import org.apache.catalina.websocket.MessageInbound; +import org.apache.catalina.websocket.StreamInbound; +import org.apache.catalina.websocket.WebSocketServlet; + + +public class EchoMessage extends WebSocketServlet { + + private static final long serialVersionUID = 1L; + private volatile int byteBufSize; + private volatile int charBufSize; + + @Override + public void init() throws ServletException { + super.init(); + byteBufSize = getInitParameterIntValue("byteBufferMaxSize", 2097152); + charBufSize = getInitParameterIntValue("charBufferMaxSize", 2097152); + } + + public int getInitParameterIntValue(String name, int defaultValue) { + String val = this.getInitParameter(name); + int result = defaultValue; + try { + result = Integer.parseInt(val); + }catch (Exception x) { + } + return result; + } + + + + @Override + protected StreamInbound createWebSocketInbound(String subProtocol) { + return new EchoMessageInbound(byteBufSize,charBufSize); + } + + private static final class EchoMessageInbound extends MessageInbound { + + public EchoMessageInbound(int byteBufferMaxSize, int charBufferMaxSize) { + super(); + setByteBufferMaxSize(byteBufferMaxSize); + setCharBufferMaxSize(charBufferMaxSize); + } + + @Override + protected void onBinaryMessage(ByteBuffer message) throws IOException { + getWsOutbound().writeBinaryMessage(message); + } + + @Override + protected void onTextMessage(CharBuffer message) throws IOException { + getWsOutbound().writeTextMessage(message); + } + } +} Index: webapps/examples/WEB-INF/classes/websocket/EchoStream.java =================================================================== --- webapps/examples/WEB-INF/classes/websocket/EchoStream.java (revision 0) +++ webapps/examples/WEB-INF/classes/websocket/EchoStream.java (revision 0) @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package websocket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; + +import org.apache.catalina.websocket.StreamInbound; +import org.apache.catalina.websocket.WebSocketServlet; +import org.apache.catalina.websocket.WsOutbound; + + +public class EchoStream extends WebSocketServlet { + + private static final long serialVersionUID = 1L; + + @Override + protected StreamInbound createWebSocketInbound(String subProtocol) { + return new EchoStreamInbound(); + } + + private static final class EchoStreamInbound extends StreamInbound { + + @Override + protected void onBinaryData(InputStream is) throws IOException { + // Simply echo the data to back to the client. + WsOutbound outbound = getWsOutbound(); + + int i = is.read(); + while (i != -1) { + outbound.writeBinaryData(i); + i = is.read(); + } + + outbound.flush(); + } + + @Override + protected void onTextData(Reader r) throws IOException { + // Simply echo the data to back to the client. + WsOutbound outbound = getWsOutbound(); + + int c = r.read(); + while (c != -1) { + outbound.writeTextData((char) c); + c = r.read(); + } + + outbound.flush(); + } + } +}