/**
 * High-level stream API.
 */
module asynchronous.streams;

import std.algorithm : findSplit;
import std.array : Appender, empty;
import std.exception : enforce, enforceEx;
import std.socket : AddressFamily, AddressInfoFlags, ProtocolType, Socket,
    SocketOSException;
import std.typecons : tuple;
import asynchronous.events;
import asynchronous.futures;
import asynchronous.tasks;
import asynchronous.transports;
import asynchronous.types;

/// Default value of the $(D_PSYMBOL limit) parameter taken by the helpers of
/// this module.
enum DEFAULT_LIMIT = 1 << 16; // 2 ** 16

/**
 * Error thrown when the end of a stream is reached before the requested
 * amount of data could be read.
 */
class IncompleteReadError : Exception
{
    /**
     * Bytes that were read before the end of the stream was reached.
     */
    const(void)[] partial;

    /**
     * Total number of bytes that were expected.
     */
    const size_t expected;

    /**
     * Params:
     *  partial  = data read before the stream ended.
     *  expected = total number of bytes the caller asked for.
     *  file     = source file reported by the exception.
     *  line     = source line reported by the exception.
     *  next     = next exception in the chain, if any.
     */
    this(const(void)[] partial, size_t expected, string file = __FILE__,
        size_t line = __LINE__, Throwable next = null) @safe pure
    {
        import std.format : format;

        immutable message = format(
            "%s bytes read on a total of %s expected bytes",
            partial.length, expected);

        super(message, file, line, next);

        this.partial = partial;
        this.expected = expected;
    }
}

/**
 * A wrapper for $(D_PSYMBOL createConnection()) returning a (reader, writer)
 * pair.
 *
 * The reader returned is a $(D_PSYMBOL StreamReader) instance; the writer is a
 * $(D_PSYMBOL StreamWriter) instance.
 *
 * The arguments are all the usual arguments to $(D_PSYMBOL createConnection())
 * except protocolFactory; most common are host and port, with various optional
 * arguments following.
 *
 * Additional arguments are $(D_PSYMBOL eventLoop) (to set the event loop
 * instance to use) and $(D_PSYMBOL limit) (to set the buffer limit passed to
 * the $(D_PSYMBOL StreamReader)).
 *
 * (If you want to customize the $(D_PSYMBOL StreamReader) and/or
 * $(D_PSYMBOL StreamReaderProtocol) classes, just copy the code -- there's
 * really nothing special here except some convenience.)
*/
@Coroutine
auto openConnection(EventLoop eventLoop, in char[] host = null,
    in char[] service = null, size_t limit = DEFAULT_LIMIT,
    SslContext sslContext = null,
    AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
    ProtocolType protocolType = UNSPECIFIED!ProtocolType,
    AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags,
    Socket socket = null, in char[] localHost = null,
    in char[] localService = null, in char[] serverHostname = null)
{
    // Fall back to the default event loop when the caller passes none.
    if (eventLoop is null)
    {
        eventLoop = getEventLoop;
    }

    // The reader is created up front so the protocol factory can capture it.
    auto reader = new StreamReader(eventLoop, limit);

    auto established = eventLoop.createConnection(
        () => new StreamReaderProtocol(eventLoop, reader), host, service,
        sslContext, addressFamily, protocolType, addressInfoFlags, socket,
        localHost, localService, serverHostname);

    // The writer wraps the transport/protocol pair of the new connection.
    auto writer = new StreamWriter(eventLoop, established.transport,
        established.protocol, reader);

    return tuple!("reader", "writer")(reader, writer);
}

/// Signature of the callback run for every client accepted by a stream server.
alias ClientConnectedCallback = void delegate(StreamReader, StreamWriter);
/**
 * Start a socket server, call back for each client connected.
 *
 * The first parameter, $(D_PSYMBOL clientConnectedCallback), takes two
 * parameters: $(D_PSYMBOL clientReader), $(D_PSYMBOL clientWriter). $(D_PSYMBOL
 * clientReader) is a $(D_PSYMBOL StreamReader) object, while $(D_PSYMBOL
 * clientWriter) is a $(D_PSYMBOL StreamWriter) object. This parameter is a
 * coroutine, that will be automatically converted into a $(D_PSYMBOL Task).
 *
 * The rest of the arguments are all the usual arguments to $(D_PSYMBOL
 * eventLoop.createServer()) except $(D_PSYMBOL protocolFactory). The return
 * value is the same as $(D_PSYMBOL eventLoop.createServer()).
 *
 * Additional optional arguments are $(D_PSYMBOL eventLoop) (to set the event
 * loop instance to use) and $(D_PSYMBOL limit) (to set the buffer limit passed
 * to the $(D_PSYMBOL StreamReader)).
*
 * The return value is the same as $(D_PSYMBOL eventLoop.createServer()), i.e.
 * a $(D_PSYMBOL Server) object which can be used to stop the service.
 */
