1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25 /***
26 *
27 * <p>
28 *
29 * <p>
30 * <p>
31 * @author Daniel F. Savarese
32 * @author Bruno D'Avanzo
33 ***/
34
35
36 final class TelnetInputStream extends BufferedInputStream implements Runnable
37 {
38 static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
39 _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
40 _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
41
42 private boolean __hasReachedEOF, __isClosed;
43 private boolean __readIsWaiting;
44 private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
45 private int[] __queue;
46 private TelnetClient __client;
47 private Thread __thread;
48 private IOException __ioException;
49
50 /* TERMINAL-TYPE option (start)*/
51 private int __suboption[] = new int[256];
52 private int __suboption_count = 0;
53 /* TERMINAL-TYPE option (end)*/
54
55 private boolean __threaded;
56
57 TelnetInputStream(InputStream input, TelnetClient client,
58 boolean readerThread)
59 {
60 super(input);
61 __client = client;
62 __receiveState = _STATE_DATA;
63 __isClosed = true;
64 __hasReachedEOF = false;
65 // Make it 2049, because when full, one slot will go unused, and we
66 // want a 2048 byte buffer just to have a round number (base 2 that is)
67 __queue = new int[2049];
68 __queueHead = 0;
69 __queueTail = 0;
70 __bytesAvailable = 0;
71 __ioException = null;
72 __readIsWaiting = false;
73 __threaded = false;
74 if(readerThread)
75 __thread = new Thread(this);
76 else
77 __thread = null;
78 }
79
80 TelnetInputStream(InputStream input, TelnetClient client) {
81 this(input, client, true);
82 }
83
84 void _start()
85 {
86 if(__thread == null)
87 return;
88
89 int priority;
90 __isClosed = false;
91 // TODO remove this
92 // Need to set a higher priority in case JVM does not use pre-emptive
93 // threads. This should prevent scheduler induced deadlock (rather than
94 // deadlock caused by a bug in this code).
95 priority = Thread.currentThread().getPriority() + 1;
96 if (priority > Thread.MAX_PRIORITY)
97 priority = Thread.MAX_PRIORITY;
98 __thread.setPriority(priority);
99 __thread.setDaemon(true);
100 __thread.start();
101 __threaded = true;
102 }
103
104
105 // synchronized(__client) critical sections are to protect against
106 // TelnetOutputStream writing through the telnet client at same time
107 // as a processDo/Will/etc. command invoked from TelnetInputStream
108 // tries to write.
109 private int __read(boolean mayBlock) throws IOException
110 {
111 int ch;
112
113 _loop:
114 while (true)
115 {
116
117 // If there is no more data AND we were told not to block, just return -2. (More efficient than exception.)
118 if(!mayBlock && super.available() == 0)
119 return -2;
120
121 // Otherwise, exit only when we reach end of stream.
122 if ((ch = super.read()) < 0)
123 return -1;
124
125 ch = (ch & 0xff);
126
127 /* Code Section added for supporting AYT (start)*/
128 synchronized (__client)
129 {
130 __client._processAYTResponse();
131 }
132 /* Code Section added for supporting AYT (end)*/
133
134 /* Code Section added for supporting spystreams (start)*/
135 __client._spyRead(ch);
136 /* Code Section added for supporting spystreams (end)*/
137
138 _mainSwitch:
139 switch (__receiveState)
140 {
141
142 case _STATE_CR:
143 if (ch == '\0')
144 {
145 // Strip null
146 continue;
147 }
148 // How do we handle newline after cr?
149 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
150
151 // Handle as normal data by falling through to _STATE_DATA case
152
153 case _STATE_DATA:
154 if (ch == TelnetCommand.IAC)
155 {
156 __receiveState = _STATE_IAC;
157 continue;
158 }
159
160
161 if (ch == '\r')
162 {
163 synchronized (__client)
164 {
165 if (__client._requestedDont(TelnetOption.BINARY))
166 __receiveState = _STATE_CR;
167 else
168 __receiveState = _STATE_DATA;
169 }
170 }
171 else
172 __receiveState = _STATE_DATA;
173 break;
174
175 case _STATE_IAC:
176 switch (ch)
177 {
178 case TelnetCommand.WILL:
179 __receiveState = _STATE_WILL;
180 continue;
181 case TelnetCommand.WONT:
182 __receiveState = _STATE_WONT;
183 continue;
184 case TelnetCommand.DO:
185 __receiveState = _STATE_DO;
186 continue;
187 case TelnetCommand.DONT:
188 __receiveState = _STATE_DONT;
189 continue;
190 /* TERMINAL-TYPE option (start)*/
191 case TelnetCommand.SB:
192 __suboption_count = 0;
193 __receiveState = _STATE_SB;
194 continue;
195 /* TERMINAL-TYPE option (end)*/
196 case TelnetCommand.IAC:
197 __receiveState = _STATE_DATA;
198 break;
199 default:
200 break;
201 }
202 __receiveState = _STATE_DATA;
203 continue;
204 case _STATE_WILL:
205 synchronized (__client)
206 {
207 __client._processWill(ch);
208 __client._flushOutputStream();
209 }
210 __receiveState = _STATE_DATA;
211 continue;
212 case _STATE_WONT:
213 synchronized (__client)
214 {
215 __client._processWont(ch);
216 __client._flushOutputStream();
217 }
218 __receiveState = _STATE_DATA;
219 continue;
220 case _STATE_DO:
221 synchronized (__client)
222 {
223 __client._processDo(ch);
224 __client._flushOutputStream();
225 }
226 __receiveState = _STATE_DATA;
227 continue;
228 case _STATE_DONT:
229 synchronized (__client)
230 {
231 __client._processDont(ch);
232 __client._flushOutputStream();
233 }
234 __receiveState = _STATE_DATA;
235 continue;
236 /* TERMINAL-TYPE option (start)*/
237 case _STATE_SB:
238 switch (ch)
239 {
240 case TelnetCommand.IAC:
241 __receiveState = _STATE_IAC_SB;
242 continue;
243 default:
244 // store suboption char
245 __suboption[__suboption_count++] = ch;
246 break;
247 }
248 __receiveState = _STATE_SB;
249 continue;
250 case _STATE_IAC_SB:
251 switch (ch)
252 {
253 case TelnetCommand.SE:
254 synchronized (__client)
255 {
256 __client._processSuboption(__suboption, __suboption_count);
257 __client._flushOutputStream();
258 }
259 __receiveState = _STATE_DATA;
260 continue;
261 default:
262 __receiveState = _STATE_SB;
263 break;
264 }
265 __receiveState = _STATE_DATA;
266 continue;
267 /* TERMINAL-TYPE option (end)*/
268 }
269
270 break;
271 }
272
273 return ch;
274 }
275
276 // synchronized(__client) critical sections are to protect against
277 // TelnetOutputStream writing through the telnet client at same time
278 // as a processDo/Will/etc. command invoked from TelnetInputStream
279 // tries to write.
280 private void __processChar(int ch) throws InterruptedException
281 {
282 // Critical section because we're altering __bytesAvailable,
283 // __queueTail, and the contents of _queue.
284 synchronized (__queue)
285 {
286 while (__bytesAvailable >= __queue.length - 1)
287 {
288 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
289 // will consume some data soon!
290 if(__threaded)
291 {
292 __queue.notify();
293 try
294 {
295 __queue.wait();
296 }
297 catch (InterruptedException e)
298 {
299 throw e;
300 }
301 }
302 else
303 {
304 // We've been asked to add another character to the queue, but it is already full and there's
305 // no other thread to drain it. This should not have happened!
306 throw new IllegalStateException("Queue is full! Cannot process another character.");
307 }
308 }
309
310 // Need to do this in case we're not full, but block on a read
311 if (__readIsWaiting && __threaded)
312 {
313 __queue.notify();
314 }
315
316 __queue[__queueTail] = ch;
317 ++__bytesAvailable;
318
319 if (++__queueTail >= __queue.length)
320 __queueTail = 0;
321 }
322 }
323
324 @Override
325 public int read() throws IOException
326 {
327 // Critical section because we're altering __bytesAvailable,
328 // __queueHead, and the contents of _queue in addition to
329 // testing value of __hasReachedEOF.
330 synchronized (__queue)
331 {
332
333 while (true)
334 {
335 if (__ioException != null)
336 {
337 IOException e;
338 e = __ioException;
339 __ioException = null;
340 throw e;
341 }
342
343 if (__bytesAvailable == 0)
344 {
345 // Return -1 if at end of file
346 if (__hasReachedEOF)
347 return -1;
348
349 // Otherwise, we have to wait for queue to get something
350 if(__threaded)
351 {
352 __queue.notify();
353 try
354 {
355 __readIsWaiting = true;
356 __queue.wait();
357 __readIsWaiting = false;
358 }
359 catch (InterruptedException e)
360 {
361 throw new InterruptedIOException("Fatal thread interruption during read.");
362 }
363 }
364 else
365 {
366 //__alreadyread = false;
367 __readIsWaiting = true;
368 int ch;
369 boolean mayBlock = true; // block on the first read only
370
371 do
372 {
373 try
374 {
375 if ((ch = __read(mayBlock)) < 0)
376 if(ch != -2)
377 return (ch);
378 }
379 catch (InterruptedIOException e)
380 {
381 synchronized (__queue)
382 {
383 __ioException = e;
384 __queue.notifyAll();
385 try
386 {
387 __queue.wait(100);
388 }
389 catch (InterruptedException interrupted)
390 {
391 }
392 }
393 return (-1);
394 }
395
396
397 try
398 {
399 if(ch != -2)
400 {
401 __processChar(ch);
402 }
403 }
404 catch (InterruptedException e)
405 {
406 if (__isClosed)
407 return (-1);
408 }
409
410 // Reads should not block on subsequent iterations. Potentially, this could happen if the
411 // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
412 mayBlock = false;
413
414 }
415 // Continue reading as long as there is data available and the queue is not full.
416 while (super.available() > 0 && __bytesAvailable < __queue.length - 1);
417
418 __readIsWaiting = false;
419 }
420 continue;
421 }
422 else
423 {
424 int ch;
425
426 ch = __queue[__queueHead];
427
428 if (++__queueHead >= __queue.length)
429 __queueHead = 0;
430
431 --__bytesAvailable;
432
433 // Need to explicitly notify() so available() works properly
434 if(__bytesAvailable == 0 && __threaded) {
435 __queue.notify();
436 }
437
438 return ch;
439 }
440 }
441 }
442 }
443
444
445 /***
446 * Reads the next number of bytes from the stream into an array and
447 * returns the number of bytes read. Returns -1 if the end of the
448 * stream has been reached.
449 * <p>
450 * @param buffer The byte array in which to store the data.
451 * @return The number of bytes read. Returns -1 if the
452 * end of the message has been reached.
453 * @exception IOException If an error occurs in reading the underlying
454 * stream.
455 ***/
456 @Override
457 public int read(byte buffer[]) throws IOException
458 {
459 return read(buffer, 0, buffer.length);
460 }
461
462
463 /***
464 * Reads the next number of bytes from the stream into an array and returns
465 * the number of bytes read. Returns -1 if the end of the
466 * message has been reached. The characters are stored in the array
467 * starting from the given offset and up to the length specified.
468 * <p>
469 * @param buffer The byte array in which to store the data.
470 * @param offset The offset into the array at which to start storing data.
471 * @param length The number of bytes to read.
472 * @return The number of bytes read. Returns -1 if the
473 * end of the stream has been reached.
474 * @exception IOException If an error occurs while reading the underlying
475 * stream.
476 ***/
477 @Override
478 public int read(byte buffer[], int offset, int length) throws IOException
479 {
480 int ch, off;
481
482 if (length < 1)
483 return 0;
484
485 // Critical section because run() may change __bytesAvailable
486 synchronized (__queue)
487 {
488 if (length > __bytesAvailable)
489 length = __bytesAvailable;
490 }
491
492 if ((ch = read()) == -1)
493 return -1;
494
495 off = offset;
496
497 do
498 {
499 buffer[offset++] = (byte)ch;
500 }
501 while (--length > 0 && (ch = read()) != -1);
502
503 //__client._spyRead(buffer, off, offset - off);
504 return (offset - off);
505 }
506
507
508 /*** Returns false. Mark is not supported. ***/
509 @Override
510 public boolean markSupported()
511 {
512 return false;
513 }
514
515 @Override
516 public int available() throws IOException
517 {
518 // Critical section because run() may change __bytesAvailable
519 synchronized (__queue)
520 {
521 return __bytesAvailable;
522 }
523 }
524
525
526 // Cannot be synchronized. Will cause deadlock if run() is blocked
527 // in read because BufferedInputStream read() is synchronized.
528 @Override
529 public void close() throws IOException
530 {
531 // Completely disregard the fact thread may still be running.
532 // We can't afford to block on this close by waiting for
533 // thread to terminate because few if any JVM's will actually
534 // interrupt a system read() from the interrupt() method.
535 super.close();
536
537 synchronized (__queue)
538 {
539 __hasReachedEOF = true;
540 __isClosed = true;
541
542 if (__thread != null && __thread.isAlive())
543 {
544 __thread.interrupt();
545 }
546
547 __queue.notifyAll();
548 }
549
550 __threaded = false;
551 }
552
553 public void run()
554 {
555 int ch;
556
557 try
558 {
559 _outerLoop:
560 while (!__isClosed)
561 {
562 try
563 {
564 if ((ch = __read(true)) < 0)
565 break;
566 }
567 catch (InterruptedIOException e)
568 {
569 synchronized (__queue)
570 {
571 __ioException = e;
572 __queue.notifyAll();
573 try
574 {
575 __queue.wait(100);
576 }
577 catch (InterruptedException interrupted)
578 {
579 if (__isClosed)
580 break _outerLoop;
581 }
582 continue;
583 }
584 } catch(RuntimeException re) {
585 // We treat any runtime exceptions as though the
586 // stream has been closed. We close the
587 // underlying stream just to be sure.
588 super.close();
589 // Breaking the loop has the effect of setting
590 // the state to closed at the end of the method.
591 break _outerLoop;
592 }
593
594 try
595 {
596 __processChar(ch);
597 }
598 catch (InterruptedException e)
599 {
600 if (__isClosed)
601 break _outerLoop;
602 }
603 }
604 }
605 catch (IOException ioe)
606 {
607 synchronized (__queue)
608 {
609 __ioException = ioe;
610 }
611 }
612
613 synchronized (__queue)
614 {
615 __isClosed = true; // Possibly redundant
616 __hasReachedEOF = true;
617 __queue.notify();
618 }
619
620 __threaded = false;
621 }
622 }
623
624 /* Emacs configuration
625 * Local variables: **
626 * mode: java **
627 * c-basic-offset: 4 **
628 * indent-tabs-mode: nil **
629 * End: **
630 */