1 /** 2 * High-level stream API. 3 * 4 * Copyright: © 2015-2016 Dragos Carp 5 * License: Boost Software License - Version 1.0 6 * Authors: Dragos Carp 7 */ 8 module asynchronous.streams; 9 10 import std.algorithm : findSplit; 11 import std.array : Appender, empty; 12 import std.exception : enforce, enforceEx; 13 import std.socket : AddressFamily, AddressInfoFlags, ProtocolType, Socket, 14 SocketOSException; 15 import std.typecons : tuple; 16 import asynchronous.events; 17 import asynchronous.futures; 18 import asynchronous.tasks; 19 import asynchronous.transports; 20 import asynchronous.types; 21 22 enum DEFAULT_LIMIT = 1 << 16; // 2 ** 16 23 24 /** 25 * Incomplete read error. 26 */ 27 class IncompleteReadError : Exception 28 { 29 /** 30 * Read bytes string before the end of stream was reached. 31 */ 32 const(void)[] partial; 33 34 /** 35 * Total number of expected bytes. 36 */ 37 const size_t expected; 38 39 this(const(void)[] partial, size_t expected, string file = __FILE__, 40 size_t line = __LINE__, Throwable next = null) @safe pure 41 { 42 import std.format : format; 43 44 this.partial = partial; 45 this.expected = expected; 46 47 super("%s bytes read on a total of %s expected bytes" 48 .format(partial.length, expected), file, line, next); 49 } 50 } 51 52 /** 53 * A wrapper for $(D_PSYMBOL createConnection()) returning a (reader, writer) 54 * pair. 55 * 56 * The reader returned is a $(D_PSYMBOL StreamReader) instance; the writer is a 57 * $(D_PSYMBOL StreamWriter) instance. 58 * 59 * The arguments are all the usual arguments to $(D_PSYMBOL createConnection()) 60 * except protocolFactory; most common are host and port, with various optional 61 * arguments following. 62 * 63 * Additional arguments are $(D_PSYMBOL eventLoop) (to set the event loop 64 * instance to use) and $(D_PSYMBOL limit) (to set the buffer limit passed to 65 * the $(D_PSYMBOL StreamReader)). 66 * 67 * (If you want to customize the $(D_PSYMBOL StreamReader) and/or 68 * $(D_PSYMBOL StreamReaderProtocol) classes, just copy the code -- there's 69 * really nothing special here except some convenience.) 70 */ 71 @Coroutine 72 auto openConnection(EventLoop eventLoop, in char[] host = null, 73 in char[] service = null, size_t limit = DEFAULT_LIMIT, 74 SslContext sslContext = null, 75 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 76 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 77 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags, 78 Socket socket = null, in char[] localHost = null, 79 in char[] localService = null, in char[] serverHostname = null) 80 { 81 if (eventLoop is null) 82 eventLoop = getEventLoop; 83 84 auto streamReader = new StreamReader(eventLoop, limit); 85 auto connection = eventLoop.createConnection( 86 () => new StreamReaderProtocol(eventLoop, streamReader), host, service, 87 sslContext, addressFamily, protocolType, addressInfoFlags, socket, 88 localHost, localService, serverHostname); 89 auto streamWriter = new StreamWriter(eventLoop, connection.transport, 90 connection.protocol, streamReader); 91 92 return tuple!("reader", "writer")(streamReader, streamWriter); 93 } 94 95 alias ClientConnectedCallback = void delegate(StreamReader, StreamWriter); 96 /** 97 * Start a socket server, call back for each client connected. 98 * 99 * The first parameter, $(D_PSYMBOL clientConnectedCallback), takes two 100 * parameters: $(D_PSYMBOL clientReader), $(D_PSYMBOL clientWriter). $(D_PSYMBOL 101 * clientReader) is a $(D_PSYMBOL StreamReader) object, while $(D_PSYMBOL 102 * clientWriter) is a $(D_PSYMBOL StreamWriter) object. This parameter is a 103 * coroutine, that will be automatically converted into a $(D_PSYMBOL Task). 104 * 105 * The rest of the arguments are all the usual arguments to $(D_PSYMBOL 106 * eventLoop.createServer()) except $(D_PSYMBOL protocolFactory). The return 107 * value is the same as $(D_PSYMBOL eventLoop.createServer()). 108 * 109 * Additional optional keyword arguments are loop (to set the event loop 110 * instance to use) and limit (to set the buffer limit passed to the 111 * $(D_PSYMBOL StreamReader)). 112 * 113 * The return value is the same as $(D_PSYMBOL loop.createServer()), i.e. a 114 * $(D_PSYMBOL Server) object which can be used to stop the service. 115 */ 116 @Coroutine 117 Server startServer(EventLoop eventLoop, 118 ClientConnectedCallback clientConnectedCallback, in char[] host = null, 119 in char[] service = null, size_t limit = DEFAULT_LIMIT, 120 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 121 AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE, 122 Socket socket = null, int backlog = 100, SslContext sslContext = null, 123 bool reuseAddress = true) 124 { 125 if (eventLoop is null) 126 eventLoop = getEventLoop; 127 128 Protocol protocolFactory() 129 { 130 auto reader = new StreamReader(eventLoop, limit); 131 return new StreamReaderProtocol(eventLoop, reader, 132 clientConnectedCallback); 133 } 134 135 return eventLoop.createServer(&protocolFactory, host, service, 136 addressFamily, addressInfoFlags, socket, backlog, sslContext, 137 reuseAddress); 138 } 139 140 /** 141 * A wrapper for $(D_PSYMBOL createUnixConnection()) returning a (reader, 142 * writer) pair. 143 * 144 * See $(D_PSYMBOL openConnection()) for information about return value and 145 * other details. 146 */ 147 version (Posix) 148 @Coroutine 149 auto openUnixConnection(EventLoop eventLoop, in char[] path = null, 150 size_t limit = DEFAULT_LIMIT, SslContext sslContext = null, 151 Socket socket = null, in char[] serverHostname = null) 152 { 153 if (eventLoop is null) 154 eventLoop = getEventLoop; 155 156 auto streamReader = new StreamReader(eventLoop, limit); 157 auto connection = eventLoop.createUnixConnection( 158 () => new StreamReaderProtocol(eventLoop, streamReader), path, 159 sslContext, socket, serverHostname); 160 auto streamWriter = new StreamWriter(eventLoop, connection.transport, 161 connection.protocol, streamReader); 162 163 return tuple!("reader", "writer")(streamReader, streamWriter); 164 } 165 166 /** 167 * Start a UNIX Domain Socket server, with a callback for each client connected. 168 * 169 * See $(D_PSYMBOL startServer()) for information about return value and other 170 * details. 171 */ 172 version (Posix) 173 @Coroutine 174 Server startUnixServer(EventLoop eventLoop, 175 ClientConnectedCallback clientConnectedCallback, in char[] path = null, 176 size_t limit = DEFAULT_LIMIT, Socket socket = null, int backlog = 100, 177 SslContext sslContext = null) 178 { 179 if (eventLoop is null) 180 eventLoop = getEventLoop; 181 182 Protocol protocolFactory() 183 { 184 auto reader = new StreamReader(eventLoop, limit); 185 return new StreamReaderProtocol(eventLoop, reader, 186 clientConnectedCallback); 187 } 188 189 return eventLoop.createUnixServer(&protocolFactory, path, socket, backlog, 190 sslContext); 191 } 192 193 /** 194 * Reusable flow control logic for $(D_PSYMBOL StreamWriter.drain()). 195 * 196 * This implements the protocol methods $(D_PSYMBOL pauseWriting()), $(D_PSYMBOL 197 * resumeReading()) and $(D_PSYMBOL connectionLost()). If the subclass 198 * overrides these it must call the super methods. 199 * 200 * $(D_PSYMBOL StreamWriter.drain()) must wait for $(D_PSYMBOL drainHelper()) 201 * coroutine. 202 */ 203 private abstract class FlowControlProtocol : Protocol 204 { 205 private EventLoop eventLoop; 206 private bool paused = false; 207 private Waiter drainWaiter = null; 208 private bool connectionLost_ = false; 209 210 this(EventLoop eventLoop) 211 { 212 if (eventLoop is null) 213 this.eventLoop = getEventLoop; 214 else 215 this.eventLoop = eventLoop; 216 217 } 218 219 override void pauseWriting() 220 in 221 { 222 assert(!paused); 223 } 224 body 225 { 226 paused = true; 227 } 228 229 override void resumeWriting() 230 in 231 { 232 assert(paused); 233 } 234 body 235 { 236 paused = false; 237 238 if (drainWaiter !is null) 239 { 240 auto waiter = drainWaiter; 241 242 drainWaiter = null; 243 if (!waiter.done) 244 waiter.setResult; 245 } 246 } 247 248 override void connectionMade(BaseTransport transport) 249 { 250 throw new NotImplementedException; 251 } 252 253 override void connectionLost(Exception exception) 254 { 255 connectionLost_ = true; 256 257 if (!paused || drainWaiter is null || drainWaiter.done) 258 return; 259 260 auto waiter = drainWaiter; 261 262 drainWaiter = null; 263 if (exception is null) 264 waiter.setResult; 265 else 266 waiter.setException(exception); 267 } 268 269 @Coroutine 270 protected void drainHelper() 271 { 272 enforceEx!SocketOSException(!connectionLost_, "Connection lost"); 273 274 if (!paused) 275 return; 276 277 assert(drainWaiter is null || drainWaiter.cancelled); 278 279 drainWaiter = new Waiter(eventLoop); 280 eventLoop.waitFor(drainWaiter); 281 } 282 283 override void dataReceived(const(void)[] data) 284 { 285 throw new NotImplementedException; 286 } 287 288 override bool eofReceived() 289 { 290 throw new NotImplementedException; 291 } 292 } 293 294 295 /** 296 * Helper class to adapt between $(D_PSYMBOL Protocol) and $(D_PSYMBOL 297 * StreamReader). 298 * 299 * (This is a helper class instead of making StreamReader itself a $(D_PSYMBOL 300 * Protocol) subclass, because the StreamReader has other potential uses, and to 301 * prevent the user of the StreamReader to accidentally call inappropriate 302 * methods of the protocol.) 303 */ 304 class StreamReaderProtocol : FlowControlProtocol 305 { 306 private StreamReader streamReader; 307 private StreamWriter streamWriter; 308 private ClientConnectedCallback clientConnectedCallback; 309 310 this(EventLoop eventLoop, StreamReader streamReader, 311 ClientConnectedCallback clientConnectedCallback = null) 312 { 313 super(eventLoop); 314 this.streamReader = streamReader; 315 this.streamWriter = null; 316 this.clientConnectedCallback = clientConnectedCallback; 317 } 318 319 override void connectionMade(BaseTransport transport) 320 { 321 auto transport1 = cast(Transport) transport; 322 323 enforce(transport1 !is null); 324 streamReader.setTransport(transport1); 325 326 if (clientConnectedCallback !is null) 327 { 328 streamWriter = new StreamWriter(eventLoop, transport1, this, 329 streamReader); 330 eventLoop.createTask(clientConnectedCallback, streamReader, 331 streamWriter); 332 } 333 } 334 335 override void connectionLost(Exception exception) 336 { 337 if (exception is null) 338 streamReader.feedEof; 339 else 340 streamReader.setException(exception); 341 super.connectionLost(exception); 342 } 343 344 override void dataReceived(const(void)[] data) 345 { 346 streamReader.feedData(data); 347 } 348 349 override bool eofReceived() 350 { 351 streamReader.feedEof; 352 return true; 353 } 354 } 355 356 final class StreamReader 357 { 358 private EventLoop eventLoop; 359 private size_t limit; 360 private const(void)[] buffer; 361 private bool eof = false; 362 private Waiter flowWaiter; 363 private Throwable exception_; 364 private ReadTransport transport; 365 private bool paused = false; 366 367 this(EventLoop eventLoop, size_t limit = DEFAULT_LIMIT) 368 { 369 if (eventLoop is null) 370 this.eventLoop = getEventLoop; 371 else 372 this.eventLoop = eventLoop; 373 // The line length limit is a security feature; 374 // it also doubles as half the buffer limit. 375 this.limit = limit; 376 } 377 378 /** 379 * Get the exception. 380 */ 381 @property Throwable exception() 382 { 383 return exception_; 384 } 385 386 /** 387 * Acknowledge the EOF. 388 */ 389 void feedEof() 390 { 391 eof = true; 392 wakeupWaiter; 393 } 394 395 /** 396 * Feed $(D_PSYMBOL data) bytes in the internal buffer. Any operations 397 * waiting for the data will be resumed. 398 */ 399 void feedData(const(void)[] data) 400 in 401 { 402 assert(!eof); // feedData after feedEof 403 } 404 body 405 { 406 if (data.empty) 407 return; 408 409 buffer ~= data; 410 wakeupWaiter; 411 412 if (transport is null || paused || buffer.length <= 2 * limit) 413 return; 414 415 try 416 { 417 transport.pauseReading; 418 paused = true; 419 } 420 catch (NotImplementedException) 421 { 422 // The transport can't be paused. 423 // We'll just have to buffer all data. 424 // Forget the transport so we don't keep trying. 425 transport = null; 426 } 427 } 428 429 /** 430 * Set the exception. 431 */ 432 void setException(Throwable exception) 433 { 434 exception_ = exception; 435 436 if (flowWaiter is null) 437 return; 438 439 auto waiter = flowWaiter; 440 flowWaiter = null; 441 if (!waiter.cancelled) 442 waiter.setException(exception); 443 } 444 445 /** 446 * Set the transport. 447 */ 448 void setTransport(ReadTransport transport) 449 in 450 { 451 assert(this.transport is null); 452 } 453 body 454 { 455 this.transport = transport; 456 } 457 458 /** 459 * Wakeup $(D_PSYMBOL read()) or $(D_PSYMBOL readLine()) function waiting 460 * for data or EOF. 461 */ 462 private void wakeupWaiter() 463 { 464 if (flowWaiter is null) 465 return; 466 467 auto waiter = flowWaiter; 468 flowWaiter = null; 469 if (!waiter.cancelled) 470 waiter.setResult; 471 } 472 473 private void maybeResumeTransport() 474 { 475 if (paused && buffer.length <= limit) 476 { 477 paused = false; 478 transport.resumeReading; 479 } 480 } 481 482 /** 483 * Wait until $(D_PSYMBOL feedData()) or $(D_PSYMBOL feedEof()) is called. 484 */ 485 @Coroutine 486 private void waitForData(string functionName)() 487 { 488 // StreamReader uses a future to link the protocol feed_data() method 489 // to a read coroutine. Running two read coroutines at the same time 490 // would have an unexpected behavior. It would not be possible to know 491 // which coroutine would get the next data. 492 enforce(flowWaiter is null, functionName 493 ~ "() called while another coroutine is already waiting for incoming data"); 494 495 flowWaiter = new Waiter(eventLoop); 496 eventLoop.waitFor(flowWaiter); 497 flowWaiter = null; 498 } 499 500 /** 501 * Read up to $(D_PSYMBOL n) bytes. If $(D_PSYMBOL n) is not provided, or 502 * set to -1, read until EOF and return all read bytes. 503 * 504 * If the EOF was received and the internal buffer is empty, return an empty 505 * array. 506 */ 507 @Coroutine 508 const(void)[] read(ptrdiff_t n = -1) 509 { 510 if (exception !is null) 511 throw exception; 512 513 if (n == 0) 514 return null; 515 516 if (n < 0) 517 { 518 // This used to just loop creating a new waiter hoping to 519 // collect everything in this.buffer, but that would 520 // deadlock if the subprocess sends more than this.limit 521 // bytes. So just call read(limit) until EOF. 522 Appender!(const(char)[]) buff; 523 524 while (true) 525 { 526 const(char)[] chunk = cast(const(char)[]) read(limit); 527 if (chunk.empty) 528 break; 529 buff ~= chunk; 530 } 531 532 return buff.data; 533 } 534 535 if (buffer.empty && !eof) 536 waitForData!"read"; 537 538 const(void)[] data; 539 540 if (buffer.length <= n) 541 { 542 data = buffer; 543 buffer = null; 544 } 545 else 546 { 547 data = buffer[0 .. n]; 548 buffer = buffer[n .. $]; 549 } 550 551 maybeResumeTransport; 552 553 return data; 554 } 555 556 /** 557 * Read one line, where “line” is a sequence of bytes ending with '\n'. 558 * 559 * If EOF is received, and '\n' was not found, the method will return the 560 * partial read bytes. 561 * 562 * If the EOF was received and the internal buffer is empty, return an empty 563 * array. 564 */ 565 @Coroutine 566 const(char)[] readLine() 567 { 568 if (exception !is null) 569 throw exception; 570 571 Appender!(const(char)[]) line; 572 bool notEnough = true; 573 574 while (notEnough) 575 { 576 while (!buffer.empty && notEnough) 577 { 578 const(char)[] buffer1 = cast(const(char)[]) buffer; 579 auto r = buffer1.findSplit(['\n']); 580 line ~= r[0]; 581 buffer = r[2]; 582 583 if (!r[1].empty) 584 notEnough = false; 585 586 if (line.data.length > limit) 587 { 588 maybeResumeTransport; 589 throw new Exception("Line is too long"); 590 } 591 } 592 593 if (eof) 594 break; 595 596 if (notEnough) 597 waitForData!"readLine"; 598 } 599 600 maybeResumeTransport; 601 return line.data; 602 } 603 604 /** 605 * Read exactly $(D_PSYMBOL n) bytes. Raise an $(D_PSYMBOL 606 * IncompleteReadError) if the end of the stream is reached before 607 * $(D_PSYMBOL n) bytes can be read, the $(D_PSYMBOL 608 * IncompleteReadError.partial) attribute of the exception contains the 609 * partial read bytes. 610 */ 611 @Coroutine 612 const(void)[] readExactly(size_t n) 613 { 614 if (exception !is null) 615 throw exception; 616 617 // There used to be "optimized" code here. It created its own 618 // Future and waited until self._buffer had at least the n 619 // bytes, then called read(n). Unfortunately, this could pause 620 // the transport if the argument was larger than the pause 621 // limit (which is twice self._limit). So now we just read() 622 // into a local buffer. 623 624 Appender!(const(char)[]) data; 625 626 while (n > 0) 627 { 628 const(char)[] block = cast(const(char)[]) read(n); 629 630 if (block.empty) 631 throw new IncompleteReadError(data.data, data.data.length + n); 632 633 data ~= block; 634 n -= block.length; 635 } 636 637 return data.data; 638 } 639 640 /** 641 * Return $(D_KEYWORD true) if the buffer is empty and $(D_PSYMBOL feedEof()) 642 * was called. 643 */ 644 bool atEof() const 645 { 646 return eof && buffer.empty; 647 } 648 } 649 650 /** 651 * Wraps a Transport. 652 * 653 * This exposes $(D_PSYMBOL write()), $(D_PSYMBOL writeEof()), $(D_PSYMBOL 654 * canWriteEof()), $(D_PSYMBOL getExtraInfo()) and $(D_PSYMBOL close()). It adds 655 * $(D_PSYMBOL drain()) coroutine for waiting for flow control. It also adds a 656 * $(D_PSYMBOL transport) property which references the Transport directly. 657 * 658 * This class is not thread safe. 659 */ 660 final class StreamWriter 661 { 662 private EventLoop eventLoop; 663 private WriteTransport transport_; 664 private StreamReaderProtocol protocol; 665 private StreamReader streamReader; 666 667 this(EventLoop eventLoop, Transport transport, Protocol protocol, 668 StreamReader streamReader) 669 { 670 this.eventLoop = eventLoop; 671 this.transport_ = transport; 672 this.protocol = cast(StreamReaderProtocol) protocol; 673 enforce(this.protocol !is null, "StreamReaderProtocol is needed"); 674 this.streamReader = streamReader; 675 } 676 677 override string toString() const 678 { 679 import std.format : format; 680 681 return "%s(transport = %s, reader = %s)".format(typeid(this), 682 cast(WriteTransport) transport_, cast(StreamReader) streamReader); 683 } 684 685 /** 686 * Transport. 687 */ 688 @property WriteTransport transport() 689 { 690 return transport_; 691 } 692 693 /** 694 * Return $(D_KEYWORD true) if the transport supports $(D_PSYMBOL 695 * writeEof()), $(D_KEYWORD false) if not. See $(D_PSYMBOL 696 * WriteTransport.canWriteEof()). 697 */ 698 bool canWriteEof() 699 { 700 return transport_.canWriteEof; 701 } 702 703 /** 704 * Close the transport: see $(D_PSYMBOL BaseTransport.close()). 705 */ 706 void close() 707 { 708 return transport_.close; 709 } 710 711 /** 712 * Give the write buffer of the underlying transport a chance to be flushed. 713 * 714 * The intended use is to write: 715 * 716 * w.write(data) 717 * w.drain() 718 * 719 * When the size of the transport buffer reaches the high-water limit (the 720 * protocol is paused), block until the size of the buffer is drained down 721 * to the low-water limit and the protocol is resumed. When there is nothing 722 * to wait for, continue immediately. 723 * 724 * Calling $(D_PSYMBOL drain()) gives the opportunity for the loop to 725 * schedule the write operation and flush the buffer. It should especially 726 * be used when a possibly large amount of data is written to the transport, 727 * and the coroutine does not process the event loop between calls to 728 * $(D_PSYMBOL write()). 729 */ 730 @Coroutine 731 void drain() 732 { 733 if (streamReader !is null) 734 { 735 auto exception = streamReader.exception; 736 if (exception !is null) 737 throw exception; 738 } 739 protocol.drainHelper; 740 } 741 742 /** 743 * Return optional transport information: see $(D_PSYMBOL 744 * BaseTransport.getExtraInfo()). 745 */ 746 auto getExtraInfo(string name)() 747 { 748 return transport_.getExtraInfo!name; 749 } 750 751 /** 752 * Write some data bytes to the transport: see $(D_PSYMBOL 753 * WriteTransport.write()). 754 */ 755 void write(const(void)[] data) 756 { 757 transport_.write(data); 758 } 759 760 /** 761 * Close the write end of the transport after flushing buffered data: see 762 * $(D_PSYMBOL WriteTransport.write_eof()). 763 */ 764 void writeEof() 765 { 766 transport_.writeEof; 767 } 768 }