@Coroutine
Server startServer(EventLoop eventLoop,
    ClientConnectedCallback clientConnectedCallback, in char[] host = null,
    in char[] service = null, size_t limit = DEFAULT_LIMIT,
    AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
    AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE,
    Socket socket = null, int backlog = 100, SslContext sslContext = null,
    bool reuseAddress = true)
{
    // Fall back to the default event loop when the caller passes none.
    if (eventLoop is null)
    {
        eventLoop = getEventLoop;
    }

    // Factory handed to createServer(): every call builds a fresh
    // reader/protocol pair bound to the client callback.
    Protocol makeProtocol()
    {
        return new StreamReaderProtocol(eventLoop,
            new StreamReader(eventLoop, limit), clientConnectedCallback);
    }

    return eventLoop.createServer(&makeProtocol, host, service,
        addressFamily, addressInfoFlags, socket, backlog, sslContext,
        reuseAddress);
}

/**
 * A wrapper for $(D_PSYMBOL createUnixConnection()) returning a (reader,
 * writer) pair.
 *
 * See $(D_PSYMBOL openConnection()) for information about return value and
 * other details.
*/
version (Posix)
@Coroutine
auto openUnixConnection(EventLoop eventLoop, in char[] path = null,
    size_t limit = DEFAULT_LIMIT, SslContext sslContext = null,
    Socket socket = null, in char[] serverHostname = null)
{
    // Fall back to the default event loop when the caller passes none.
    if (eventLoop is null)
    {
        eventLoop = getEventLoop;
    }

    // The reader is created up front so the protocol factory can capture it.
    auto reader = new StreamReader(eventLoop, limit);

    auto established = eventLoop.createUnixConnection(
        () => new StreamReaderProtocol(eventLoop, reader), path,
        sslContext, socket, serverHostname);

    // The writer wraps the transport/protocol pair of the new connection.
    auto writer = new StreamWriter(eventLoop, established.transport,
        established.protocol, reader);

    return tuple!("reader", "writer")(reader, writer);
}

/**
 * Start a UNIX Domain Socket server, with a callback for each client connected.
 *
 * See $(D_PSYMBOL startServer()) for information about return value and other
 * details.
 */
version (Posix)
@Coroutine
Server startUnixServer(EventLoop eventLoop,
    ClientConnectedCallback clientConnectedCallback, in char[] path = null,
    size_t limit = DEFAULT_LIMIT, Socket socket = null, int backlog = 100,
    SslContext sslContext = null)
{
    // Fall back to the default event loop when the caller passes none.
    if (eventLoop is null)
    {
        eventLoop = getEventLoop;
    }

    // Factory handed to createUnixServer(): every call builds a fresh
    // reader/protocol pair bound to the client callback.
    Protocol makeProtocol()
    {
        return new StreamReaderProtocol(eventLoop,
            new StreamReader(eventLoop, limit), clientConnectedCallback);
    }

    return eventLoop.createUnixServer(&makeProtocol, path, socket, backlog,
        sslContext);
}

/**
 * Reusable flow control logic for $(D_PSYMBOL StreamWriter.drain()).
 *
 * This implements the protocol methods $(D_PSYMBOL pauseWriting()), $(D_PSYMBOL
 * resumeWriting()) and $(D_PSYMBOL connectionLost()). If the subclass
 * overrides these it must call the super methods.
 *
 * $(D_PSYMBOL StreamWriter.drain()) must wait for $(D_PSYMBOL drainHelper())
 * coroutine.
*/
private abstract class FlowControlProtocol : Protocol
{
    private EventLoop eventLoop;
    /// True between pauseWriting() and the matching resumeWriting().
    private bool paused = false;
    /// Waiter a drainHelper() call is currently blocked on, if any.
    private Waiter drainWaiter = null;
    /// Set once connectionLost() has been called.
    private bool connectionLost_ = false;

