/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.websocket;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;

import org.apache.catalina.CometEvent;
import org.apache.tomcat.util.buf.B2CConverter;
import org.apache.tomcat.util.res.StringManager;

/** |
30 |
* Provides the means to write WebSocket messages to the client. All methods |
31 |
* that write to the client (or update a buffer that is later written to the |
32 |
* client) are synchronized to prevent multiple threads trying to write to the |
33 |
* client at the same time. |
34 |
*/ |
35 |
public class WsOutbound { |
36 |
|
37 |
private static final StringManager sm = |
38 |
StringManager.getManager(Constants.Package); |
39 |
public static final int DEFAULT_BUFFER_SIZE = 8192; |
40 |
|
41 |
private CometEvent event; |
42 |
private ByteBuffer bb; |
43 |
private CharBuffer cb; |
44 |
private boolean closed = false; |
45 |
private Boolean text = null; |
46 |
private boolean firstFrame = true; |
47 |
|
48 |
|
49 |
public WsOutbound(CometEvent event) { |
50 |
this(event, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); |
51 |
} |
52 |
|
53 |
|
54 |
public WsOutbound(CometEvent event, int byteBufferSize, |
55 |
int charBufferSize) { |
56 |
this.event = event; |
57 |
this.bb = ByteBuffer.allocate(byteBufferSize); |
58 |
this.cb = CharBuffer.allocate(charBufferSize); |
59 |
} |
60 |
|
61 |
|
62 |
/** |
63 |
* Adds the data to the buffer for binary data. If a textual message is |
64 |
* currently in progress that message will be completed and a new binary |
65 |
* message started. If the buffer for binary data is full, the buffer will |
66 |
* be flushed and a new binary continuation fragment started. |
67 |
* |
68 |
* @param b The byte (only the least significant byte is used) of data to |
69 |
* send to the client. |
70 |
* |
71 |
* @throws IOException If a flush is required and an error occurs writing |
72 |
* the WebSocket frame to the client |
73 |
*/ |
74 |
public synchronized void writeBinaryData(int b) throws IOException { |
75 |
if (closed) { |
76 |
throw new IOException(sm.getString("outbound.closed")); |
77 |
} |
78 |
|
79 |
if (bb.position() == bb.capacity()) { |
80 |
doFlush(false); |
81 |
} |
82 |
if (text == null) { |
83 |
text = Boolean.FALSE; |
84 |
} else if (text == Boolean.TRUE) { |
85 |
// Flush the character data |
86 |
flush(); |
87 |
text = Boolean.FALSE; |
88 |
} |
89 |
bb.put((byte) (b & 0xFF)); |
90 |
} |
91 |
|
92 |
|
93 |
/** |
94 |
* Adds the data to the buffer for textual data. If a binary message is |
95 |
* currently in progress that message will be completed and a new textual |
96 |
* message started. If the buffer for textual data is full, the buffer will |
97 |
* be flushed and a new textual continuation fragment started. |
98 |
* |
99 |
* @param c The character to send to the client. |
100 |
* |
101 |
* @throws IOException If a flush is required and an error occurs writing |
102 |
* the WebSocket frame to the client |
103 |
*/ |
104 |
public synchronized void writeTextData(char c) throws IOException { |
105 |
if (closed) { |
106 |
throw new IOException(sm.getString("outbound.closed")); |
107 |
} |
108 |
|
109 |
if (cb.position() == cb.capacity()) { |
110 |
doFlush(false); |
111 |
} |
112 |
|
113 |
if (text == null) { |
114 |
text = Boolean.TRUE; |
115 |
} else if (text == Boolean.FALSE) { |
116 |
// Flush the binary data |
117 |
flush(); |
118 |
text = Boolean.TRUE; |
119 |
} |
120 |
cb.append(c); |
121 |
} |
122 |
|
123 |
|
124 |
/** |
125 |
* Flush any message (binary or textual) that may be buffered and then send |
126 |
* a WebSocket binary message as a single frame with the provided buffer as |
127 |
* the payload of the message. |
128 |
* |
129 |
* @param msgBb The buffer containing the payload |
130 |
* |
131 |
* @throws IOException If an error occurs writing to the client |
132 |
*/ |
133 |
public synchronized void writeBinaryMessage(ByteBuffer msgBb) |
134 |
throws IOException { |
135 |
|
136 |
if (closed) { |
137 |
throw new IOException(sm.getString("outbound.closed")); |
138 |
} |
139 |
|
140 |
if (text != null) { |
141 |
// Empty the buffer |
142 |
flush(); |
143 |
} |
144 |
text = Boolean.FALSE; |
145 |
doWriteBytes(msgBb, true); |
146 |
} |
147 |
|
148 |
|
149 |
/** |
150 |
* Flush any message (binary or textual) that may be buffered and then send |
151 |
* a WebSocket text message as a single frame with the provided buffer as |
152 |
* the payload of the message. |
153 |
* |
154 |
* @param msgCb The buffer containing the payload |
155 |
* |
156 |
* @throws IOException If an error occurs writing to the client |
157 |
*/ |
158 |
public synchronized void writeTextMessage(CharBuffer msgCb) |
159 |
throws IOException { |
160 |
|
161 |
if (closed) { |
162 |
throw new IOException(sm.getString("outbound.closed")); |
163 |
} |
164 |
|
165 |
if (text != null) { |
166 |
// Empty the buffer |
167 |
flush(); |
168 |
} |
169 |
text = Boolean.TRUE; |
170 |
doWriteText(msgCb, true); |
171 |
} |
172 |
|
173 |
|
174 |
/** |
175 |
* Flush any message (binary or textual) that may be buffered. |
176 |
* |
177 |
* @throws IOException If an error occurs writing to the client |
178 |
*/ |
179 |
public synchronized void flush() throws IOException { |
180 |
if (closed) { |
181 |
throw new IOException(sm.getString("outbound.closed")); |
182 |
} |
183 |
doFlush(true); |
184 |
} |
185 |
|
186 |
|
187 |
private void doFlush(boolean finalFragment) throws IOException { |
188 |
if (text == null) { |
189 |
// No data |
190 |
return; |
191 |
} |
192 |
if (text.booleanValue()) { |
193 |
cb.flip(); |
194 |
doWriteText(cb, finalFragment); |
195 |
} else { |
196 |
bb.flip(); |
197 |
doWriteBytes(bb, finalFragment); |
198 |
} |
199 |
} |
200 |
|
201 |
|
202 |
/** |
203 |
* Respond to a client close by sending a close that echoes the status code |
204 |
* and message. |
205 |
* |
206 |
* @param frame The close frame received from a client |
207 |
* |
208 |
* @throws IOException If an error occurs writing to the client |
209 |
*/ |
210 |
protected void close(WsFrame frame) throws IOException { |
211 |
if (frame.getPayLoadLength() > 0) { |
212 |
// Must be status (2 bytes) plus optional message |
213 |
if (frame.getPayLoadLength() == 1) { |
214 |
throw new IOException(); |
215 |
} |
216 |
int status = (frame.getPayLoad().get() & 0xFF) << 8; |
217 |
status += frame.getPayLoad().get() & 0xFF; |
218 |
|
219 |
if (validateCloseStatus(status)) { |
220 |
// Echo the status back to the client |
221 |
close(status, frame.getPayLoad()); |
222 |
} else { |
223 |
// Invalid close code |
224 |
close(Constants.STATUS_PROTOCOL_ERROR, null); |
225 |
} |
226 |
} else { |
227 |
// No status |
228 |
close(0, null); |
229 |
} |
230 |
} |
231 |
|
232 |
|
233 |
private boolean validateCloseStatus(int status) { |
234 |
|
235 |
if (status == Constants.STATUS_CLOSE_NORMAL || |
236 |
status == Constants.STATUS_SHUTDOWN || |
237 |
status == Constants.STATUS_PROTOCOL_ERROR || |
238 |
status == Constants.STATUS_UNEXPECTED_DATA_TYPE || |
239 |
status == Constants.STATUS_BAD_DATA || |
240 |
status == Constants.STATUS_POLICY_VIOLATION || |
241 |
status == Constants.STATUS_MESSAGE_TOO_LARGE || |
242 |
status == Constants.STATUS_REQUIRED_EXTENSION || |
243 |
status == Constants.STATUS_UNEXPECTED_CONDITION || |
244 |
(status > 2999 && status < 5000)) { |
245 |
// Other 1xxx reserved / not permitted |
246 |
// 2xxx reserved |
247 |
// 3xxx framework defined |
248 |
// 4xxx application defined |
249 |
return true; |
250 |
} |
251 |
// <1000 unused |
252 |
// >4999 undefined |
253 |
return false; |
254 |
} |
255 |
|
256 |
|
257 |
/** |
258 |
* Send a close message to the client |
259 |
* |
260 |
* @param status Must be a valid status code or zero to send no code |
261 |
* @param data Optional message. If message is defined, a valid status |
262 |
* code must be provided. |
263 |
* |
264 |
* @throws IOException If an error occurs writing to the client |
265 |
*/ |
266 |
public synchronized void close(int status, ByteBuffer data) |
267 |
throws IOException { |
268 |
|
269 |
if (closed) { |
270 |
return; |
271 |
} |
272 |
closed = true; |
273 |
|
274 |
event.getHttpServletResponse().getOutputStream().write(0x88); |
275 |
if (status == 0) { |
276 |
event.getHttpServletResponse().getOutputStream().write(0); |
277 |
} else if (data == null || data.position() == data.limit()) { |
278 |
event.getHttpServletResponse().getOutputStream().write(2); |
279 |
event.getHttpServletResponse().getOutputStream().write(status >>> 8); |
280 |
event.getHttpServletResponse().getOutputStream().write(status); |
281 |
} else { |
282 |
event.getHttpServletResponse().getOutputStream().write(2 + data.limit() - data.position()); |
283 |
event.getHttpServletResponse().getOutputStream().write(status >>> 8); |
284 |
event.getHttpServletResponse().getOutputStream().write(status); |
285 |
event.getHttpServletResponse().getOutputStream().write(data.array(), data.position(), |
286 |
data.limit() - data.position()); |
287 |
} |
288 |
event.getHttpServletResponse().flushBuffer(); |
289 |
|
290 |
bb = null; |
291 |
cb = null; |
292 |
event = null; |
293 |
} |
294 |
|
295 |
|
296 |
/** |
297 |
* Send a pong message to the client |
298 |
* |
299 |
* @param data Optional message. |
300 |
* |
301 |
* @throws IOException If an error occurs writing to the client |
302 |
*/ |
303 |
public synchronized void pong(ByteBuffer data) throws IOException { |
304 |
|
305 |
if (closed) { |
306 |
throw new IOException(sm.getString("outbound.closed")); |
307 |
} |
308 |
|
309 |
doFlush(true); |
310 |
|
311 |
event.getHttpServletResponse().getOutputStream().write(0x8A); |
312 |
if (data == null) { |
313 |
event.getHttpServletResponse().getOutputStream().write(0); |
314 |
} else { |
315 |
event.getHttpServletResponse().getOutputStream().write(data.limit() - data.position()); |
316 |
event.getHttpServletResponse().getOutputStream().write(data.array(), data.position(), |
317 |
data.limit() - data.position()); |
318 |
} |
319 |
|
320 |
event.getHttpServletResponse().flushBuffer(); |
321 |
} |
322 |
|
323 |
|
324 |
/** |
325 |
* Writes the provided bytes as the payload in a new WebSocket frame. |
326 |
* |
327 |
* @param buffer The bytes to include in the payload. |
328 |
* @param finalFragment Do these bytes represent the final fragment of a |
329 |
* WebSocket message? |
330 |
* @throws IOException |
331 |
*/ |
332 |
private void doWriteBytes(ByteBuffer buffer, boolean finalFragment) |
333 |
throws IOException { |
334 |
|
335 |
// Work out the first byte |
336 |
int first = 0x00; |
337 |
if (finalFragment) { |
338 |
first = first + 0x80; |
339 |
} |
340 |
if (firstFrame) { |
341 |
if (text.booleanValue()) { |
342 |
first = first + 0x1; |
343 |
} else { |
344 |
first = first + 0x2; |
345 |
} |
346 |
} |
347 |
// Continuation frame is OpCode 0 |
348 |
event.getHttpServletResponse().getOutputStream().write(first); |
349 |
|
350 |
if (buffer.limit() < 126) { |
351 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit()); |
352 |
} else if (buffer.limit() < 65536) { |
353 |
event.getHttpServletResponse().getOutputStream().write(126); |
354 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 8); |
355 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() & 0xFF); |
356 |
} else { |
357 |
// Will never be more than 2^31-1 |
358 |
event.getHttpServletResponse().getOutputStream().write(127); |
359 |
event.getHttpServletResponse().getOutputStream().write(0); |
360 |
event.getHttpServletResponse().getOutputStream().write(0); |
361 |
event.getHttpServletResponse().getOutputStream().write(0); |
362 |
event.getHttpServletResponse().getOutputStream().write(0); |
363 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 24); |
364 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 16); |
365 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() >>> 8); |
366 |
event.getHttpServletResponse().getOutputStream().write(buffer.limit() & 0xFF); |
367 |
|
368 |
} |
369 |
|
370 |
// Write the content |
371 |
event.getHttpServletResponse().getOutputStream().write(buffer.array(), 0, buffer.limit()); |
372 |
event.getHttpServletResponse().flushBuffer(); |
373 |
|
374 |
// Reset |
375 |
if (finalFragment) { |
376 |
text = null; |
377 |
firstFrame = true; |
378 |
} else { |
379 |
firstFrame = false; |
380 |
} |
381 |
bb.clear(); |
382 |
} |
383 |
|
384 |
|
385 |
/* |
386 |
* Convert the textual message to bytes and then output it. |
387 |
*/ |
388 |
private void doWriteText(CharBuffer buffer, boolean finalFragment) |
389 |
throws IOException { |
390 |
CharsetEncoder encoder = B2CConverter.UTF_8.newEncoder(); |
391 |
do { |
392 |
CoderResult cr = encoder.encode(buffer, bb, true); |
393 |
if (cr.isError()) { |
394 |
cr.throwException(); |
395 |
} |
396 |
bb.flip(); |
397 |
if (buffer.hasRemaining()) { |
398 |
doWriteBytes(bb, false); |
399 |
} else { |
400 |
doWriteBytes(bb, finalFragment); |
401 |
} |
402 |
} while (buffer.hasRemaining()); |
403 |
|
404 |
// Reset - bb will be cleared in doWriteBytes() |
405 |
cb.clear(); |
406 |
} |
407 |
} |