diff --git a/java/org/apache/catalina/util/Conversions.java b/java/org/apache/catalina/util/Conversions.java
index 8161213..322fdbb 100644
--- a/java/org/apache/catalina/util/Conversions.java
+++ b/java/org/apache/catalina/util/Conversions.java
@@ -32,7 +32,7 @@ public class Conversions {
int shift = 0;
long result = 0;
- for (int i = input.length; i < 0; i--) {
+ for (int i = input.length - 1; i >= 0; i--) {
result = result + ((input[i] & 0xFF) << shift);
shift += 8;
}
diff --git a/java/org/apache/catalina/websocket/MaskingStream.java b/java/org/apache/catalina/websocket/MaskingStream.java
new file mode 100644
index 0000000..ba70ca6
--- /dev/null
+++ b/java/org/apache/catalina/websocket/MaskingStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input stream that XORs an underlying stream with a repeating masking key.
+ */
+public class MaskingStream extends InputStream {
+    /**
+     * Length in bytes of the masking key field
+     */
+    private static final int maskingKeyLength = 4;
+
+    /**
+     * Bit mask used to wrap the key index (valid because the length is 4)
+     */
+    private static final int maskingKeyBits = maskingKeyLength - 1;
+
+    /**
+     * The underlying stream being masked
+     */
+    private final InputStream input;
+
+    /**
+     * The masking key
+     */
+    private final byte[] mask;
+
+    /**
+     * Index of the key byte applied to the most recently read byte
+     */
+    private int position = -1;
+
+    /**
+     * Masks (or demasks) the given input stream.
+     *
+     * @param streamToMask the stream whose bytes are XORed with the key
+     * @param maskingKey the masking key, exactly four bytes long
+     * @throws IllegalArgumentException if the key is null or not 4 bytes
+     */
+    public MaskingStream(InputStream streamToMask, byte[] maskingKey) {
+        if (maskingKey == null || maskingKey.length != maskingKeyLength) {
+            throw new IllegalArgumentException("invalid masking key");
+        }
+        input = streamToMask;
+        mask = maskingKey;
+    }
+
+    /**
+     * Reads one byte from the underlying stream and XORs it with the key.
+     *
+     * @return the masked byte in the range 0-255, or -1 at end of stream
+     * @throws IOException if the underlying stream fails
+     */
+    @Override
+    public int read() throws IOException {
+        // Read the next byte and check for end of stream
+        int nextByte = input.read();
+        if (nextByte == -1) {
+            return -1;
+        }
+
+        // Advance to the next place in the mask
+        position = (position + 1) & maskingKeyBits;
+
+        // Key bytes are signed; & 0xFF keeps the result inside 0-255
+        return (nextByte ^ mask[position]) & 0xFF;
+    }
+}
diff --git a/java/org/apache/catalina/websocket/MessageInbound.java b/java/org/apache/catalina/websocket/MessageInbound.java
deleted file mode 100644
index a9a1b4c..0000000
--- a/java/org/apache/catalina/websocket/MessageInbound.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.websocket;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-
-public abstract class MessageInbound extends StreamInbound {
-
- // TODO: Make buffer sizes configurable
- // TODO: Allow buffers to expand
- ByteBuffer bb = ByteBuffer.allocate(8192);
- CharBuffer cb = CharBuffer.allocate(8192);
-
- @Override
- protected void onBinaryData(InputStream is) throws IOException {
- int read = 0;
- while (read > -1) {
- bb.position(bb.position() + read);
- read = is.read(bb.array(), bb.position(), bb.remaining());
- }
- bb.flip();
- onBinaryMessage(bb);
- bb.clear();
- }
-
- @Override
- protected void onTextData(Reader r) throws IOException {
- int read = 0;
- while (read > -1) {
- cb.position(cb.position() + read);
- read = r.read(cb.array(), cb.position(), cb.remaining());
- }
- cb.limit(cb.position());
- cb.position(0);
- onTextMessage(cb);
- cb.clear();
- }
-
- protected abstract void onBinaryMessage(ByteBuffer message)
- throws IOException;
- protected abstract void onTextMessage(CharBuffer message)
- throws IOException;
-}
diff --git a/java/org/apache/catalina/websocket/StreamInbound.java b/java/org/apache/catalina/websocket/StreamInbound.java
deleted file mode 100644
index a230edb..0000000
--- a/java/org/apache/catalina/websocket/StreamInbound.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.websocket;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import org.apache.catalina.util.Conversions;
-import org.apache.coyote.http11.upgrade.UpgradeInbound;
-import org.apache.coyote.http11.upgrade.UpgradeOutbound;
-import org.apache.coyote.http11.upgrade.UpgradeProcessor;
-import org.apache.tomcat.util.buf.B2CConverter;
-import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
-
-public abstract class StreamInbound implements UpgradeInbound {
-
- // These attributes apply to the current frame being processed
- private boolean fin = true;
- private boolean rsv1 = false;
- private boolean rsv2 = false;
- private boolean rsv3 = false;
- private int opCode = -1;
- private long payloadLength = -1;
-
- // These attributes apply to the message that may be spread over multiple
- // frames
- // TODO
-
- private UpgradeProcessor> processor = null;
- private WsOutbound outbound;
-
- @Override
- public void setUpgradeOutbound(UpgradeOutbound upgradeOutbound) {
- outbound = new WsOutbound(upgradeOutbound);
- }
-
-
- @Override
- public void setUpgradeProcessor(UpgradeProcessor> processor) {
- this.processor = processor;
- }
-
- public WsOutbound getStreamOutbound() {
- return outbound;
- }
-
- @Override
- public SocketState onData() throws IOException {
- // Must be start the start of a frame
-
- // Read the first byte
- int i = processor.read();
-
- fin = (i & 0x80) > 0;
-
- rsv1 = (i & 0x40) > 0;
- rsv2 = (i & 0x20) > 0;
- rsv3 = (i & 0x10) > 0;
-
- if (rsv1 || rsv2 || rsv3) {
- // TODO: Not supported.
- }
-
- opCode = (i & 0x0F);
- validateOpCode(opCode);
-
- // Read the next byte
- i = processor.read();
-
- // Client data must be masked and this isn't
- if ((i & 0x80) == 0) {
- // TODO: Better message
- throw new IOException();
- }
-
- payloadLength = i & 0x7F;
- if (payloadLength == 126) {
- byte[] extended = new byte[2];
- processor.read(extended);
- payloadLength = Conversions.byteArrayToLong(extended);
- } else if (payloadLength == 127) {
- byte[] extended = new byte[8];
- processor.read(extended);
- payloadLength = Conversions.byteArrayToLong(extended);
- }
-
- byte[] mask = new byte[4];
- processor.read(mask);
-
- if (opCode == 1 || opCode == 2) {
- WsInputStream wsIs = new WsInputStream(processor, mask,
- payloadLength);
- if (opCode == 2) {
- onBinaryData(wsIs);
- } else {
- InputStreamReader r =
- new InputStreamReader(wsIs, B2CConverter.UTF_8);
- onTextData(r);
- }
- }
-
- // TODO: Doesn't currently handle multi-frame messages. That will need
- // some refactoring.
-
- // TODO: Per frame extension handling is not currently supported.
-
- // TODO: Handle other control frames.
-
- // TODO: Handle control frames appearing in the middle of a multi-frame
- // message
-
- return SocketState.UPGRADED;
- }
-
- protected abstract void onBinaryData(InputStream is) throws IOException;
- protected abstract void onTextData(Reader r) throws IOException;
-
- private void validateOpCode(int opCode) throws IOException {
- switch (opCode) {
- case 0:
- case 1:
- case 2:
- case 8:
- case 9:
- case 10:
- break;
- default:
- // TODO: Message
- throw new IOException();
- }
- }
-}
diff --git a/java/org/apache/catalina/websocket/WebSocketConnection.java b/java/org/apache/catalina/websocket/WebSocketConnection.java
new file mode 100644
index 0000000..08aa164
--- /dev/null
+++ b/java/org/apache/catalina/websocket/WebSocketConnection.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.io.IOException;
+
+import org.apache.coyote.http11.upgrade.UpgradeInbound;
+import org.apache.coyote.http11.upgrade.UpgradeOutbound;
+import org.apache.coyote.http11.upgrade.UpgradeProcessor;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.catalina.websocket.WebSocketFrame.OpCode;
+
+public abstract class WebSocketConnection implements UpgradeInbound {
+
+    // TODO much better fragmentation system
+    private boolean currentlyFragmented = false;
+    private OpCode currentDataOpcode = null;
+
+    private UpgradeProcessor<?> processor = null;
+    private UpgradeOutbound outbound;
+
+    @Override
+    public void setUpgradeOutbound(UpgradeOutbound upgradeOutbound) {
+        outbound = upgradeOutbound;
+    }
+
+    @Override
+    public void setUpgradeProcessor(UpgradeProcessor<?> processor) {
+        this.processor = processor;
+    }
+
+    /**
+     * Decodes one frame from the processor, enforces the fragmentation
+     * rules and routes the frame to the data or control handler. Rejected
+     * frames close the connection and are not routed.
+     */
+    @Override
+    public SocketState onData() throws IOException {
+        // Must be the start of a frame
+        WebSocketFrame frame = WebSocketFrame.decode(processor);
+
+        // Inside a fragmented message a new data frame is illegal; outside
+        // of one a continuation frame is spurious
+        boolean illegal = currentlyFragmented
+                ? frame.isData()
+                : frame.getOpcode() == OpCode.Continuation;
+        if (illegal) {
+            failConnection();
+            // Do not route the rejected frame to the handlers
+            return SocketState.UPGRADED;
+        }
+
+        // Route the frame
+        if (frame.isData() || frame.getOpcode() == OpCode.Continuation) {
+            handleDataFrame(frame);
+
+            // Update fragmentation state for next time
+            if (frame.isFin()) {
+                currentlyFragmented = false;
+            } else {
+                currentlyFragmented = true;
+                // Remember the message type from its first frame only; a
+                // continuation frame must not overwrite it
+                if (frame.isData()) {
+                    currentDataOpcode = frame.getOpcode();
+                }
+            }
+        } else if (frame.isControl()) {
+            handleControl(frame);
+        }
+
+        // TODO per-frame extension handling is not currently supported.
+
+        return SocketState.UPGRADED;
+    }
+
+    /** Validates a control frame and answers it (ping, pong or close). */
+    private void handleControl(WebSocketFrame frame) throws IOException {
+        // Control frames must not be fragmented or use an extended length
+        if (!frame.isFin() || frame.getPayloadLength() > 125) {
+            failConnection();
+            return;
+        }
+
+        switch (frame.getOpcode()) {
+        case Ping:
+            writeFrame(WebSocketFrame.makePong(frame));
+            break;
+        case Pong:
+            // Nothing to send in response to a pong
+            break;
+        case ConnectionClose:
+            // Reply with a close
+            writeFrame(WebSocketFrame.closeFrame());
+            closeImmediately();
+            break;
+        }
+    }
+
+    /** Sends a protocol-error close frame and drops the connection. */
+    private void failConnection() throws IOException {
+        writeFrame(WebSocketFrame.protocolErrorCloseFrame());
+        closeImmediately();
+    }
+
+    private void closeImmediately() throws IOException {
+        // drop the TCP connection
+        processor.close();
+    }
+
+    public void writeFrame(WebSocketFrame frame) throws IOException {
+        frame.encode(outbound);
+        outbound.flush();
+    }
+
+    private void handleDataFrame(WebSocketFrame frame) throws IOException {
+        OpCode opcode = frame.getOpcode();
+
+        if (opcode == OpCode.Continuation) {
+            opcode = currentDataOpcode;
+        }
+
+        switch (opcode) {
+        case Text:
+            onTextData(frame);
+            break;
+        case Binary:
+            onBinaryData(frame);
+            break;
+        }
+    }
+
+    protected abstract void onTextData(WebSocketFrame frame) throws IOException;
+
+    protected abstract void onBinaryData(WebSocketFrame frame) throws IOException;
+
+    // NOTE(review): nothing in this class invokes endOfMessage() yet
+    protected abstract void endOfMessage();
+}
diff --git a/java/org/apache/catalina/websocket/WebSocketFrame.java b/java/org/apache/catalina/websocket/WebSocketFrame.java
new file mode 100644
index 0000000..a4849f1
--- /dev/null
+++ b/java/org/apache/catalina/websocket/WebSocketFrame.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.websocket;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.catalina.util.Conversions;
+import org.apache.catalina.util.IOTools;
+import org.apache.coyote.http11.upgrade.UpgradeProcessor;
+
+/*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-------+-+-------------+-------------------------------+
+ |F|R|R|R| opcode|M| Payload len | Extended payload length |
+ |I|S|S|S| (4) |A| (7) | (16/64) |
+ |N|V|V|V| |S| | (if payload len==126/127) |
+ | |1|2|3| |K| | |
+ +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+ | Extended payload length continued, if payload len == 127 |
+ + - - - - - - - - - - - - - - - +-------------------------------+
+ | |Masking-key, if MASK set to 1 |
+ +-------------------------------+-------------------------------+
+ | Masking-key (continued) | Payload Data |
+ +-------------------------------- - - - - - - - - - - - - - - - +
+ : Payload Data continued ... :
+ + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+ | Payload Data continued ... |
+ +---------------------------------------------------------------+
+ */
+
+/**
+ * Object representation of a WebSocket frame. It knows how to decode itself
+ * from an InputStream
+ */
+public class WebSocketFrame {
+ /**
+ * The character set used to encode text frames
+ */
+ private static final Charset textCharset = Charset.forName("UTF-8");
+
+ /**
+ * The maximum supported payload length (temporary implementation)
+ */
+ private static final long maxSupportedPayloadLength = 1 << 25; // 32 MB
+
+ /**
+ * FIN bit: set on the final (or only) frame of a message. It has nothing to
+ * do with closing the connection.
+ */
+ private boolean fin;
+
+ /**
+ * Type of the frame.
+ */
+ private OpCode opcode;
+
+ /**
+ * Whether this frame's data are masked by maskingKey.
+ */
+ private boolean mask;
+
+ /**
+ * If the payload (data) is masked, it needs to be XORed (in a special way)
+ * with this value.
+ */
+ private byte[] maskingKey;
+
+ /**
+ * Length of the payload in bytes
+ */
+ private long payloadLength;
+
+ /**
+ * The payload stream (someday this will allow for 63-bit payloads)
+ */
+ private InputStream payload;
+
+    /**
+     * The frame types this implementation understands, with wire values.
+     */
+    public enum OpCode {
+        Continuation(0x0), Text(0x1), Binary(0x2),
+        ConnectionClose(0x8), Ping(0x9), Pong(0xA);
+
+        private final int opcode;
+
+        OpCode(int wireValue) {
+            opcode = wireValue;
+        }
+
+        public int getOpCodeNumber() {
+            return opcode;
+        }
+
+        @Override
+        public String toString() {
+            return name();
+        }
+
+        public static OpCode getOpCodeByNumber(int number) throws IOException {
+            for (OpCode candidate : values()) {
+                if (number == candidate.opcode) {
+                    return candidate;
+                }
+            }
+            throw new IOException("invalid opcode");
+        }
+    }
+
+    public enum StatusCode {
+        NormalClose(1000), ProtocolErrorClose(1002), MessageTooBig(1009);
+        // TODO there are far more status codes defined:
+        // http://tools.ietf.org/html/rfc6455#section-7.4
+
+        private final int statusCode;
+
+        StatusCode(int code) {
+            statusCode = code;
+        }
+
+        public int getStatusCodeNumber() {
+            return statusCode;
+        }
+
+        /** Returns the code as a two-byte, big-endian close-frame payload. */
+        public byte[] encode() {
+            // Status codes are 16-bit unsigned integers, high byte first
+            byte high = (byte) (statusCode >> 8);
+            byte low = (byte) statusCode;
+
+            // TODO include optional UTF-8 explanation in closing frames
+            // (must adjust the size of the array accordingly)
+            return new byte[] { high, low };
+        }
+
+        @Override
+        public String toString() {
+            return name();
+        }
+    }
+
+    /** Decodes one frame by reading byte-by-byte from the upgrade processor. */
+    public static WebSocketFrame decode(final UpgradeProcessor<?> processor)
+            throws IOException {
+        return decode(new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return processor.read();
+            }
+        });
+    }
+
+    /**
+     * Decodes a single client frame, including its whole payload.
+     *
+     * @param input the stream positioned at the first byte of a frame
+     * @return the decoded frame with its payload unmasked
+     * @throws IOException on early end of stream or a malformed header
+     */
+    public static WebSocketFrame decode(InputStream input)
+            throws IOException {
+        // Read the first byte
+        int i = input.read();
+        if (i == -1) {
+            throw new IOException("reached end of stream");
+        }
+
+        // Build a frame from scratch
+        WebSocketFrame frame = new WebSocketFrame();
+
+        // Set the fin bit
+        frame.setFin((i & 0x80) > 0);
+
+        // Extract the reserved bits
+        boolean rsv1 = (i & 0x40) > 0;
+        boolean rsv2 = (i & 0x20) > 0;
+        boolean rsv3 = (i & 0x10) > 0;
+
+        // For now, require all reserved bits to be cleared
+        if (rsv1 || rsv2 || rsv3) {
+            // TODO better error message for reserved bits
+            throw new IOException("reserved bits must not be set");
+        }
+
+        // Set the opcode
+        frame.setOpcode(OpCode.getOpCodeByNumber((i & 0x0F)));
+
+        // Read the second byte; -1 here would otherwise be misread as a
+        // masked frame with a 64-bit length
+        i = input.read();
+        if (i == -1) {
+            throw new IOException("reached end of stream");
+        }
+
+        // Set the mask
+        frame.setMask((i & 0x80) > 0);
+
+        // Read the payload length
+        // (not set until payload is actually read)
+        long payloadLength = i & 0x7F;
+
+        // Read the extended payload length field, if present
+        if (payloadLength == 126) {
+            // 16-bit extended length
+            payloadLength = Conversions.byteArrayToLong(readAll(input, 2));
+        } else if (payloadLength == 127) {
+            // 64-bit extended length
+            payloadLength = Conversions.byteArrayToLong(readAll(input, 8));
+        }
+
+        // Read the masking key, if present
+        if (frame.isMask()) {
+            byte[] maskingKey = readAll(input, 4);
+            frame.setMaskingKey(maskingKey);
+        } else {
+            // This is a server, so require client data to be masked
+            // TODO better error message for unmasked client data
+            throw new IOException("client data must be masked");
+        }
+
+        // Decode the payload
+        validatePayloadLength(payloadLength);
+        byte[] payload = readAll(input, (int) payloadLength);
+
+        // Set the payload (implicitly sets the payload length)
+        frame.setPayload(payload);
+
+        // Unmask the payload
+        frame.maskPayload();
+        return frame;
+    }
+
+    /**
+     * Writes this frame to the given stream.
+     *
+     * @param output the stream to write to
+     * @throws IOException if writing fails
+     * @throws UnsupportedOperationException if the payload is too large to
+     *         encode; thrown before any byte has been written
+     */
+    public void encode(OutputStream output) throws IOException {
+        // Reject unsupported lengths up front so that a partial frame
+        // header is never left on the wire
+        if (payloadLength > 0xffff) { // 63-bit extended length
+            // TODO implement 63-bit payloads
+            throw new UnsupportedOperationException(
+                    "63-bit payloads not supported");
+        }
+        validatePayloadLength(payloadLength);
+
+        // Encode the first byte (flags and opcode)
+        int flagsAndOpcode = 0;
+
+        // Set the final fragment bit
+        flagsAndOpcode = flagsAndOpcode | (fin ? 0x80 : 0x00);
+
+        // Set reserve bits
+        // flagsAndOpcode = flagsAndOpcode | rsv1 | rsv2 | rsv3;
+
+        // Set the opcode
+        flagsAndOpcode = flagsAndOpcode | opcode.getOpCodeNumber();
+
+        // Write the first byte (flags and opcode)
+        output.write(flagsAndOpcode);
+
+        // Encode the second byte (masking bit and payload length)
+        int maskAndLength = 0;
+
+        // Set the masking bit
+        maskAndLength = maskAndLength | (mask ? 0x80 : 0x00);
+
+        // Determine if we need an extended length field
+        byte[] extendedLength = null;
+
+        if (payloadLength > 125) { // 16-bit extended length
+            // Set the length field
+            maskAndLength = maskAndLength | 126;
+
+            // Write the extended field
+            extendedLength = getBytes(payloadLength, 2);
+        } else {
+            // Set the length field
+            maskAndLength = maskAndLength | (int) payloadLength;
+        }
+
+        // Write the mask and length fields
+        output.write(maskAndLength);
+
+        // Write the extended length field, if any
+        if (extendedLength != null) {
+            output.write(extendedLength);
+        }
+
+        // Masked frames carry the key ahead of the payload
+        if (mask) {
+            // Write the masking key
+            output.write(maskingKey);
+
+            // Mask the payload
+            maskPayload();
+        }
+
+        // Write the payload
+        IOTools.flow(payload, output);
+    }
+
+    private static void validatePayloadLength(long length) {
+        // First-draft limit: no streaming API for huge payloads yet.
+        // Negative lengths (64-bit field with the top bit set) are rejected
+        // too, since they would otherwise become a negative array size.
+        // TODO remove this method when streaming payload is implemented
+        if (length < 0 || length > maxSupportedPayloadLength) {
+            throw new UnsupportedOperationException("payload too large");
+        }
+    }
+
+    /**
+     * Builds a final close frame whose payload is the normal-closure status.
+     *
+     * @return normal closing frame
+     */
+    public static WebSocketFrame closeFrame() {
+        byte[] status = StatusCode.NormalClose.encode();
+        return new WebSocketFrame(true, OpCode.ConnectionClose, status);
+    }
+
+    /**
+     * Builds a final close frame whose payload is the protocol-error
+     * status.
+     *
+     * @return protocol error closing frame
+     */
+    public static WebSocketFrame protocolErrorCloseFrame() {
+        byte[] status = StatusCode.ProtocolErrorClose.encode();
+        return new WebSocketFrame(true, OpCode.ConnectionClose, status);
+    }
+
+    /**
+     * Convenience factory for a single-frame (final) text message.
+     *
+     * @param message
+     *            text of the message
+     * @return frame carrying the message encoded with the text charset
+     */
+    public static WebSocketFrame message(String message) {
+        byte[] encoded = message.getBytes(textCharset);
+        return new WebSocketFrame(true, OpCode.Text, encoded);
+    }
+
+    /**
+     * Turns the given ping frame into its pong reply, in place.
+     *
+     * @param frame
+     *            the ping message to which to reply; it is modified
+     *
+     * @return the same frame object, now a pong with the mask flag flipped
+     */
+    public static WebSocketFrame makePong(WebSocketFrame frame) {
+        // The payload is echoed as-is: only the mask flag and opcode change
+        boolean flippedMask = !frame.isMask();
+        frame.setMask(flippedMask);
+        frame.setOpcode(OpCode.Pong);
+        return frame;
+    }
+
+ /**
+ * Private constructor for empty frames; decode() fills in the fields
+ */
+ private WebSocketFrame() {
+ }
+
+ /**
+ * Constructor for frames with unmasked payload; the mask flag is always
+ * cleared.
+ *
+ * @param fin
+ * whether FIN bit should be set
+ * @param opcode
+ * type of frame
+ * @param payload
+ * the byte array containing the payload; its length becomes the
+ * payload length
+ */
+ public WebSocketFrame(boolean fin, OpCode opcode, byte[] payload) {
+ this.fin = fin;
+ this.mask = false;
+ this.opcode = opcode;
+ setPayload(payload);
+ }
+
+    /**
+     * Constructor for frames with unmasked payload
+     *
+     * @param fin
+     *            whether FIN bit should be set
+     * @param opcode
+     *            type of frame
+     * @param payload
+     *            the byte buffer containing the payload
+     */
+    public WebSocketFrame(boolean fin, OpCode opcode, ByteBuffer payload) {
+        this.fin = fin;
+        // No masking key is supplied, so the frame must not claim a mask
+        this.mask = false;
+        this.opcode = opcode;
+        setPayload(payload);
+    }
+
+ @Override
+ public String toString() {
+ return String.format("FIN:%s OPCODE:%s MASK:%s LEN:%s\n", fin ? "1"
+ : "0", opcode, mask ? "1" : "0", payloadLength);
+ }
+
+ /**
+ * @return whether the FIN bit is set, i.e. this is the final frame.
+ */
+ public boolean isFin() {
+ return fin;
+ }
+
+ public void setFin(boolean fin) {
+ this.fin = fin;
+ }
+
+ public OpCode getOpcode() {
+ return opcode;
+ }
+
+ public void setOpcode(OpCode opcode) {
+ this.opcode = opcode;
+ }
+
+ /**
+ * Indicates whether this frame carries the ConnectionClose opcode and
+ * therefore the endpoint receiving this frame must close connection.
+ */
+ public boolean isClose() {
+ return getOpcode().equals(OpCode.ConnectionClose);
+ }
+
+ public boolean isMask() {
+ return mask;
+ }
+
+ public void setMask(boolean mask) {
+ this.mask = mask;
+ }
+
+ public void toggleMask() {
+ setMask(!mask);
+ }
+
+ /**
+ * @return true iff the opcode is Binary or Text; Continuation is excluded
+ */
+ public boolean isData() {
+ return opcode.equals(OpCode.Binary) || opcode.equals(OpCode.Text);
+ }
+
+ /**
+ * Finds out whether this frame is a control frame.
+ *
+ * @return true iff the opcode is ConnectionClose, Ping or Pong
+ */
+ public boolean isControl() {
+ return opcode.equals(OpCode.ConnectionClose)
+ || opcode.equals(OpCode.Ping) || opcode.equals(OpCode.Pong);
+ }
+
+ public byte[] getMaskingKey() {
+ return maskingKey;
+ }
+
+ public void setMaskingKey(byte[] maskingKey) {
+ this.maskingKey = maskingKey;
+ }
+
+ private void maskPayload()
+ {
+ payload = new MaskingStream(payload, maskingKey);
+ }
+
+ public long getPayloadLength() {
+ return payloadLength;
+ }
+
+ /**
+ * @return the payload stream
+ */
+ public InputStream getPayload() {
+ return payload;
+ }
+
+ public Reader readPayload() {
+ return new InputStreamReader(payload, textCharset);
+ }
+
+ public void setPayload(InputStream newPayload, long newPayloadLength) {
+ this.payload = newPayload;
+ this.payloadLength = newPayloadLength;
+ }
+
+ public void setPayload(byte[] newPayload) {
+ // Wrap the array in a stream; its length becomes the payload length
+ setPayload(new ByteArrayInputStream(newPayload), newPayload.length);
+ }
+
+    public void setPayload(ByteBuffer newPayload) {
+        // position() is relative to arrayOffset() (non-zero for slices)
+        int size = newPayload.remaining();
+        setPayload(new ByteArrayInputStream(newPayload.array(),
+                newPayload.arrayOffset() + newPayload.position(), size), size);
+    }
+
+    /**
+     * Reads exactly {@code length} bytes from the stream, looping over
+     * short reads until the buffer is full.
+     *
+     * @param input
+     *            the stream to read from
+     * @param length
+     *            the number of bytes to read
+     * @return a new array holding exactly {@code length} bytes; never
+     *         {@code null}
+     * @throws IOException
+     *             if the stream ends before {@code length} bytes were
+     *             read, or if the underlying read fails
+     */
+    public static byte[] readAll(InputStream input, int length)
+            throws IOException {
+        // Destination for the bytes, sized up front
+        byte[] result = new byte[length];
+
+        // Number of bytes stored in the destination so far
+        int filled = 0;
+        boolean exhausted = false;
+
+        // Keep asking for the missing tail until full or out of input
+        while (!exhausted && filled < length) {
+            int chunk = input.read(result, filled, length - filled);
+            if (chunk == -1) {
+                exhausted = true;
+            } else {
+                filled += chunk;
+            }
+        }
+
+        // A short count means the stream ended early
+        if (filled != length) {
+            throw new IOException("stopped reading bytes prematurely");
+        }
+        return result;
+    }
+
+    /**
+     * Serializes the low-order bytes of a value in big-endian order.
+     *
+     * @param unsignedInt the value whose bytes are to be extracted
+     * @param numBytes the number of low-order bytes to emit
+     * @return an array of {@code numBytes} bytes, most significant first
+     */
+    public static byte[] getBytes(long unsignedInt, int numBytes) {
+        // Allocate the result
+        byte[] bigEndian = new byte[numBytes];
+
+        // Fill from the last slot backwards, consuming eight bits per step
+        int shift = 0;
+        for (int slot = numBytes - 1; slot >= 0; slot--) {
+            bigEndian[slot] = (byte) (unsignedInt >> shift);
+            shift += 8;
+        }
+
+        return bigEndian;
+    }
+}
diff --git a/java/org/apache/catalina/websocket/WebSocketServlet.java b/java/org/apache/catalina/websocket/WebSocketServlet.java
index 1c11e6a..a363e83 100644
--- a/java/org/apache/catalina/websocket/WebSocketServlet.java
+++ b/java/org/apache/catalina/websocket/WebSocketServlet.java
@@ -62,11 +62,9 @@ public abstract class WebSocketServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
-
+
// Information required to send the server handshake message
String key;
- String subProtocol = null;
- List extensions = Collections.emptyList();
if (!headerContains(req, "upgrade", "websocket")) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
@@ -96,27 +94,31 @@ public abstract class WebSocketServlet extends HttpServlet {
return;
}
- // TODO Read client handshake - Sec-WebSocket-Protocol
- // Sec-WebSocket-Extensions
+ String subprotocol = req.getHeader("Sec-WebSocket-Protocol");
+ if (subprotocol != null) {
+ // TODO future websocket subprotocols
+ }
// TODO Extensions require the ability to specify something (API TBD)
// that can be passed to the Tomcat internals and process extension
// data present when the frame is fragmented.
-
+ List extensions = Collections.emptyList();
+ String extensionString = req.getHeader("Sec-WebSocket-Extensions");
+ if (extensionString != null) {
+ // TODO read websocket extensions
+ }
+ if (!extensions.isEmpty()) {
+ // TODO future websocket extensions
+ }
+
// If we got this far, all is good. Accept the connection.
resp.setHeader("upgrade", "websocket");
resp.setHeader("connection", "upgrade");
resp.setHeader("Sec-WebSocket-Accept", getWebSocketAccept(key));
- if (subProtocol != null) {
- // TODO
- }
- if (!extensions.isEmpty()) {
- // TODO
- }
// Small hack until the Servlet API provides a way to do this.
- StreamInbound inbound = createWebSocketInbound();
- ((RequestFacade) req).doUpgrade(inbound);
+ WebSocketConnection connection = createWebSocketConnection();
+ ((RequestFacade) req).doUpgrade(connection);
}
@@ -163,5 +165,5 @@ public abstract class WebSocketServlet extends HttpServlet {
return true;
}
- protected abstract StreamInbound createWebSocketInbound();
+ protected abstract WebSocketConnection createWebSocketConnection();
}
diff --git a/java/org/apache/catalina/websocket/WsInputStream.java b/java/org/apache/catalina/websocket/WsInputStream.java
deleted file mode 100644
index 7886d49..0000000
--- a/java/org/apache/catalina/websocket/WsInputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.websocket;
-
-import java.io.IOException;
-
-import org.apache.coyote.http11.upgrade.UpgradeProcessor;
-
-public class WsInputStream extends java.io.InputStream {
-
- private UpgradeProcessor> processor;
- private byte[] mask;
- private long remaining;
- private long read;
-
- public WsInputStream(UpgradeProcessor> processor, byte[] mask,
- long remaining) {
- this.processor = processor;
- this.mask = mask;
- this.remaining = remaining;
- this.read = 0;
- }
-
- @Override
- public int read() throws IOException {
- if (remaining == 0) {
- return -1;
- }
-
- remaining--;
- read++;
-
- int masked = processor.read();
- return masked ^ mask[(int) ((read - 1) % 4)];
- }
-
-}
diff --git a/java/org/apache/catalina/websocket/WsOutbound.java b/java/org/apache/catalina/websocket/WsOutbound.java
deleted file mode 100644
index 2aacbe5..0000000
--- a/java/org/apache/catalina/websocket/WsOutbound.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-
-import org.apache.coyote.http11.upgrade.UpgradeOutbound;
-import org.apache.tomcat.util.buf.B2CConverter;
-
-public class WsOutbound {
-
- private static final int DEFAULT_BUFFER_SIZE = 2048;
-
- private UpgradeOutbound upgradeOutbound;
- private ByteBuffer bb;
- private CharBuffer cb;
- protected Boolean text = null;
- protected boolean firstFrame = true;
-
-
- public WsOutbound(UpgradeOutbound upgradeOutbound) {
- this.upgradeOutbound = upgradeOutbound;
- // TODO: Make buffer size configurable
- // Byte buffer needs to be 4* char buffer to be sure that char buffer
- // can always we written into Byte buffer
- this.bb = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * 4);
- this.cb = CharBuffer.allocate(DEFAULT_BUFFER_SIZE);
- }
-
-
- public void writeBinaryData(int b) throws IOException {
- if (bb.position() == bb.capacity()) {
- doFlush(false);
- }
- if (text == null) {
- text = Boolean.FALSE;
- } else if (text == Boolean.TRUE) {
- // Flush the character data
- flush();
- text = Boolean.FALSE;
- }
- bb.put((byte) (b & 0xFF));
- }
-
-
- public void writeTextData(char c) throws IOException {
- if (cb.position() == cb.capacity()) {
- doFlush(false);
- }
-
- if (text == null) {
- text = Boolean.TRUE;
- } else if (text == Boolean.FALSE) {
- // Flush the binary data
- flush();
- text = Boolean.TRUE;
- }
- cb.append(c);
- }
-
-
- public void writeBinaryMessage(ByteBuffer msgBb) throws IOException {
- if (text != null) {
- // Empty the buffer
- flush();
- }
- text = Boolean.FALSE;
- doWriteBinary(msgBb, true);
- }
-
-
- public void writeTextMessage(CharBuffer msgCb) throws IOException {
- if (text != null) {
- // Empty the buffer
- flush();
- }
- text = Boolean.TRUE;
- doWriteText(msgCb, true);
- }
-
-
- public void flush() throws IOException {
- doFlush(true);
- }
-
- private void doFlush(boolean finalFragment) throws IOException {
- if (text == null) {
- // No data
- return;
- }
- if (text.booleanValue()) {
- doWriteText(cb, finalFragment);
- } else {
- doWriteBinary(bb, finalFragment);
- }
- }
-
-
- public void close() throws IOException {
- doFlush(true);
-
- // TODO: Send a close message
- bb = null;
- cb = null;
- upgradeOutbound = null;
- }
-
-
- protected void doWriteBinary(ByteBuffer buffer, boolean finalFragment)
- throws IOException {
-
- // Prepare to write
- buffer.flip();
-
- // Work out the first byte
- int first = 0x00;
- if (finalFragment) {
- first = first + 0x80;
- }
- if (firstFrame) {
- if (text.booleanValue()) {
- first = first + 0x1;
- } else {
- first = first + 0x2;
- }
- }
- // Continuation frame is OpCode 0
- upgradeOutbound.write(first);
-
- // Note: buffer will never be more than 2^16 in length
- if (buffer.limit() < 126) {
- upgradeOutbound.write(buffer.limit());
- } else {
- upgradeOutbound.write(126);
- upgradeOutbound.write(buffer.limit() >>> 8);
- upgradeOutbound.write(buffer.limit() & 0xFF);
- }
-
- // Write the content
- upgradeOutbound.write(buffer.array(), 0, buffer.limit());
- upgradeOutbound.flush();
-
- // Reset
- if (finalFragment) {
- text = null;
- firstFrame = true;
- } else {
- firstFrame = false;
- }
- bb.clear();
- }
-
-
- protected void doWriteText(CharBuffer buffer, boolean finalFragment)
- throws IOException {
- buffer.flip();
- B2CConverter.UTF_8.newEncoder().encode(buffer, bb, true);
- doWriteBinary(bb, finalFragment);
- // Reset
- cb.clear();
- }
-}
diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
index 211b022..dce5c3a 100644
--- a/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
+++ b/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
@@ -65,6 +65,10 @@ public class UpgradeAprProcessor extends UpgradeProcessor {
return bytes[0];
}
+ @Override
+ public void close() throws IOException {
+ Socket.close(socket);
+ }
@Override
public int read(byte[] bytes) throws IOException {
diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
index 2f10c93..c656ff5 100644
--- a/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
+++ b/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
@@ -31,15 +31,19 @@ import org.apache.tomcat.util.net.SocketWrapper;
*/
public class UpgradeBioProcessor extends UpgradeProcessor {
- private final InputStream inputStream;
- private final OutputStream outputStream;
+ private final InputStream inputStream;
+ private final OutputStream outputStream;
+ /** Socket backing both streams; retained so close() can release it. */
+ private final Socket socket;
public UpgradeBioProcessor(SocketWrapper wrapper,
UpgradeInbound upgradeInbound) throws IOException {
super(upgradeInbound);
- this.inputStream = wrapper.getSocket().getInputStream();
- this.outputStream = wrapper.getSocket().getOutputStream();
+ socket = wrapper.getSocket();
+
+ this.inputStream = socket.getInputStream();
+ this.outputStream = socket.getOutputStream();
}
@@ -71,4 +75,9 @@ public class UpgradeBioProcessor extends UpgradeProcessor {
public int read(byte[] bytes) throws IOException {
return inputStream.read(bytes);
}
+
+ @Override
+ public void close() throws IOException {
+ socket.close();
+ }
}
diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
index cc28089..627dd2c 100644
--- a/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
+++ b/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
@@ -176,4 +176,9 @@ public class UpgradeNioProcessor extends UpgradeProcessor {
}
return written;
}
+
+ @Override
+ public void close() throws IOException {
+ nioChannel.close();
+ }
}
diff --git a/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java b/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
index ea30472..bca49c9 100644
--- a/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
+++ b/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
@@ -47,6 +47,8 @@ public abstract class UpgradeProcessor implements Processor {
// Input methods
public abstract int read() throws IOException;
public abstract int read(byte[] bytes) throws IOException;
+
+ public abstract void close() throws IOException;
@Override
public final UpgradeInbound getUpgradeInbound() {
diff --git a/test/org/apache/catalina/websocket/TestWebSocket.java b/test/org/apache/catalina/websocket/TestWebSocket.java
index 9a5ca1a..ed0bc10 100644
--- a/test/org/apache/catalina/websocket/TestWebSocket.java
+++ b/test/org/apache/catalina/websocket/TestWebSocket.java
@@ -16,10 +16,7 @@
*/
package org.apache.catalina.websocket;
-import java.io.InputStream;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
+import java.io.IOException;
import org.junit.Test;
@@ -38,46 +35,29 @@ public class TestWebSocket extends TomcatBaseTest {
private static final long serialVersionUID = 1L;
@Override
- protected StreamInbound createWebSocketInbound() {
- return new SimpleStreamInbound();
+ protected WebSocketConnection createWebSocketConnection() {
+ return new SimpleConnection();
}
}
- private static final class SimpleStreamInbound extends StreamInbound {
-
+ private static final class SimpleConnection extends WebSocketConnection {
+
@Override
- protected void onBinaryData(InputStream is) {
+ protected void onTextData(WebSocketFrame frame) throws IOException {
// TODO Auto-generated method stub
+
}
@Override
- protected void onTextData(Reader r) {
+ protected void onBinaryData(WebSocketFrame frame) throws IOException {
// TODO Auto-generated method stub
+
}
- }
-
-
- private static final class MessageWebSocketServlet
- extends WebSocketServlet {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- protected StreamInbound createWebSocketInbound() {
- return new SimpleMessageInbound();
- }
- }
-
- private static final class SimpleMessageInbound extends MessageInbound {
-
- @Override
- protected void onBinaryMessage(ByteBuffer message) {
- // TODO Auto-generated method stub
- }
-
+
@Override
- protected void onTextMessage(CharBuffer message) {
+ protected void endOfMessage() {
// TODO Auto-generated method stub
+
}
}
}
diff --git a/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java b/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java
index 729d068..928dab4 100644
--- a/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java
+++ b/webapps/examples/WEB-INF/classes/websocket/EchoMessage.java
@@ -17,34 +17,55 @@
package websocket;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
+import java.io.InputStream;
+import java.io.Reader;
-import org.apache.catalina.websocket.MessageInbound;
-import org.apache.catalina.websocket.StreamInbound;
+import org.apache.catalina.websocket.WebSocketConnection;
+import org.apache.catalina.websocket.WebSocketFrame;
import org.apache.catalina.websocket.WebSocketServlet;
-
public class EchoMessage extends WebSocketServlet {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- protected StreamInbound createWebSocketInbound() {
- return new EchoMessageInbound();
- }
+ @Override
+ protected WebSocketConnection createWebSocketConnection() {
+ // Create a connection that prints out whatever messages it receives
+ return new PrintMessageConnection();
+ }
- private static final class EchoMessageInbound extends MessageInbound {
+ // Static nested class: nothing in it uses the enclosing servlet instance.
+ private static final class PrintMessageConnection extends WebSocketConnection {
+ @Override
+ protected void onTextData(WebSocketFrame frame) throws IOException {
+ Reader payload = frame.readPayload();
+
+ System.out.print("");
+
+ int i;
+ while((i = payload.read()) != -1)
+ {
+ System.out.print((char) i);
+ }
+ }
@Override
- protected void onBinaryMessage(ByteBuffer message) throws IOException {
- System.out.write(message.array(), 0, message.limit());
- System.out.print('\n');
+ protected void onBinaryData(WebSocketFrame frame) throws IOException {
+ InputStream payload = frame.getPayload();
+
+ System.out.println("");
+
+ int i;
+ while((i = payload.read()) != -1)
+ {
+ System.out.print((char) i);
+ }
}
@Override
- protected void onTextMessage(CharBuffer message) throws IOException {
- System.out.println(message);
+ protected void endOfMessage() {
+ System.out.println("");
}
- }
+
+ }
}
diff --git a/webapps/examples/WEB-INF/classes/websocket/EchoStream.java b/webapps/examples/WEB-INF/classes/websocket/EchoStream.java
index 1ab76cc..5a5b4c9 100644
--- a/webapps/examples/WEB-INF/classes/websocket/EchoStream.java
+++ b/webapps/examples/WEB-INF/classes/websocket/EchoStream.java
@@ -17,51 +17,51 @@
package websocket;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import org.apache.catalina.websocket.StreamInbound;
+import org.apache.catalina.websocket.WebSocketConnection;
+import org.apache.catalina.websocket.WebSocketFrame;
import org.apache.catalina.websocket.WebSocketServlet;
-import org.apache.catalina.websocket.WsOutbound;
-
public class EchoStream extends WebSocketServlet {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected StreamInbound createWebSocketInbound() {
- return new EchoStreamInbound();
- }
+ private static final long serialVersionUID = 1L;
- private static final class EchoStreamInbound extends StreamInbound {
+ @Override
+ protected WebSocketConnection createWebSocketConnection() {
+ // Create a connection that echoes back anything it receives
+ return new EchoStreamConnection();
+ }
+ // Static nested class: nothing in it uses the enclosing servlet instance.
+ private static final class EchoStreamConnection extends WebSocketConnection {
@Override
- protected void onBinaryData(InputStream is) throws IOException {
- // Simply echo the data to back to the client.
- WsOutbound outbound = getStreamOutbound();
-
- int i = is.read();
- while (i != -1) {
- outbound.writeBinaryData(i);
- i = is.read();
- }
-
- outbound.flush();
+ protected void onTextData(WebSocketFrame frame) throws IOException {
+ System.out.println("");
+
+ // Toggle the masking flag
+ frame.toggleMask();
+
+ // Echo the frame right back
+ writeFrame(frame);
}
@Override
- protected void onTextData(Reader r) throws IOException {
- // Simply echo the data to back to the client.
- WsOutbound outbound = getStreamOutbound();
-
- int c = r.read();
- while (c != -1) {
- outbound.writeTextData((char) c);
- c = r.read();
- }
+ protected void onBinaryData(WebSocketFrame frame) throws IOException {
+ System.out.println("");
+
+ // Toggle the masking flag
+ frame.toggleMask();
+
+ // Echo the frame right back
+ writeFrame(frame);
+ }
- outbound.flush();
+ @Override
+ protected void endOfMessage() {
+ // That was the final fragment
}
- }
+
+ }
}