    /**
     * Params:
     *  eventLoop = event loop to use; the default event loop is used when
     *              $(D_KEYWORD null).
     */
    this(EventLoop eventLoop)
    {
        if (eventLoop !is null)
            this.eventLoop = eventLoop;
        else
            this.eventLoop = getEventLoop;
    }

    /// Record that writing is paused, so drainHelper() will wait.
    override void pauseWriting()
    in
    {
        assert(!paused);
    }
    body
    {
        paused = true;
    }

    /// Clear the paused state and release a pending drainHelper(), if any.
    override void resumeWriting()
    in
    {
        assert(paused);
    }
    body
    {
        paused = false;

        auto pending = drainWaiter;
        if (pending is null)
            return;

        // Detach the waiter before completing it.
        drainWaiter = null;
        if (!pending.done)
            pending.setResult;
    }

    override void connectionMade(BaseTransport transport)
    {
        throw new NotImplementedException;
    }

    /**
     * Remember that the connection is gone and, when a drainHelper() call is
     * blocked on a paused protocol, complete its waiter: with a plain result
     * if $(D_PSYMBOL exception) is $(D_KEYWORD null), with the exception
     * otherwise.
     */
    override void connectionLost(Exception exception)
    {
        connectionLost_ = true;

        if (paused && drainWaiter !is null && !drainWaiter.done)
        {
            auto pending = drainWaiter;

            drainWaiter = null;
            if (exception !is null)
                pending.setException(exception);
            else
                pending.setResult;
        }
    }

    /**
     * Wait until writing is resumed.
     *
     * Returns immediately when the protocol is not paused.
     *
     * Throws: $(D_PSYMBOL SocketOSException) if the connection was already
     *         lost.
     */
    @Coroutine
    protected void drainHelper()
    {
        enforceEx!SocketOSException(!connectionLost_, "Connection lost");

        if (paused)
        {
            assert(drainWaiter is null || drainWaiter.cancelled);

            drainWaiter = new Waiter(eventLoop);
            eventLoop.waitFor(drainWaiter);
        }
    }

    override void dataReceived(const(void)[] data)
    {
        throw new NotImplementedException;
    }

    override bool eofReceived()
    {
        throw new NotImplementedException;
    }
}


/**
 * Helper class to adapt between $(D_PSYMBOL Protocol) and $(D_PSYMBOL
 * StreamReader).
*
 * (This is a helper class instead of making StreamReader itself a $(D_PSYMBOL
 * Protocol) subclass, because the StreamReader has other potential uses, and to
 * prevent the user of the StreamReader to accidentally call inappropriate
 * methods of the protocol.)
 */
class StreamReaderProtocol : FlowControlProtocol
{
    private StreamReader streamReader;
    private StreamWriter streamWriter;
    private ClientConnectedCallback clientConnectedCallback;

    /**
     * Params:
     *  eventLoop               = event loop, forwarded to the base class.
     *  streamReader            = reader that receives the incoming data.
     *  clientConnectedCallback = optional callback started as a task, with the
     *                            reader and a new writer, once the connection
     *                            is made.
     */
    this(EventLoop eventLoop, StreamReader streamReader,
        ClientConnectedCallback clientConnectedCallback = null)
    {
        super(eventLoop);
        this.streamReader = streamReader;
        this.streamWriter = null;
        this.clientConnectedCallback = clientConnectedCallback;
    }

    /**
     * Attach the transport to the reader and, when a client callback was
     * supplied, create the writer and schedule the callback as a task.
     */
    override void connectionMade(BaseTransport transport)
    {
        auto transport1 = cast(Transport) transport;

        // The stream classes need a full (read + write) transport.
        enforce(transport1 !is null);
        streamReader.setTransport(transport1);

        if (clientConnectedCallback !is null)
        {
            streamWriter = new StreamWriter(eventLoop, transport1, this,
                streamReader);
            eventLoop.createTask(clientConnectedCallback, streamReader,
                streamWriter);
        }
    }

    /**
     * Forward the end of the connection to the reader (as EOF, or as an
     * exception when one is given), then let the flow control logic release a
     * pending drain.
     */
    override void connectionLost(Exception exception)
    {
        if (exception is null)
            streamReader.feedEof;
        else
            streamReader.setException(exception);
        super.connectionLost(exception);
    }

