diff --git a/java/org/apache/catalina/util/Conversions.java b/java/org/apache/catalina/util/Conversions.java index 8161213..322fdbb 100644 --- a/java/org/apache/catalina/util/Conversions.java +++ b/java/org/apache/catalina/util/Conversions.java @@ -32,7 +32,7 @@ public class Conversions { int shift = 0; long result = 0; - for (int i = input.length; i < 0; i--) { + for (int i = input.length - 1; i >= 0; i--) { result = result + ((input[i] & 0xFF) << shift); shift += 8; } diff --git a/java/org/apache/catalina/websocket/MaskingStream.java b/java/org/apache/catalina/websocket/MaskingStream.java new file mode 100644 index 0000000..ba70ca6 --- /dev/null +++ b/java/org/apache/catalina/websocket/MaskingStream.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Input stream that applies the WebSocket masking transformation to another
 * stream. Masking XORs every payload byte with a repeating four-byte key, so
 * the same transformation both masks and demasks a payload.
 */
public class MaskingStream extends InputStream {

    /**
     * Length in bytes of the masking key field.
     */
    private static final int MASKING_KEY_LENGTH = 4;

    /**
     * Bit mask used to wrap the key index. This only works because the key
     * length is a power of two.
     */
    private static final int MASKING_KEY_INDEX_BITS = MASKING_KEY_LENGTH - 1;

    /**
     * The underlying stream being masked.
     */
    private final InputStream input;

    /**
     * Private copy of the masking key.
     */
    private final byte[] mask;

    /**
     * Index in the key of the byte used for the most recent read; -1 until
     * the first byte has been read.
     */
    private int position = -1;

    /**
     * Masks (or demasks) the given input stream.
     *
     * @param streamToMask the stream to mask
     * @param maskingKey   the masking key, exactly four bytes long
     * @throws IllegalArgumentException if the stream is {@code null} or the
     *         key is {@code null} or not four bytes long
     */
    public MaskingStream(InputStream streamToMask, byte[] maskingKey) {
        if (streamToMask == null) {
            throw new IllegalArgumentException("invalid stream to mask");
        }

        if (maskingKey == null || maskingKey.length != MASKING_KEY_LENGTH) {
            throw new IllegalArgumentException("invalid masking key");
        }

        input = streamToMask;

        // Copy the key so later changes to the caller's array cannot alter
        // the transformation half way through the stream.
        mask = maskingKey.clone();
    }

    /**
     * Reads the next byte of the underlying stream and XORs it with the next
     * byte of the masking key.
     *
     * @return the transformed byte in the range 0-255, or -1 at end of stream
     * @throws IOException if the underlying stream fails
     * @see InputStream#read()
     */
    @Override
    public int read() throws IOException {
        // Read the next byte and check for end of stream
        int nextByte = input.read();
        if (nextByte == -1) {
            return -1;
        }

        // Advance to the next place in the mask
        position = (position + 1) & MASKING_KEY_INDEX_BITS;

        // The key byte is sign-extended when promoted to int, so the result
        // must be cut back to eight bits. Without this, key bytes >= 0x80
        // produce negative values, including a bogus -1 (end of stream).
        return (nextByte ^ mask[position]) & 0xFF;
    }
}
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.catalina.websocket; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; - -public abstract class MessageInbound extends StreamInbound { - - // TODO: Make buffer sizes configurable - // TODO: Allow buffers to expand - ByteBuffer bb = ByteBuffer.allocate(8192); - CharBuffer cb = CharBuffer.allocate(8192); - - @Override - protected void onBinaryData(InputStream is) throws IOException { - int read = 0; - while (read > -1) { - bb.position(bb.position() + read); - read = is.read(bb.array(), bb.position(), bb.remaining()); - } - bb.flip(); - onBinaryMessage(bb); - bb.clear(); - } - - @Override - protected void onTextData(Reader r) throws IOException { - int read = 0; - while (read > -1) { - cb.position(cb.position() + read); - read = r.read(cb.array(), cb.position(), cb.remaining()); - } - cb.limit(cb.position()); - cb.position(0); - onTextMessage(cb); - cb.clear(); - } - - protected abstract void onBinaryMessage(ByteBuffer message) - throws IOException; - protected abstract void onTextMessage(CharBuffer message) - throws IOException; -} diff --git a/java/org/apache/catalina/websocket/StreamInbound.java b/java/org/apache/catalina/websocket/StreamInbound.java deleted file mode 100644 index a230edb..0000000 --- a/java/org/apache/catalina/websocket/StreamInbound.java +++ /dev/null @@ 
-1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.catalina.websocket; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; - -import org.apache.catalina.util.Conversions; -import org.apache.coyote.http11.upgrade.UpgradeInbound; -import org.apache.coyote.http11.upgrade.UpgradeOutbound; -import org.apache.coyote.http11.upgrade.UpgradeProcessor; -import org.apache.tomcat.util.buf.B2CConverter; -import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; - -public abstract class StreamInbound implements UpgradeInbound { - - // These attributes apply to the current frame being processed - private boolean fin = true; - private boolean rsv1 = false; - private boolean rsv2 = false; - private boolean rsv3 = false; - private int opCode = -1; - private long payloadLength = -1; - - // These attributes apply to the message that may be spread over multiple - // frames - // TODO - - private UpgradeProcessor processor = null; - private WsOutbound outbound; - - @Override - public void setUpgradeOutbound(UpgradeOutbound upgradeOutbound) { - outbound = new WsOutbound(upgradeOutbound); - } - - - @Override - public void 
setUpgradeProcessor(UpgradeProcessor processor) { - this.processor = processor; - } - - public WsOutbound getStreamOutbound() { - return outbound; - } - - @Override - public SocketState onData() throws IOException { - // Must be start the start of a frame - - // Read the first byte - int i = processor.read(); - - fin = (i & 0x80) > 0; - - rsv1 = (i & 0x40) > 0; - rsv2 = (i & 0x20) > 0; - rsv3 = (i & 0x10) > 0; - - if (rsv1 || rsv2 || rsv3) { - // TODO: Not supported. - } - - opCode = (i & 0x0F); - validateOpCode(opCode); - - // Read the next byte - i = processor.read(); - - // Client data must be masked and this isn't - if ((i & 0x80) == 0) { - // TODO: Better message - throw new IOException(); - } - - payloadLength = i & 0x7F; - if (payloadLength == 126) { - byte[] extended = new byte[2]; - processor.read(extended); - payloadLength = Conversions.byteArrayToLong(extended); - } else if (payloadLength == 127) { - byte[] extended = new byte[8]; - processor.read(extended); - payloadLength = Conversions.byteArrayToLong(extended); - } - - byte[] mask = new byte[4]; - processor.read(mask); - - if (opCode == 1 || opCode == 2) { - WsInputStream wsIs = new WsInputStream(processor, mask, - payloadLength); - if (opCode == 2) { - onBinaryData(wsIs); - } else { - InputStreamReader r = - new InputStreamReader(wsIs, B2CConverter.UTF_8); - onTextData(r); - } - } - - // TODO: Doesn't currently handle multi-frame messages. That will need - // some refactoring. - - // TODO: Per frame extension handling is not currently supported. - - // TODO: Handle other control frames. 
- - // TODO: Handle control frames appearing in the middle of a multi-frame - // message - - return SocketState.UPGRADED; - } - - protected abstract void onBinaryData(InputStream is) throws IOException; - protected abstract void onTextData(Reader r) throws IOException; - - private void validateOpCode(int opCode) throws IOException { - switch (opCode) { - case 0: - case 1: - case 2: - case 8: - case 9: - case 10: - break; - default: - // TODO: Message - throw new IOException(); - } - } -} diff --git a/java/org/apache/catalina/websocket/WebSocketConnection.java b/java/org/apache/catalina/websocket/WebSocketConnection.java new file mode 100644 index 0000000..08aa164 --- /dev/null +++ b/java/org/apache/catalina/websocket/WebSocketConnection.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.websocket; + +import java.io.IOException; + +import org.apache.coyote.http11.upgrade.UpgradeInbound; +import org.apache.coyote.http11.upgrade.UpgradeOutbound; +import org.apache.coyote.http11.upgrade.UpgradeProcessor; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.catalina.websocket.WebSocketFrame.OpCode; + +public abstract class WebSocketConnection implements UpgradeInbound { + + // TODO much better fragmentation system + private boolean currentlyFragmented = false; + private OpCode currentDataOpcode = null; + + private UpgradeProcessor processor = null; + private UpgradeOutbound outbound; + + @Override + public void setUpgradeOutbound(UpgradeOutbound upgradeOutbound) { + outbound = upgradeOutbound; + } + + @Override + public void setUpgradeProcessor(UpgradeProcessor processor) { + this.processor = processor; + } + + @Override + public SocketState onData() throws IOException { + // Must be the start of a frame + WebSocketFrame frame = WebSocketFrame.decode(processor); + + // Fragmentation + if (currentlyFragmented) { + // This frame is inside a fragmented message + + // Reject non-continuation data frames + if(frame.isData()) { + writeFrame(WebSocketFrame.protocolErrorCloseFrame()); + closeImmediately(); + } + } else { + // This frame is the first frame of a new message + + // Reject spurious continuation frames + if (frame.getOpcode() == OpCode.Continuation) { + writeFrame(WebSocketFrame.protocolErrorCloseFrame()); + closeImmediately(); + } + } + + // Route the frame + if (frame.isData() || frame.getOpcode() == OpCode.Continuation) { + handleDataFrame(frame); + + // Update fragmentation state for next time + if(frame.isFin()) { + currentlyFragmented = false; + } else { + currentlyFragmented = true; + currentDataOpcode = frame.getOpcode(); + } + } else if (frame.isControl()) { + handleControl(frame); + } + + // TODO per-frame extension handling is not currently supported. 
+ + return SocketState.UPGRADED; + } + + private void handleControl(WebSocketFrame frame) throws IOException { + // Control frames must not be fragmented + if (frame.isFin() == false) { + writeFrame(WebSocketFrame.protocolErrorCloseFrame()); + closeImmediately(); + } + + // Control frames must not have extended length + if (frame.getPayloadLength() > 125) { + writeFrame(WebSocketFrame.protocolErrorCloseFrame()); + closeImmediately(); + } + + switch (frame.getOpcode()) { + case Ping: + System.out.println(""); + writeFrame(WebSocketFrame.makePong(frame)); + break; + case Pong: + System.out.println(""); + break; + case ConnectionClose: + // Reply with a close + writeFrame(WebSocketFrame.closeFrame()); + closeImmediately(); + break; + } + } + + private void closeImmediately() throws IOException { + // drop the TCP connection + processor.close(); + } + + public void writeFrame(WebSocketFrame frame) throws IOException { + frame.encode(outbound); + outbound.flush(); + } + + private void handleDataFrame(WebSocketFrame frame) throws IOException { + OpCode opcode = frame.getOpcode(); + + if(opcode == OpCode.Continuation) { + opcode = currentDataOpcode; + } + + switch (opcode) { + case Text: + onTextData(frame); + break; + + case Binary: + onBinaryData(frame); + break; + } + } + + protected abstract void onTextData(WebSocketFrame frame) throws IOException; + + protected abstract void onBinaryData(WebSocketFrame frame) + throws IOException; + + protected abstract void endOfMessage(); +} diff --git a/java/org/apache/catalina/websocket/WebSocketFrame.java b/java/org/apache/catalina/websocket/WebSocketFrame.java new file mode 100644 index 0000000..a4849f1 --- /dev/null +++ b/java/org/apache/catalina/websocket/WebSocketFrame.java @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.websocket; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import org.apache.catalina.util.Conversions; +import org.apache.catalina.util.IOTools; +import org.apache.coyote.http11.upgrade.UpgradeProcessor; + +/* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-------+-+-------------+-------------------------------+ + |F|R|R|R| opcode|M| Payload len | Extended payload length | + |I|S|S|S| (4) |A| (7) | (16/64) | + |N|V|V|V| |S| | (if payload len==126/127) | + | |1|2|3| |K| | | + +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + | Extended payload length continued, if payload len == 127 | + + - - - - - - - - - - - - - - - +-------------------------------+ + | |Masking-key, if MASK set to 1 | + +-------------------------------+-------------------------------+ + | Masking-key (continued) | Payload Data | + +-------------------------------- - - - - - - - - - - - - - - - + + : Payload Data continued ... : + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + | Payload Data continued ... 
| + +---------------------------------------------------------------+ + */ + +/** + * Object representation of a WebSocket frame. It knows how to decode itself + * from an InputStream + */ +public class WebSocketFrame { + /** + * The character set used to encode text frames + */ + private static final Charset textCharset = Charset.forName("UTF-8"); + + /** + * The maximum supported payload length (temporary implementation) + */ + private static final long maxSupportedPayloadLength = 1 << 25; // 32 MB + + /** + * FIN bit, every non-fragmented bit should have this set. It has nothing to + * do with closing of connection. + */ + private boolean fin; + + /** + * Type of the frame. + */ + private OpCode opcode; + + /** + * Whether this frame's data are masked by maskingKey. + */ + private boolean mask; + + /** + * If the payload (data) is masked, it needs to be XORed (in a special way) + * with this value. + */ + private byte[] maskingKey; + + /** + * Length of the payload in bytes + */ + private long payloadLength; + + /** + * The payload stream (someday this will allow for 63-bit payloads) + */ + private InputStream payload; + + /** + * Type of frame + */ + public enum OpCode { + Continuation(0x0), Text(0x1), Binary(0x2), + ConnectionClose(0x8), Ping(0x9), Pong(0xA); + + private final int opcode; + + OpCode(int opcode) { + this.opcode = opcode; + } + + public int getOpCodeNumber() { + return this.opcode; + } + + @Override + public String toString() { + return this.name(); + } + + public static OpCode getOpCodeByNumber(int number) throws IOException { + for (OpCode opcode : OpCode.values()) { + if (opcode.getOpCodeNumber() == number) + return opcode; + } + + throw new IOException("invalid opcode"); + } + } + + public enum StatusCode { + NormalClose(1000), ProtocolErrorClose(1002), MessageTooBig(1009); + // TODO there are far more status codes defined: + // http://tools.ietf.org/html/rfc6455#section-7.4 + + private final int statusCode; + + StatusCode(int statusCode) { 
+ this.statusCode = statusCode; + } + + public int getStatusCodeNumber() { + return this.statusCode; + } + + public byte[] encode() { + // Status codes are 16-bit unsigned integers + ByteBuffer code = ByteBuffer.allocate(2); + code.putShort((short) statusCode); + + // TODO include optional UTF-8 explanation in closing frames + // (must adjust the size of the buffer accordingly) + + return code.array(); + } + + @Override + public String toString() { + return this.name(); + } + } + + public static WebSocketFrame decode(final UpgradeProcessor processor) + throws IOException + { + return decode(new InputStream() { + @Override + public int read() throws IOException { + return processor.read(); + } + }); + } + + public static WebSocketFrame decode(InputStream input) + throws IOException { + // Read the first byte + int i = input.read(); + + if (i == -1) { + throw new IOException("reached end of stream"); + } + + // Build a frame from scratch + WebSocketFrame frame = new WebSocketFrame(); + + // Set the fin bit + frame.setFin((i & 0x80) > 0); + + // Extract the reserved bits + boolean rsv1 = (i & 0x40) > 0; + boolean rsv2 = (i & 0x20) > 0; + boolean rsv3 = (i & 0x10) > 0; + + // For now, require all reserved bits to be cleared + if (rsv1 || rsv2 || rsv3) { + // TODO better error message for reserved bits + throw new IOException("reserved bits must not be set"); + } + + // Set the opcode + frame.setOpcode(OpCode.getOpCodeByNumber((i & 0x0F))); + + // Read the second byte + i = input.read(); + + // Set the mask + frame.setMask((i & 0x80) > 0); + + // Read the payload length + // (not set until payload is actually read) + long payloadLength = i & 0x7F; + + // Read the extended payload length field, if present + if (payloadLength == 126) { + // Read the 16-bit field + byte[] extended = readAll(input, 2); + + // Set the actual payload length + payloadLength = Conversions.byteArrayToLong(extended); + + } else if (payloadLength == 127) { + // Read the 63-bit field + byte[] 
extended = readAll(input, 8); + + // Set the actual payload length + payloadLength = Conversions.byteArrayToLong(extended); + + } + + // Read the masking key, if present + if (frame.isMask()) { + byte[] maskingKey = readAll(input, 4); + frame.setMaskingKey(maskingKey); + } else { + // This is a server, so require client data to be masked + // TODO better error message for unmasked client data + throw new IOException("client data must be masked"); + } + + // Decode the payload + validatePayloadLength(payloadLength); + byte[] payload = readAll(input, (int) payloadLength); + + // Set the payload (implicitly sets the payload length) + frame.setPayload(payload); + + // Unmask the payload + frame.maskPayload(); + + // Return the fully decoded frame + return frame; + } + + /** + * Writes this frame to the given stream + * @param OutputStream the stream to write to + */ + public void encode(OutputStream output) throws IOException { + // Encode the first byte (flags and opcode) + int flagsAndOpcode = 0; + + // Set the final fragment bit + flagsAndOpcode = flagsAndOpcode | (fin ? 0x80 : 0x00); + + // Set reserve bits + // flagsAndOpcode = flagsAndOpcode | rsv1 | rsv2 | rsv3; + + // Set the opcode + flagsAndOpcode = flagsAndOpcode | opcode.getOpCodeNumber(); + + // Write the first byte (flags and opcode) + output.write(flagsAndOpcode); + + // Encode the second byte (masking bit and payload length) + int maskAndLength = 0; + + // Set the masking bit + maskAndLength = maskAndLength | (mask ? 
0x80 : 0x00); + + // Determine if we need an extended length field + byte[] extendedLength = null; + + if (payloadLength > 0xffff) { // 63-bit extended length + // Set the length field + maskAndLength = maskAndLength | 127; + + // Write the extended field + extendedLength = getBytes(payloadLength, 8); + + // TODO implement 63-bit payloads + throw new UnsupportedOperationException( + "63-bit payloads not supported"); + + } else if (payloadLength > 125) { // 16-bit extended length + // Set the length field + maskAndLength = maskAndLength | 126; + + // Write the extended field + extendedLength = getBytes(payloadLength, 2); + } + else + { + // Set the length field + maskAndLength = maskAndLength | (int) payloadLength; + } + + // Write the mask and length fields + output.write(maskAndLength); + + // Write the extended length field, if any + if(extendedLength != null) { + output.write(extendedLength); + } + + if (mask) { + // Write the masking key + output.write(maskingKey); + + // Mask the payload + maskPayload(); + } + + // Write the payload + validatePayloadLength(payloadLength); + IOTools.flow(payload, output); + } + + private static void validatePayloadLength(long length) { + // This is a first-draft implementation, without a fancy + // streaming API to support really huge messages, so we + // an artificial limit on payload length to make things easier. + // TODO remove this method when streaming payload is implemented + if (length > maxSupportedPayloadLength) { + throw new UnsupportedOperationException("payload too large"); + } + } + + /** + * Creates a new control frame for closing the connection + * + * @return normal closing frame + */ + public static WebSocketFrame closeFrame() { + return new WebSocketFrame(true, OpCode.ConnectionClose, + StatusCode.NormalClose.encode()); + } + + /** + * Creates a new control frame for closing the connection with protocol + * error. 
+ * + * @return protocol error closing frame + */ + public static WebSocketFrame protocolErrorCloseFrame() { + return new WebSocketFrame(true, OpCode.ConnectionClose, + StatusCode.ProtocolErrorClose.encode()); + } + + /** + * Wrapper around constructor that allows to easily send a text message. + * + * @param message + * text of the message + * @return frame with message encoded as data in the frame + */ + public static WebSocketFrame message(String message) { + return new WebSocketFrame(true, OpCode.Text, + message.getBytes(textCharset)); + } + + /** + * Convenient method that makes it easy to send a pong reply + * + * @param WebSocketFrame + * the ping message to which to reply + * + * @return the reply to the given ping + */ + public static WebSocketFrame makePong(WebSocketFrame frame) { + // Actually, we just need to flip the mask and set the new opcode + frame.setMask(!frame.isMask()); + frame.setOpcode(OpCode.Pong); + + return frame; + } + + /** + * Private constructor for null frames + */ + private WebSocketFrame() { + } + + /** + * Constructor for frames with unmasked payload + * + * @param fin + * whether FIN bit should be set + * @param opcode + * type of frame + * @param mask + * whether this frame is masked + * @param payload + * the byte array containing the payload + */ + public WebSocketFrame(boolean fin, OpCode opcode, byte[] payload) { + this.fin = fin; + this.mask = false; + this.opcode = opcode; + setPayload(payload); + } + + /** + * Constructor for frames with unmasked payload + * + * @param fin + * whether FIN bit should be set + * @param opcode + * type of frame + * @param payload + * the byte buffer containing the payload + */ + public WebSocketFrame(boolean fin, OpCode opcode, + ByteBuffer payload) { + this.fin = fin; + this.mask = true; + this.opcode = opcode; + setPayload(payload); + } + + @Override + public String toString() { + return String.format("FIN:%s OPCODE:%s MASK:%s LEN:%s\n", fin ? "1" + : "0", opcode, mask ? 
"1" : "0", payloadLength); + } + + /** + * @return Whether this frame is the final frame. + */ + public boolean isFin() { + return fin; + } + + public void setFin(boolean fin) { + this.fin = fin; + } + + public OpCode getOpcode() { + return opcode; + } + + public void setOpcode(OpCode opcode) { + this.opcode = opcode; + } + + /** + * Indicates whether this frame has Connection Close flag set and therefore + * the endpoint receiving this frame must close connection. + */ + public boolean isClose() { + return getOpcode().equals(OpCode.ConnectionClose); + } + + public boolean isMask() { + return mask; + } + + public void setMask(boolean mask) { + this.mask = mask; + } + + public void toggleMask() { + setMask(!mask); + } + + /** + * @return true iff this frame contains binary or text data + */ + public boolean isData() { + return opcode.equals(OpCode.Binary) || opcode.equals(OpCode.Text); + } + + /** + * Finds out whether this frame is a control frame. + * + * @return true iff this frame is a control frame + */ + public boolean isControl() { + return opcode.equals(OpCode.ConnectionClose) + || opcode.equals(OpCode.Ping) || opcode.equals(OpCode.Pong); + } + + public byte[] getMaskingKey() { + return maskingKey; + } + + public void setMaskingKey(byte[] maskingKey) { + this.maskingKey = maskingKey; + } + + private void maskPayload() + { + payload = new MaskingStream(payload, maskingKey); + } + + public long getPayloadLength() { + return payloadLength; + } + + /** + * @returns the payload + */ + public InputStream getPayload() { + return payload; + } + + public Reader readPayload() { + return new InputStreamReader(payload, textCharset); + } + + public void setPayload(InputStream newPayload, long newPayloadLength) { + this.payload = newPayload; + this.payloadLength = newPayloadLength; + } + + public void setPayload(byte[] newPayload) { + // Convert to stream + setPayload(new ByteArrayInputStream(newPayload), newPayload.length); + } + + public void setPayload(ByteBuffer 
newPayload) { + // Convert to stream + setPayload(new ByteArrayInputStream(newPayload.array(), + newPayload.position(), newPayload.remaining()), + newPayload.remaining()); + } + + /** + * Safely reads available bytes from stream into a byte array + * @param InputStream the stream to read from + * @param int the number of bytes to read + * @return a byte array containing input bytes or null on failure + * @throws IOException + */ + public static byte[] readAll(InputStream input, int length) + throws IOException + { + // Declare the byte array + byte[] buffer = new byte[length]; + + // See how many bytes are returned + int totalBytesRead = 0; + + // Read up bytes until we have them all or there aren't any more + while(totalBytesRead < length) + { + // Count the number of bytes read + int bytesRead = input.read(buffer, totalBytesRead, + length - totalBytesRead); + + // Check for end of input + if(bytesRead == -1) break; + + // Total the number of bytes read + totalBytesRead += bytesRead; + } + + // Ensure we read all the bytes + if(totalBytesRead != length) + { + throw new IOException("stopped reading bytes prematurely"); + } + + // Return the byte array + return buffer; + } + + /** + * Extracts the bytes of an unsigned integer as bytes + * @param long the unsigned integer whose bytes are to be extracted + * @param int the number of significant bytes (low-order) + * @return a byte array representing the bytes of the unsigned integer + */ + public static byte[] getBytes(long unsignedInt, int numBytes) + { + // Initialize the byte array + byte[] array = new byte[numBytes]; + + // Extract each bytes + for(int i = 0; i < array.length; ++i) + { + array[numBytes - i - 1] = (byte) (unsignedInt >> (i * 8)); + } + + // Return the constructed array + return array; + } +} diff --git a/java/org/apache/catalina/websocket/WebSocketServlet.java b/java/org/apache/catalina/websocket/WebSocketServlet.java index 1c11e6a..a363e83 100644 --- 
a/java/org/apache/catalina/websocket/WebSocketServlet.java +++ b/java/org/apache/catalina/websocket/WebSocketServlet.java @@ -62,11 +62,9 @@ public abstract class WebSocketServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - + // Information required to send the server handshake message String key; - String subProtocol = null; - List extensions = Collections.emptyList(); if (!headerContains(req, "upgrade", "websocket")) { resp.sendError(HttpServletResponse.SC_BAD_REQUEST); @@ -96,27 +94,31 @@ public abstract class WebSocketServlet extends HttpServlet { return; } - // TODO Read client handshake - Sec-WebSocket-Protocol - // Sec-WebSocket-Extensions + String subprotocol = req.getHeader("Sec-WebSocket-Protocol"); + if (subprotocol != null) { + // TODO future websocket subprotocols + } // TODO Extensions require the ability to specify something (API TBD) // that can be passed to the Tomcat internals and process extension // data present when the frame is fragmented. - + List extensions = Collections.emptyList(); + String extensionString = req.getHeader("Sec-WebSocket-Extensions"); + if (extensionString != null) { + // TODO read websocket extensions + } + if (!extensions.isEmpty()) { + // TODO future websocket extensions + } + // If we got this far, all is good. Accept the connection. resp.setHeader("upgrade", "websocket"); resp.setHeader("connection", "upgrade"); resp.setHeader("Sec-WebSocket-Accept", getWebSocketAccept(key)); - if (subProtocol != null) { - // TODO - } - if (!extensions.isEmpty()) { - // TODO - } // Small hack until the Servlet API provides a way to do this. 
- StreamInbound inbound = createWebSocketInbound(); - ((RequestFacade) req).doUpgrade(inbound); + WebSocketConnection connection = createWebSocketConnection(); + ((RequestFacade) req).doUpgrade(connection); } @@ -163,5 +165,5 @@ public abstract class WebSocketServlet extends HttpServlet { return true; } - protected abstract StreamInbound createWebSocketInbound(); + protected abstract WebSocketConnection createWebSocketConnection(); } diff --git a/java/org/apache/catalina/websocket/WsInputStream.java b/java/org/apache/catalina/websocket/WsInputStream.java deleted file mode 100644 index 7886d49..0000000 --- a/java/org/apache/catalina/websocket/WsInputStream.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.catalina.websocket; - -import java.io.IOException; - -import org.apache.coyote.http11.upgrade.UpgradeProcessor; - -public class WsInputStream extends java.io.InputStream { - - private UpgradeProcessor processor; - private byte[] mask; - private long remaining; - private long read; - - public WsInputStream(UpgradeProcessor processor, byte[] mask, - long remaining) { - this.processor = processor; - this.mask = mask; - this.remaining = remaining; - this.read = 0; - } - - @Override - public int read() throws IOException { - if (remaining == 0) { - return -1; - } - - remaining--; - read++; - - int masked = processor.read(); - return masked ^ mask[(int) ((read - 1) % 4)]; - } - -} diff --git a/java/org/apache/catalina/websocket/WsOutbound.java b/java/org/apache/catalina/websocket/WsOutbound.java deleted file mode 100644 index 2aacbe5..0000000 --- a/java/org/apache/catalina/websocket/WsOutbound.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.catalina.websocket; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; - -import org.apache.coyote.http11.upgrade.UpgradeOutbound; -import org.apache.tomcat.util.buf.B2CConverter; - -public class WsOutbound { - - private static final int DEFAULT_BUFFER_SIZE = 2048; - - private UpgradeOutbound upgradeOutbound; - private ByteBuffer bb; - private CharBuffer cb; - protected Boolean text = null; - protected boolean firstFrame = true; - - - public WsOutbound(UpgradeOutbound upgradeOutbound) { - this.upgradeOutbound = upgradeOutbound; - // TODO: Make buffer size configurable - // Byte buffer needs to be 4* char buffer to be sure that char buffer - // can always we written into Byte buffer - this.bb = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * 4); - this.cb = CharBuffer.allocate(DEFAULT_BUFFER_SIZE); - } - - - public void writeBinaryData(int b) throws IOException { - if (bb.position() == bb.capacity()) { - doFlush(false); - } - if (text == null) { - text = Boolean.FALSE; - } else if (text == Boolean.TRUE) { - // Flush the character data - flush(); - text = Boolean.FALSE; - } - bb.put((byte) (b & 0xFF)); - } - - - public void writeTextData(char c) throws IOException { - if (cb.position() == cb.capacity()) { - doFlush(false); - } - - if (text == null) { - text = Boolean.TRUE; - } else if (text == Boolean.FALSE) { - // Flush the binary data - flush(); - text = Boolean.TRUE; - } - cb.append(c); - } - - - public void writeBinaryMessage(ByteBuffer msgBb) throws IOException { - if (text != null) { - // Empty the buffer - flush(); - } - text = Boolean.FALSE; - doWriteBinary(msgBb, true); - } - - - public void writeTextMessage(CharBuffer msgCb) throws IOException { - if (text != null) { - // Empty the buffer - flush(); - } - text = Boolean.TRUE; - doWriteText(msgCb, true); - } - - - public void flush() throws IOException { - doFlush(true); - } - - private void doFlush(boolean finalFragment) throws IOException { - if 
(text == null) { - // No data - return; - } - if (text.booleanValue()) { - doWriteText(cb, finalFragment); - } else { - doWriteBinary(bb, finalFragment); - } - } - - - public void close() throws IOException { - doFlush(true); - - // TODO: Send a close message - bb = null; - cb = null; - upgradeOutbound = null; - } - - - protected void doWriteBinary(ByteBuffer buffer, boolean finalFragment) - throws IOException { - - // Prepare to write - buffer.flip(); - - // Work out the first byte - int first = 0x00; - if (finalFragment) { - first = first + 0x80; - } - if (firstFrame) { - if (text.booleanValue()) { - first = first + 0x1; - } else { - first = first + 0x2; - } - } - // Continuation frame is OpCode 0 - upgradeOutbound.write(first); - - // Note: buffer will never be more than 2^16 in length - if (buffer.limit() < 126) { - upgradeOutbound.write(buffer.limit()); - } else { - upgradeOutbound.write(126); - upgradeOutbound.write(buffer.limit() >>> 8); - upgradeOutbound.write(buffer.limit() & 0xFF); - } - - // Write the content - upgradeOutbound.write(buffer.array(), 0, buffer.limit()); - upgradeOutbound.flush(); - - // Reset - if (finalFragment) { - text = null; - firstFrame = true; - } else { - firstFrame = false; - } - bb.clear(); - } - - - protected void doWriteText(CharBuffer buffer, boolean finalFragment) - throws IOException { - buffer.flip(); - B2CConverter.UTF_8.newEncoder().encode(buffer, bb, true); - doWriteBinary(bb, finalFragment); - // Reset - cb.clear(); - } -} diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java index 211b022..dce5c3a 100644 --- a/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java +++ b/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java @@ -65,6 +65,10 @@ public class UpgradeAprProcessor extends UpgradeProcessor { return bytes[0]; } + @Override + public void close() throws IOException { + Socket.close(socket); + } @Override public 
int read(byte[] bytes) throws IOException { diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java index 2f10c93..c656ff5 100644 --- a/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java +++ b/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java @@ -31,15 +31,19 @@ import org.apache.tomcat.util.net.SocketWrapper; */ public class UpgradeBioProcessor extends UpgradeProcessor { - private final InputStream inputStream; - private final OutputStream outputStream; + private InputStream inputStream; + private OutputStream outputStream; + + private Socket socket; public UpgradeBioProcessor(SocketWrapper wrapper, UpgradeInbound upgradeInbound) throws IOException { super(upgradeInbound); - this.inputStream = wrapper.getSocket().getInputStream(); - this.outputStream = wrapper.getSocket().getOutputStream(); + socket = wrapper.getSocket(); + + this.inputStream = socket.getInputStream(); + this.outputStream = socket.getOutputStream(); } @@ -71,4 +75,9 @@ public class UpgradeBioProcessor extends UpgradeProcessor { public int read(byte[] bytes) throws IOException { return inputStream.read(bytes); } + + @Override + public void close() throws IOException { + socket.close(); + } } diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java index cc28089..627dd2c 100644 --- a/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java +++ b/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java @@ -176,4 +176,9 @@ public class UpgradeNioProcessor extends UpgradeProcessor { } return written; } + + @Override + public void close() throws IOException { + nioChannel.close(); + } } diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java index ea30472..bca49c9 100644 --- 
a/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java +++ b/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java @@ -47,6 +47,8 @@ public abstract class UpgradeProcessor implements Processor { // Input methods public abstract int read() throws IOException; public abstract int read(byte[] bytes) throws IOException; + + public abstract void close() throws IOException; @Override public final UpgradeInbound getUpgradeInbound() { diff --git a/test/org/apache/catalina/websocket/TestWebSocket.java b/test/org/apache/catalina/websocket/TestWebSocket.java index 9a5ca1a..ed0bc10 100644 --- a/test/org/apache/catalina/websocket/TestWebSocket.java +++ b/test/org/apache/catalina/websocket/TestWebSocket.java @@ -16,10 +16,7 @@ */ package org.apache.catalina.websocket; -import java.io.InputStream; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; +import java.io.IOException; import org.junit.Test; @@ -38,46 +35,29 @@ public class TestWebSocket extends TomcatBaseTest { private static final long serialVersionUID = 1L; @Override - protected StreamInbound createWebSocketInbound() { - return new SimpleStreamInbound(); + protected WebSocketConnection createWebSocketConnection() { + return new SimpleConnection(); } } - private static final class SimpleStreamInbound extends StreamInbound { - + private static final class SimpleConnection extends WebSocketConnection { + @Override - protected void onBinaryData(InputStream is) { + protected void onTextData(WebSocketFrame frame) throws IOException { // TODO Auto-generated method stub + } @Override - protected void onTextData(Reader r) { + protected void onBinaryData(WebSocketFrame frame) throws IOException { // TODO Auto-generated method stub + } - } - - - private static final class MessageWebSocketServlet - extends WebSocketServlet { - - private static final long serialVersionUID = 1L; - - @Override - protected StreamInbound createWebSocketInbound() { - return new SimpleMessageInbound(); - } - } - 
- private static final class SimpleMessageInbound extends MessageInbound { - - @Override - protected void onBinaryMessage(ByteBuffer message) { - // TODO Auto-generated method stub - } - + @Override - protected void onTextMessage(CharBuffer message) { + protected void endOfMessage() { // TODO Auto-generated method stub + } } } diff --git a/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java b/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java index 729d068..928dab4 100644 --- a/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java +++ b/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java @@ -17,34 +17,55 @@ package websocket; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; +import java.io.InputStream; +import java.io.Reader; -import org.apache.catalina.websocket.MessageInbound; -import org.apache.catalina.websocket.StreamInbound; +import org.apache.catalina.websocket.WebSocketConnection; +import org.apache.catalina.websocket.WebSocketFrame; import org.apache.catalina.websocket.WebSocketServlet; - public class EchoMessage extends WebSocketServlet { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - @Override - protected StreamInbound createWebSocketInbound() { - return new EchoMessageInbound(); - } + @Override + protected WebSocketConnection createWebSocketConnection() { + // Create a connection that prints out whatever messages it receives + return new PrintMessageConnection(); + } - private static final class EchoMessageInbound extends MessageInbound { + private final class PrintMessageConnection extends WebSocketConnection + { + @Override + protected void onTextData(WebSocketFrame frame) throws IOException { + Reader payload = frame.readPayload(); + + System.out.print(""); + + int i; + while((i = payload.read()) != -1) + { + System.out.print((char) i); + } + } @Override - protected void onBinaryMessage(ByteBuffer message) throws IOException { 
- System.out.write(message.array(), 0, message.limit()); - System.out.print('\n'); + protected void onBinaryData(WebSocketFrame frame) throws IOException { + InputStream payload = frame.getPayload(); + + System.out.println(""); + + int i; + while((i = payload.read()) != -1) + { + System.out.print((char) i); + } } @Override - protected void onTextMessage(CharBuffer message) throws IOException { - System.out.println(message); + protected void endOfMessage() { + System.out.println(""); } - } + + } } diff --git a/webapps/examples/WEB-INF/classes/websocket/EchoStream.java b/webapps/examples/WEB-INF/classes/websocket/EchoStream.java index 1ab76cc..5a5b4c9 100644 --- a/webapps/examples/WEB-INF/classes/websocket/EchoStream.java +++ b/webapps/examples/WEB-INF/classes/websocket/EchoStream.java @@ -17,51 +17,51 @@ package websocket; import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import org.apache.catalina.websocket.StreamInbound; +import org.apache.catalina.websocket.WebSocketConnection; +import org.apache.catalina.websocket.WebSocketFrame; import org.apache.catalina.websocket.WebSocketServlet; -import org.apache.catalina.websocket.WsOutbound; - public class EchoStream extends WebSocketServlet { - private static final long serialVersionUID = 1L; - - @Override - protected StreamInbound createWebSocketInbound() { - return new EchoStreamInbound(); - } + private static final long serialVersionUID = 1L; - private static final class EchoStreamInbound extends StreamInbound { + @Override + protected WebSocketConnection createWebSocketConnection() { + // Create a connection that echoes back anything it receives + return new EchoStreamConnection(); + } + private final class EchoStreamConnection extends WebSocketConnection + { @Override - protected void onBinaryData(InputStream is) throws IOException { - // Simply echo the data to back to the client. 
- WsOutbound outbound = getStreamOutbound(); - - int i = is.read(); - while (i != -1) { - outbound.writeBinaryData(i); - i = is.read(); - } - - outbound.flush(); + protected void onTextData(WebSocketFrame frame) throws IOException { + System.out.println(""); + + // Toggle the masking flag + frame.toggleMask(); + + // Echo the frame right back + writeFrame(frame); } @Override - protected void onTextData(Reader r) throws IOException { - // Simply echo the data to back to the client. - WsOutbound outbound = getStreamOutbound(); - - int c = r.read(); - while (c != -1) { - outbound.writeTextData((char) c); - c = r.read(); - } + protected void onBinaryData(WebSocketFrame frame) throws IOException { + System.out.println(""); + + // Toggle the masking flag + frame.toggleMask(); + + // Echo the frame right back + writeFrame(frame); + } - outbound.flush(); + @Override + protected void endOfMessage() { + // That was the final fragment } - } + + } }