    /// Hand the received bytes over to the reader.
    override void dataReceived(const(void)[] data)
    {
        streamReader.feedData(data);
    }

    /// Signal EOF to the reader; always returns $(D_KEYWORD true).
    override bool eofReceived()
    {
        streamReader.feedEof;
        return true;
    }
}

/**
 * Buffered reader fed by a protocol through $(D_PSYMBOL feedData()) and
 * $(D_PSYMBOL feedEof()), and consumed through the $(D_PSYMBOL read()),
 * $(D_PSYMBOL readLine()) and $(D_PSYMBOL readExactly()) coroutines.
 *
 * Only one read coroutine may wait for data at any given time.
 */
final class StreamReader
{
    private EventLoop eventLoop;
    private size_t limit;
    private const(void)[] buffer;
    private bool eof = false;
    /// Waiter of the read coroutine currently blocked in waitForData().
    private Waiter flowWaiter;
    private Throwable exception_;
    private ReadTransport transport;
    /// True while the transport has been asked to pause reading.
    private bool paused = false;

    /**
     * Params:
     *  eventLoop = event loop to use; the default event loop is used when
     *              $(D_KEYWORD null).
     *  limit     = maximum line length accepted by $(D_PSYMBOL readLine());
     *              the transport is paused once more than twice this amount
     *              is buffered.
     */
    this(EventLoop eventLoop, size_t limit = DEFAULT_LIMIT)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
        // The line length limit is a security feature;
        // it also doubles as half the buffer limit.
        this.limit = limit;
    }

    /**
     * Get the exception.
     */
    @property Throwable exception()
    {
        return exception_;
    }

    /**
     * Acknowledge the EOF.
     */
    void feedEof()
    {
        eof = true;
        wakeupWaiter;
    }

    /**
     * Feed $(D_PSYMBOL data) bytes in the internal buffer. Any operations
     * waiting for the data will be resumed.
     */
    void feedData(const(void)[] data)
    in
    {
        assert(!eof); // feedData after feedEof
    }
    body
    {
        if (data.empty)
            return;

        buffer ~= data;
        wakeupWaiter;

        // Only throttle the transport once the buffer exceeds twice the limit.
        if (transport is null || paused || buffer.length <= 2 * limit)
            return;

        try
        {
            transport.pauseReading;
            paused = true;
        }
        catch (NotImplementedException)
        {
            // The transport can't be paused.
            // We'll just have to buffer all data.
            // Forget the transport so we don't keep trying.
            transport = null;
        }
    }

    /**
     * Set the exception. A read coroutine waiting for data is completed with
     * that exception.
     */
    void setException(Throwable exception)
    {
        exception_ = exception;

        if (flowWaiter is null)
            return;

        auto waiter = flowWaiter;
        flowWaiter = null;
        if (!waiter.cancelled)
            waiter.setException(exception);
    }

    /**
     * Set the transport. May only be called while no transport is set.
     */
    void setTransport(ReadTransport transport)
    in
    {
        assert(this.transport is null);
    }
    body
    {
        this.transport = transport;
    }

    /**
     * Wakeup $(D_PSYMBOL read()) or $(D_PSYMBOL readLine()) function waiting
     * for data or EOF.
     */
    private void wakeupWaiter()
    {
        if (flowWaiter is null)
            return;

        auto waiter = flowWaiter;
        flowWaiter = null;
        if (!waiter.cancelled)
            waiter.setResult;
    }

    /// Resume a paused transport once the buffer is back within the limit.
    private void maybeResumeTransport()
    {
        if (paused && buffer.length <= limit)
        {
            paused = false;
            transport.resumeReading;
        }
    }

    /**
     * Wait until $(D_PSYMBOL feedData()) or $(D_PSYMBOL feedEof()) is called.
     *
     * Throws: the exception recorded by $(D_PSYMBOL setException()) if one
     *         was set while waiting; an $(D_PSYMBOL Exception) if another
     *         coroutine is already waiting.
     */
    @Coroutine
    private void waitForData(string functionName)()
    {
        // StreamReader uses a future to link the protocol feedData() method
        // to a read coroutine. Running two read coroutines at the same time
        // would have an unexpected behaviour. It would not be possible to
        // know which coroutine would get the next data.
        enforce(flowWaiter is null, functionName
                ~ "() called while another coroutine is already waiting for incoming data");

        flowWaiter = new Waiter(eventLoop);

        // Always forget the waiter when leaving, including when waitFor()
        // throws: a stale waiter would make every later read fail the
        // enforce() above.
        scope (exit)
            flowWaiter = null;

        eventLoop.waitFor(flowWaiter);

        // NOTE(review): whether waitFor() itself rethrows an exception set on
        // the waiter is not visible from this file. Rethrow the recorded one
        // so a failed stream is never mistaken for EOF or waited on again.
        if (exception_ !is null)
            throw exception_;
    }

    /**
     * Read up to $(D_PSYMBOL n) bytes. If $(D_PSYMBOL n) is not provided, or
     * is negative, read until EOF and return all read bytes.
     *
     * If the EOF was received and the internal buffer is empty, return an empty
     * array.
     */
    @Coroutine
    const(void)[] read(ptrdiff_t n = -1)
    {
        if (exception !is null)
            throw exception;

        if (n == 0)
            return null;

        if (n < 0)
        {
            // Waiting for everything to accumulate in this.buffer could
            // deadlock if the peer sends more than the buffer limit, as the
            // transport would get paused. So just call read(limit) until EOF.
            Appender!(const(char)[]) buff;

            while (true)
            {
                const(char)[] chunk = cast(const(char)[]) read(limit);
                if (chunk.empty)
                    break;
                buff ~= chunk;
            }

            return buff.data;
        }

        if (buffer.empty && !eof)
            waitForData!"read";

        const(void)[] data;

        if (buffer.length <= n)
        {
            // Everything buffered fits into the request.
            data = buffer;
            buffer = null;
        }
        else
        {
            data = buffer[0 .. n];
            buffer = buffer[n .. $];
        }

        maybeResumeTransport;

        return data;
    }

    /**
     * Read one line, where “line” is a sequence of bytes terminated by '\n'.
     *
     * If EOF is received, and '\n' was not found, the method will return the
     * partial read bytes.
     *
     * If the EOF was received and the internal buffer is empty, return an empty
     * array.
     *
     * Throws: $(D_PSYMBOL Exception) if the line grows beyond the limit.
     */
    @Coroutine
    const(char)[] readLine()
    {
        if (exception !is null)
            throw exception;

        Appender!(const(char)[]) line;
        bool notEnough = true;

        while (notEnough)
        {
            while (!buffer.empty && notEnough)
            {
                const(char)[] buffer1 = cast(const(char)[]) buffer;
                // r[0]: text before '\n', r[1]: the '\n' itself (empty when
                // not found), r[2]: remainder after it.
                auto r = buffer1.findSplit(['\n']);
                // NOTE(review): only r[0] is appended, so the returned line
                // does not contain the '\n' and a blank line yields an empty
                // array, just like EOF; callers can tell them apart with
                // atEof(). Kept as is because callers may rely on it.
                line ~= r[0];
                buffer = r[2];

                if (!r[1].empty)
                    notEnough = false;

                if (line.data.length > limit)
                {
                    maybeResumeTransport;
                    throw new Exception("Line is too long");
                }
            }

            if (eof)
                break;

            if (notEnough)
                waitForData!"readLine";
        }

        maybeResumeTransport;
        return line.data;
    }

    /**
     * Read exactly $(D_PSYMBOL n) bytes. Raise an $(D_PSYMBOL
     * IncompleteReadError) if the end of the stream is reached before
     * $(D_PSYMBOL n) can be read, the $(D_PSYMBOL IncompleteReadError.partial)
     * attribute of the exception contains the partial read bytes.
     */
    @Coroutine
    const(void)[] readExactly(size_t n)
    {
        if (exception !is null)
            throw exception;

        // Waiting until the internal buffer holds at least n bytes and then
        // calling read(n) could pause the transport if n is larger than the
        // pause limit (which is twice the limit). So just read() into a
        // local buffer.

        Appender!(const(char)[]) data;

        while (n > 0)
        {
            const(char)[] block = cast(const(char)[]) read(n);

            // An empty block means EOF: report what was collected so far and
            // the total that had been requested.
            if (block.empty)
                throw new IncompleteReadError(data.data, data.data.length + n);

            data ~= block;
            n -= block.length;
        }

        return data.data;
    }

    /**
     * Return $(D_KEYWORD true) if the buffer is empty and $(D_PSYMBOL feedEof())
     * was called.
     */
    bool atEof() const
    {
        return eof && buffer.empty;
    }
}

/**
 * Wraps a Transport.
 *
 * This exposes $(D_PSYMBOL write()), $(D_PSYMBOL writeEof()), $(D_PSYMBOL
 * canWriteEof()), $(D_PSYMBOL getExtraInfo()) and $(D_PSYMBOL close()). It adds
 * $(D_PSYMBOL drain()) coroutine for waiting for flow control. It also adds a
 * $(D_PSYMBOL transport) property which references the Transport directly.
 *
 * This class is not thread safe.
*/
final class StreamWriter
{
    private EventLoop eventLoop;
    private WriteTransport transport_;
    private StreamReaderProtocol protocol;
    private StreamReader streamReader;

    /**
     * Params:
     *  eventLoop    = event loop the writer belongs to.
     *  transport    = transport the data is written to.
     *  protocol     = protocol of the connection; it has to be a
     *                 $(D_PSYMBOL StreamReaderProtocol).
     *  streamReader = reader paired with this writer, may be
     *                 $(D_KEYWORD null).
     */
    this(EventLoop eventLoop, Transport transport, Protocol protocol,
        StreamReader streamReader)
    {
        // drain() relies on the flow control helper of that protocol class.
        auto readerProtocol = cast(StreamReaderProtocol) protocol;
        enforce(readerProtocol !is null, "StreamReaderProtocol is needed");

        this.eventLoop = eventLoop;
        this.transport_ = transport;
        this.protocol = readerProtocol;
        this.streamReader = streamReader;
    }

    /// Describe the writer through its transport and reader.
    override string toString() const
    {
        import std.format : format;

        // The casts drop the const qualifier so the members can be formatted.
        auto writeTransport = cast(WriteTransport) transport_;
        auto reader = cast(StreamReader) streamReader;

        return format("%s(transport = %s, reader = %s)", typeid(this),
            writeTransport, reader);
    }

    /**
     * Transport.
     */
    @property WriteTransport transport()
    {
        return transport_;
    }

    /**
     * Return $(D_KEYWORD true) if the transport supports $(D_PSYMBOL
     * writeEof()), $(D_KEYWORD false) if not. See $(D_PSYMBOL
     * WriteTransport.canWriteEof()).
     */
    bool canWriteEof()
    {
        return transport_.canWriteEof;
    }

    /**
     * Close the transport: see $(D_PSYMBOL BaseTransport.close()).
     */
    void close()
    {
        transport_.close;
    }

    /**
     * Give the write buffer of the underlying transport a chance to be
     * flushed.
     *
     * The intended use is to write:
     *
     * w.write(data)
     * w.drain()
     *
     * When the size of the transport buffer reaches the high-water limit (the
     * protocol is paused), block until the size of the buffer is drained down
     * to the low-water limit and the protocol is resumed. When there is nothing
     * to wait for, the coroutine continues immediately.
     *
     * Calling $(D_PSYMBOL drain()) gives the opportunity for the loop to
     * schedule the write operation and flush the buffer. It should especially
     * be used when a possibly large amount of data is written to the transport,
     * and the coroutine does not process the event loop between calls to
     * $(D_PSYMBOL write()).
     *
     * Throws: the exception recorded on the paired reader, if any.
     */
    @Coroutine
    void drain()
    {
        // Surface an error already recorded on the reader before waiting.
        auto pendingError = streamReader is null ? null : streamReader.exception;
        if (pendingError !is null)
            throw pendingError;

        protocol.drainHelper;
    }

    /**
     * Return optional transport information: see $(D_PSYMBOL
     * BaseTransport.getExtraInfo()).
     */
    auto getExtraInfo(string name)()
    {
        return transport_.getExtraInfo!name;
    }

    /**
     * Write some data bytes to the transport: see $(D_PSYMBOL
     * WriteTransport.write()).
     */
    void write(const(void)[] data)
    {
        transport_.write(data);
    }

    /**
     * Close the write end of the transport after flushing buffered data: see
     * $(D_PSYMBOL WriteTransport.writeEof()).
     */
    void writeEof()
    {
        transport_.writeEof;
    }
}