1 /** 2 * Event loop and event loop policy. 3 * 4 * Copyright: © 2015-2016 Dragos Carp 5 * License: Boost Software License - Version 1.0 6 * Authors: Dragos Carp 7 */ 8 module asynchronous.events; 9 10 import core.thread; 11 import std.algorithm; 12 import std.array; 13 import std.datetime; 14 import std.exception; 15 import std.process; 16 import std.socket; 17 import std.string; 18 import std.traits; 19 import std.typecons; 20 import asynchronous.futures; 21 import asynchronous.protocols; 22 import asynchronous.tasks; 23 import asynchronous.transports; 24 import asynchronous.types; 25 26 alias Protocol = asynchronous.protocols.Protocol; 27 28 interface CallbackHandle 29 { 30 /** 31 * Cancel the call. If the callback is already cancelled or executed, this 32 * method has no effect. 33 */ 34 void cancel(); 35 36 /** 37 * Return $(D_KEYWORD true) if the callback was cancelled. 38 */ 39 bool cancelled() const; 40 41 package void opCall() 42 { 43 opCallImpl; 44 } 45 46 protected void opCallImpl(); 47 } 48 49 /** 50 * A callback wrapper object returned by $(D_PSYMBOL EventLoop.callSoon), 51 * $(D_PSYMBOL EventLoop.callSoonThreadSafe), 52 * $(D_PSYMBOL EventLoop.callLater), and $(D_PSYMBOL EventLoop.callAt). 53 */ 54 class Callback(Dg, Args...) : CallbackHandle 55 if (isDelegate!Dg) 56 { 57 private bool _cancelled; 58 59 private EventLoop eventLoop; 60 61 private Dg dg; 62 63 private Args args; 64 65 alias ResultType = ReturnType!Dg; 66 67 this(EventLoop eventLoop, Dg dg, Args args) 68 { 69 this.eventLoop = eventLoop; 70 this._cancelled = false; 71 this.dg = dg; 72 static if (args.length > 0) 73 { 74 this.args = args; 75 } 76 } 77 78 /** 79 * Cancel the call. If the callback is already cancelled or executed, this 80 * method has no effect. 81 */ 82 override void cancel() 83 { 84 this._cancelled = true; 85 this.dg = null; 86 this.args = Args.init; 87 } 88 89 /** 90 * Return $(D_KEYWORD true) if the callback was cancelled. 91 */ 92 override bool cancelled() const 93 { 94 return this._cancelled; 95 } 96 97 protected override void opCallImpl() 98 { 99 try 100 { 101 enforceEx!CancelledException(!this._cancelled, "Callback cancelled"); 102 103 this.dg(this.args); 104 } 105 catch (Throwable throwable) 106 { 107 if (this.eventLoop is null) 108 this.eventLoop = getEventLoop; 109 110 auto context = ExceptionContext("Exception on calling " ~ toString, throwable); 111 context.callback = this; 112 eventLoop.callExceptionHandler(context); 113 } 114 } 115 116 override string toString() const 117 { 118 return "%s(dg: %s, cancelled: %s)".format(typeid(this), 119 __traits(identifier, dg), _cancelled); 120 } 121 } 122 123 auto callback(Dg, Args...)(EventLoop eventLoop, Dg dg, Args args) 124 { 125 return new Callback!(Dg, Args)(eventLoop, dg, args); 126 } 127 128 alias ExceptionHandler = void delegate(EventLoop, ExceptionContext); 129 130 /** 131 * Exception conxtext for event exceptions 132 */ 133 struct ExceptionContext 134 { 135 string message; /// Error message. 136 Throwable throwable; /// (optional) Throwable object. 137 FutureHandle future; /// (optional) Future instance. 138 CallbackHandle callback; /// (optional): CallbackHandle instance. 139 Protocol protocol; /// (optional): Protocol instance. 140 Transport transport; /// (optional): Transport instance. 141 Socket socket; /// (optional): Socket instance. 142 ExceptionContext* context; /// (optional): Chained context. 143 144 string toString() const 145 { 146 import std.array : appender; 147 import std.conv : to; 148 149 auto result = appender("message: \"" ~ message ~ "\""); 150 151 if (throwable !is null) 152 result ~= ", throwable: " ~ (cast(Throwable) throwable).to!string; 153 if (future !is null) 154 result ~= ", future: " ~ future.to!string; 155 if (callback !is null) 156 result ~= ", callback: " ~ callback.to!string; 157 if (protocol !is null) 158 result ~= ", protocol: " ~ protocol.to!string; 159 if (transport !is null) 160 result ~= ", transport: " ~ transport.to!string; 161 if (socket !is null) 162 result ~= ", socket: " ~ (cast(Socket) socket).to!string; 163 if (context !is null) 164 result ~= ", context: " ~ (*context).to!string; 165 166 return "%s(%s)".format(typeid(this).to!string, result.data); 167 } 168 } 169 170 unittest 171 { 172 auto exceptionContext = ExceptionContext("foo"); 173 assert(!exceptionContext.toString.canFind("future")); 174 exceptionContext.future = new Future!int; 175 assert(exceptionContext.toString.canFind("future")); 176 } 177 178 interface SslContext 179 { 180 } 181 182 /** 183 * Interface server returned by $(D createServer()). 184 */ 185 interface Server 186 { 187 /** 188 * Stop serving. This leaves existing connections open. 189 */ 190 void close(); 191 192 /** 193 * Coroutine to wait until service is closed. 194 */ 195 @Coroutine 196 void waitClosed(); 197 } 198 199 final class ServerImpl : Server 200 { 201 private EventLoop eventLoop; 202 203 private Socket[] sockets; 204 205 private size_t activeCount; 206 207 private Waiter[] waiters; 208 209 package this(EventLoop eventLoop, Socket[] sockets) 210 { 211 this.eventLoop = eventLoop; 212 this.sockets = sockets; 213 } 214 215 void attach() 216 in 217 { 218 assert(!this.sockets.empty); 219 } 220 body 221 { 222 ++this.activeCount; 223 } 224 225 void detach() 226 in 227 { 228 assert(this.activeCount > 0); 229 } 230 body 231 { 232 --activeCount; 233 234 if (this.activeCount == 0 && this.sockets.empty) 235 wakeup; 236 } 237 238 override void close() 239 { 240 if (this.sockets.empty) 241 return; 242 243 Socket[] stopSockets = this.sockets; 244 245 this.sockets = null; 246 247 foreach (socket; stopSockets) 248 this.eventLoop.stopServing(socket); 249 250 if (this.activeCount == 0) 251 wakeup; 252 } 253 254 private void wakeup() 255 { 256 Waiter[] doneWaiters = this.waiters; 257 258 this.waiters = null; 259 260 foreach (waiter; doneWaiters) 261 { 262 if (!waiter.done) 263 waiter.setResult; 264 } 265 } 266 267 @Coroutine 268 override void waitClosed() 269 { 270 if (this.sockets.empty && this.activeCount == 0) 271 return; 272 273 Waiter waiter = new Waiter(this.eventLoop); 274 275 this.waiters ~= waiter; 276 277 this.eventLoop.waitFor(waiter); 278 } 279 } 280 281 /** 282 * Interface of event loop. 283 */ 284 abstract class EventLoop 285 { 286 protected enum State : ubyte 287 { 288 STOPPED, 289 RUNNING, 290 STOPPING, 291 CLOSED, 292 } 293 294 protected State state = State.STOPPED; 295 296 // Run an event loop 297 /** 298 * Run until $(D_PSYMBOL stop()) is called. 299 */ 300 void runForever(); 301 302 /** 303 * Run until $(PARAM future) is done. 304 * 305 * If the argument is a $(I coroutine object), it is wrapped by $(D_PSYMBOL 306 * task()). 307 * 308 * Returns: the Future's result, or throws its exception. 309 */ 310 final T runUntilComplete(T)(Future!T future) 311 { 312 auto callback = (FutureHandle _) { 313 stop; 314 }; 315 316 future.addDoneCallback(callback); 317 runForever; 318 future.removeDoneCallback(callback); 319 320 enforce(future.done, "Event loop stopped before Future completed"); 321 return future.result; 322 } 323 324 /** 325 * Returns: running status of event loop. 326 */ 327 final bool isRunning() 328 { 329 return this.state == State.RUNNING || this.state == State.STOPPING; 330 } 331 332 /** 333 * Stop running the event loop. 334 * 335 * Every callback scheduled before $(D_PSYMBOL stop()) is called will run. 336 * Callbacks scheduled after $(D_PSYMBOL stop()) is called will not run. 337 * However, those callbacks will run if $(D_PSYMBOL runForever()) is 338 * called again later. 339 */ 340 final void stop() 341 { 342 final switch (this.state) 343 { 344 case State.STOPPED: 345 case State.STOPPING: 346 break; 347 case State.RUNNING: 348 this.state = State.STOPPING; 349 break; 350 case State.CLOSED: 351 throw new Exception("Cannot stop a closed event loop"); 352 } 353 } 354 355 /** 356 * Returns: $(D_KEYWORD true) if the event loop was closed. 357 */ 358 final bool isClosed() 359 { 360 return this.state == State.CLOSED; 361 } 362 363 /** 364 * Close the event loop. The loop must not be running. 365 * 366 * This clears the queues and shuts down the executor, but does not wait 367 * for the executor to finish. 368 * 369 * This is idempotent and irreversible. No other methods should be called 370 * after this one. 371 */ 372 final void close() 373 { 374 final switch (this.state) 375 { 376 case State.STOPPED: 377 this.state = State.CLOSED; 378 break; 379 case State.RUNNING: 380 case State.STOPPING: 381 throw new Exception("Cannot close a running event loop"); 382 case State.CLOSED: 383 break; 384 } 385 } 386 387 // Calls 388 389 /** 390 * Arrange for a callback to be called as soon as possible. 391 * 392 * This operates as a FIFO queue, callbacks are called in the order in 393 * which they are registered. Each callback will be called exactly once. 394 * 395 * Any positional arguments after the callback will be passed to the 396 * callback when it is called. 397 * 398 * Returns: an instance of $(D_PSYMBOL Callback). 399 */ 400 final auto callSoon(Dg, Args...)(Dg dg, Args args) 401 { 402 auto callback = new Callback!(Dg, Args)(this, dg, args); 403 404 scheduleCallback(callback); 405 406 return callback; 407 } 408 409 /** 410 * Like $(D_PSYMBOL callSoon()), but thread safe. 411 */ 412 final auto callSoonThreadSafe(Dg, Args...)(Dg dg, Args args) 413 { 414 auto callback = new Callback!(Dg, Args)(this, dg, args); 415 416 scheduleCallbackThreadSafe(callback); 417 418 return callback; 419 } 420 421 // Delayed calls 422 423 /** 424 * Arrange for the callback to be called after the given delay. 425 * 426 * An instance of $(D_PSYMBOL Callback) is returned. 427 * 428 * $(D_PSYMBOL dg) delegate will be called exactly once per call to 429 * $(D_PSYMBOL callLater()). If two callbacks are scheduled for exactly 430 * the same time, it is undefined which will be called first. 431 * 432 * The optional positional args will be passed to the callback when it is 433 * called. 434 */ 435 final auto callLater(Dg, Args...)(Duration delay, Dg dg, Args args) 436 { 437 auto callback = new Callback!(Dg, Args)(this, dg, args); 438 439 scheduleCallback(delay, callback); 440 441 return callback; 442 } 443 444 /** 445 * Arrange for the callback to be called at the given absolute timestamp 446 * when (an int or float), using the same time reference as time(). 447 * 448 * This method’s behavior is the same as $(D_PSYMBOL callLater()). 449 */ 450 final auto callAt(Dg, Args...)(SysTime when, Dg dg, Args args) 451 { 452 auto callback = new Callback!(Dg, Args)(this, dg, args); 453 454 scheduleCallback(when, callback); 455 456 return callback; 457 } 458 459 /** 460 * Return the current time according to the event loop’s internal clock. 461 * 462 * See_Also: $(D_PSYMBOL sleep()). 463 */ 464 SysTime time() 465 { 466 return Clock.currTime; 467 } 468 469 470 // Fibers 471 472 /** 473 * Schedule the execution of a fiber object: wrap it in a future. 474 * 475 * Third-party event loops can use their own subclass of Task for 476 * interoperability. In this case, the result type is a subclass of Task. 477 * 478 * Returns: a $(D_PSYMBOL Task) object. 479 * 480 * See_Also: $(D_PSYMBOL task()). 481 */ 482 final auto createTask(Coroutine, Args...)(Coroutine coroutine, Args args) 483 if (isDelegate!Coroutine) 484 { 485 return new Task!(Coroutine, Args)(this, coroutine, args); 486 } 487 /* 488 // Methods for interacting with threads. 489 490 ReturnType!callback runInExecutor(alias callback, Args...)(executor, Args args); 491 492 void setDefaultExecutor(executor); 493 494 // Network I/O methods returning Futures. 495 496 AddressInfo[] getAddressInfo(T...)(in char[] node, T options); 497 498 // def getNameInfo(sockaddr, flags = 0); 499 */ 500 // Creating connections 501 502 /** 503 * Create a streaming transport connection to a given Internet host and 504 * port: socket family $(D_PSYMBOL AddressFamily.INET) or $(D_PSYMBOL 505 * AddressFamily.INET6) depending on host (or family if specified), socket 506 * type $(D_PSYMBOL SocketType.STREAM). 507 * 508 * This method is a coroutine which will try to establish the connection in 509 * the background. When successful, the coroutine returns a (transport, 510 * protocol) tuple. 511 * 512 * The chronological synopsis of the underlying operation is as follows: 513 * 514 * The connection is established, and a transport is created to represent 515 * it. $(D_PSYMBOL protocolFactory) is called without arguments and must 516 * return a protocol instance. 517 * The protocol instance is tied to the transport, and its $(D_PSYMBOL 518 * connectionMade()) method is called. 519 * The coroutine returns successfully with the $(D_PSYMBOL (transport, 520 * protocol)) tuple. 521 * 522 * The created transport is an implementation-dependent bidirectional 523 * stream. 524 * 525 * Options allowing to change how the connection is created: 526 * Params: 527 * protocolFactory = is a callable returning a protocol instance 528 * 529 * host = if empty then the $(D_PSYMBOL socket) parameter should be 530 * specified. 531 * 532 * service = service name or port number. 533 * 534 * sslContext = if not $(D_KEYWORD null), a SSL/TLS transport is created 535 * (by default a plain TCP transport is created). 536 * 537 * serverHostname = is only for use together with ssl, and sets or 538 * overrides the hostname that the target server’s certificate will be 539 * matched against. By default the value of the host argument is used. 540 * If host is empty, there is no default and you must pass a value for 541 * $(D_PSYMBOL serverHostname). If $(D_PSYMBOL serverHostname) is 542 * empty, hostname matching is disabled (which is a serious security 543 * risk, allowing for man-in-the-middle-attacks). 544 * 545 * addressFamily = optional adress family. 546 * 547 * protocolType = optional protocol. 548 * 549 * addressInfoFlags = optional flags. 550 * 551 * socket = if not $(D_KEYWORD null), should be an existing, already 552 * connected $(D_PSYMBOL Socket) object to be used by the transport. If 553 * $(D_PSYMBOL socket) is given, none of $(D_PSYMBOL host), $(D_PSYMBOL 554 * service), $(D_PSYMBOL addressFamily), $(D_PSYMBOL protocolType), 555 * $(D_PSYMBOL addressInfoFlags) and $(D_PSYMBOL localAddress) should 556 * be specified. 557 * 558 * localHost = if given, together with $(D_PSYMBOL localService) is used to 559 * bind the socket locally. The $(D_PSYMBOL localHost) and 560 * $(D_PSYMBOL localService) are looked up using $(D_PSYMBOL 561 * getAddressInfo()), similarly to host and service. 562 * 563 * localService = see $(D_PSYMBOL localHost). 564 * 565 * Returns: Tuple!(Transport, "transport", Protocol, "protocol") 566 */ 567 @Coroutine 568 auto createConnection(ProtocolFactory protocolFactory, 569 in char[] host = null, in char[] service = null, 570 SslContext sslContext = null, 571 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 572 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 573 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags, 574 Socket socket = null, in char[] localHost = null, 575 in char[] localService = null, in char[] serverHostname = null) 576 { 577 enforce(serverHostname.empty || sslContext !is null, 578 "serverHostname is only meaningful with SSL"); 579 enforce(serverHostname.empty || sslContext is null || !host.empty, 580 "You must set serverHostname when using SSL without a host"); 581 582 if (!host.empty || !service.empty) 583 { 584 enforce(socket is null, 585 "host/service and socket can not be specified at the same time"); 586 587 Future!(AddressInfo[])[] fs = [ 588 createTask(&this.getAddressInfo, host, service, addressFamily, 589 SocketType.STREAM, protocolType, addressInfoFlags) 590 ]; 591 592 if (!localHost.empty) 593 fs ~= createTask(&this.getAddressInfo, localHost, localService, 594 addressFamily, SocketType.STREAM, protocolType, 595 addressInfoFlags); 596 597 this.wait(fs); 598 599 auto addressInfos = fs.map!"a.result"; 600 601 enforceEx!SocketOSException(addressInfos.all!(a => !a.empty), 602 "getAddressInfo() returned empty list"); 603 604 SocketOSException[] exceptions = null; 605 bool connected = false; 606 607 foreach (addressInfo; addressInfos[0]) 608 { 609 try 610 { 611 socket = new Socket(addressInfo); 612 socket.blocking(false); 613 614 if (addressInfos.length > 1) 615 { 616 bool bound = false; 617 618 foreach (localAddressInfo; addressInfos[1]) 619 { 620 try 621 { 622 socket.bind(localAddressInfo.address); 623 bound = true; 624 break; 625 } 626 catch (SocketOSException socketOSException) 627 { 628 exceptions ~= new SocketOSException( 629 "error while attempting to bind on address '%s'" 630 .format(localAddressInfo.address), 631 socketOSException); 632 } 633 } 634 if (!bound) 635 { 636 socket.close; 637 socket = null; 638 continue; 639 } 640 } 641 642 socketConnect(socket, addressInfo.address); 643 connected = true; 644 break; 645 } 646 catch (SocketOSException socketOSException) 647 { 648 if (socket !is null) 649 socket.close; 650 651 exceptions ~= socketOSException; 652 } 653 catch (Throwable throwable) 654 { 655 if (socket !is null) 656 socket.close; 657 658 throw throwable; 659 } 660 } 661 662 if (!connected) 663 { 664 assert(!exceptions.empty); 665 666 if (exceptions.length == 1) 667 { 668 throw exceptions[0]; 669 } 670 else 671 { 672 if (exceptions.all!(a => a.msg == exceptions[0].msg)) 673 throw exceptions[0]; 674 675 throw new SocketOSException( 676 "Multiple exceptions: %(%s, %)".format(exceptions)); 677 } 678 } 679 } 680 else 681 { 682 enforce(socket !is null, 683 "host and port was not specified and no socket specified"); 684 685 socket.blocking(false); 686 } 687 688 return createConnectionTransport(socket, protocolFactory, sslContext, 689 serverHostname.empty ? serverHostname : host); 690 } 691 692 /** 693 * Create datagram connection: socket family $(D_PSYMBOL AddressFamily.INET) 694 * or $(D_PSYMBOL AddressFamily.INET6) depending on host (or family if 695 * specified), socket type $(D_PSYMBOL SocketType.DGRAM). 696 * 697 * This method is a coroutine which will try to establish the connection in 698 * the background. 699 * 700 * See the create_connection() method for parameters. 701 * 702 * Returns: Tuple!(DatagramTransport, "datagramTransport", 703 * DatagramProtocol, "datagramProtocol") 704 */ 705 @Coroutine 706 auto createDatagramEndpoint(DatagramProtocolFactory datagramProtocolFactory, 707 in char[] localHost = null, in char[] localService = null, 708 in char[] remoteHost = null, in char[] remoteService = null, 709 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 710 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 711 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags) 712 { 713 alias AddressPairInfo = Tuple!(AddressFamily, "addressFamily", 714 ProtocolType, "protocolType", Address, "localAddress", 715 Address, "remoteAddress"); 716 717 AddressPairInfo[] addressPairInfos = null; 718 719 720 if (localHost.empty && remoteHost.empty) 721 { 722 enforce(addressFamily != UNSPECIFIED!AddressFamily, 723 "Unexpected address family"); 724 addressPairInfos ~= AddressPairInfo(addressFamily, protocolType, 725 null, null); 726 } 727 else 728 { 729 enforce(remoteHost.empty, 730 "Remote host parameter not supported yet"); 731 732 auto addressInfos = getAddressInfo(localHost, localService, 733 addressFamily, SocketType.DGRAM, protocolType, 734 addressInfoFlags); 735 736 enforceEx!SocketOSException(!addressInfos.empty, 737 "getAddressInfo() returned empty list"); 738 739 foreach (addressInfo; addressInfos) 740 { 741 addressPairInfos ~= AddressPairInfo(addressInfo.family, 742 addressInfo.protocol, addressInfo.address, null); 743 } 744 } 745 746 Socket socket = null; 747 Address remoteAddress = null; 748 SocketOSException[] exceptions = null; 749 750 foreach (addressPairInfo; addressPairInfos) 751 { 752 try 753 { 754 socket = new Socket(addressPairInfo.addressFamily, 755 SocketType.DGRAM, addressPairInfo.protocolType); 756 socket.setOption(SocketOptionLevel.SOCKET, 757 SocketOption.REUSEADDR, 1); 758 socket.blocking(false); 759 760 if (addressPairInfo.localAddress) 761 socket.bind(addressPairInfo.localAddress); 762 763 764 remoteAddress = addressPairInfo.remoteAddress; 765 enforce(remoteAddress is null, 766 "remote connect not supported yet"); 767 768 break; 769 } 770 catch (SocketOSException socketOSException) 771 { 772 if (socket !is null) 773 socket.close; 774 775 exceptions ~= socketOSException; 776 } 777 catch (Throwable throwable) 778 { 779 if (socket !is null) 780 socket.close; 781 782 throw throwable; 783 } 784 } 785 786 auto protocol = datagramProtocolFactory(); 787 auto waiter = new Waiter(this); 788 auto transport = makeDatagramTransport(socket, protocol, remoteAddress, 789 waiter); 790 try 791 { 792 this.waitFor(waiter); 793 } 794 catch (Throwable throwable) 795 { 796 transport.close; 797 throw throwable; 798 } 799 800 return tuple!("datagramTransport", "datagramProtocol")(transport, 801 protocol); 802 } 803 804 /** 805 * Create UNIX connection: socket family $(D_PSYMBOL AddressFamily.UNIX), 806 * socket type $(D_PSYMBOL SocketType.STREAM). The UNIX socket family is 807 * used to communicate between processes on the same machine efficiently. 808 * 809 * This method is a coroutine which will try to establish the connection in 810 * the background. When successful, the coroutine returns a $(D_PSYMBOL 811 * Tuple!(Transport, "transport", Protocol, "protocol")) 812 * 813 * See the $(D_PSYMBOL EventLoop.createConnection()) method for parameters. 814 */ 815 version (Posix) 816 @Coroutine 817 auto createUnixConnection(ProtocolFactory protocolFactory, 818 in char[] path = null, SslContext sslContext = null, 819 Socket socket = null, in char[] serverHostname = null) 820 { 821 if (sslContext is null) 822 enforce(serverHostname.empty, 823 "serverHostname is only meaningful with ssl"); 824 else 825 enforce(!serverHostname.empty, 826 "you have to pass server_hostname when using ssl"); 827 828 if (!path.empty) 829 { 830 enforce(socket is null, 831 "path and socket can not be specified at the same time"); 832 833 try 834 { 835 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM); 836 socket.blocking(false); 837 socketConnect(socket, new UnixAddress(path)); 838 } 839 catch (Throwable throwable) 840 { 841 if (socket !is null) 842 socket.close; 843 844 throw throwable; 845 } 846 } 847 else 848 { 849 enforce(socket !is null, "no path and socket were specified"); 850 enforce(socket.addressFamily == AddressFamily.UNIX, 851 "A UNIX Domain Socket was expected, got %s".format(socket)); 852 socket.blocking(false); 853 } 854 855 return createConnectionTransport(socket, protocolFactory, sslContext, 856 serverHostname); 857 } 858 859 860 //"""A coroutine which creates a UNIX Domain Socket server. 861 862 //The return value is a Server object, which can be used to stop 863 //the service. 864 865 //path is a str, representing a file systsem path to bind the 866 //server socket to. 867 868 //socket can optionally be specified in order to use a preexisting 869 //socket object. 870 871 //backlog is the maximum number of queued connections passed to 872 //listen() (defaults to 100). 873 874 //ssl can be set to an SSLContext to enable SSL over the 875 //accepted connections. 876 //""" 877 878 /** 879 * A coroutine which creates a TCP server bound to host and port. 880 * 881 * Params: 882 * protocolFactory = is a callable returning a protocol instance. 883 * 884 * host = if empty then all interfaces are assumed and a list of multiple 885 * sockets will be returned (most likely one for IPv4 and another one 886 * for IPv6). 887 * 888 * service = service name or port number. 889 * 890 * addressFamily = can be set to either $(D_PSYMBOL AddressFamily.INET) or 891 * $(D_PSYMBOL AddressFamily.INET6) to force the socket to use IPv4 or 892 * IPv6. If not set it will be determined from host (defaults to 893 * $(D_PSYMBOL AddressFamily.UNSPEC)). 894 * 895 * addressInfoFlags = a bitmask for getAddressInfo(). 896 * 897 * socket = can optionally be specified in order to use a preexisting 898 * socket object. 899 * 900 * backlog = the maximum number of queued connections passed to listen() 901 * (defaults to 100). 902 * 903 * sslContext = can be set to an SSLContext to enable SSL over the accepted 904 * connections. 905 * 906 * reuseAddress = tells the kernel to reuse a local socket in TIME_WAIT 907 * state, without waiting for its natural timeout to expire. If not 908 * specified will automatically be set to $(D_KEYWORD true) on UNIX. 909 * 910 * Returns: a Server object which can be used to stop the service. 911 */ 912 @Coroutine 913 Server createServer(ProtocolFactory protocolFactory, 914 in char[] host = null, in char[] service = null, 915 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 916 AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE, 917 Socket socket = null, int backlog = 100, SslContext sslContext = null, 918 bool reuseAddress = true) 919 { 920 enforce(sslContext is null, "SSL support not implemented yet"); 921 922 Socket[] sockets; 923 924 scope (failure) 925 { 926 foreach (socket1; sockets) 927 socket1.close; 928 } 929 930 if (!host.empty || !service.empty) 931 { 932 enforce(socket is null, 933 "host/service and socket can not be specified at the same time"); 934 935 AddressInfo[] addressInfos = getAddressInfo(host, service, 936 addressFamily, SocketType.STREAM, UNSPECIFIED!ProtocolType, 937 addressInfoFlags); 938 939 enforceEx!SocketOSException(!addressInfos.empty, 940 "getAddressInfo() returned empty list"); 941 942 foreach (addressInfo; addressInfos) 943 { 944 try 945 { 946 socket = new Socket(addressInfo); 947 } 948 catch (SocketOSException socketOSException) 949 { 950 continue; 951 } 952 953 sockets ~= socket; 954 955 if (reuseAddress) 956 socket.setOption(SocketOptionLevel.SOCKET, 957 SocketOption.REUSEADDR, true); 958 959 if (addressInfo.family == AddressFamily.INET6) 960 socket.setOption(SocketOptionLevel.IPV6, 961 SocketOption.IPV6_V6ONLY, true); 962 963 try 964 { 965 socket.bind(addressInfo.address); 966 } 967 catch (SocketException socketException) 968 { 969 throw new SocketException( 970 "error while attempting to bind to address %s: %s" 971 .format(addressInfo.address, socketException.msg)); 972 } 973 } 974 } 975 else 976 { 977 enforce(socket !is null, 978 "Neither host/service nor socket were specified"); 979 980 sockets ~= socket; 981 } 982 983 auto server = new ServerImpl(this, sockets); 984 985 foreach (socket1; sockets) 986 { 987 socket1.listen(backlog); 988 socket1.blocking(false); 989 startServing(protocolFactory, socket1, sslContext, server); 990 } 991 992 return server; 993 } 994 995 /** 996 * Similar to $(D_PSYMBOL EventLoop.createServer()), but specific to the 997 * socket family $(D_PSYMBOL AddressFamily.UNIX). 998 */ 999 version (Posix) 1000 @Coroutine 1001 Server createUnixServer(ProtocolFactory protocolFactory, in char[] path, 1002 Socket socket = null, int backlog = 100, SslContext sslContext = null) 1003 { 1004 if (!path.empty) 1005 { 1006 enforce(socket is null, 1007 "path and socket can not be specified at the same time"); 1008 1009 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM); 1010 1011 try 1012 { 1013 socket.bind(new UnixAddress(path)); 1014 } 1015 catch (SocketOSException socketOSException) 1016 { 1017 import core.stdc.errno : EADDRINUSE; 1018 1019 socket.close; 1020 if (socketOSException.errorCode == EADDRINUSE) 1021 throw new SocketOSException("Address %s is already in use" 1022 .format(path), socketOSException.errorCode); 1023 else 1024 throw socketOSException; 1025 } 1026 catch (Throwable throwable) 1027 { 1028 socket.close; 1029 throw throwable; 1030 } 1031 } 1032 else 1033 { 1034 enforce(socket !is null, 1035 "path was not specified, and no socket specified"); 1036 enforce(socket.addressFamily == AddressFamily.UNIX, 1037 "A UNIX Domain Socket was expected, got %s".format(socket)); 1038 } 1039 1040 auto server = new ServerImpl(this, [socket]); 1041 1042 socket.listen(backlog); 1043 socket.blocking(false); 1044 startServing(protocolFactory, socket, sslContext, server); 1045 1046 return server; 1047 } 1048 /* 1049 // Pipes and subprocesses. 1050 1051 //"""Register read pipe in event loop. 1052 1053 //protocol_factory should instantiate object with Protocol interface. 1054 //pipe is file-like object already switched to nonblocking. 1055 //Return pair (transport, protocol), where transport support 1056 //ReadTransport interface.""" 1057 //# The reason to accept file-like object instead of just file descriptor 1058 //# is: we need to own pipe and close it at transport finishing 1059 //# Can got complicated errors if pass f.fileno(), 1060 //# close fd in pipe transport then close f and vise versa. 1061 Tuple!(Transport, Protocol) connectReadPipe(Protocol function() protocol_factory, 1062 Pipe pipe); 1063 1064 //"""Register write pipe in event loop. 1065 1066 //protocol_factory should instantiate object with BaseProtocol interface. 1067 //Pipe is file-like object already switched to nonblocking. 1068 //Return pair (transport, protocol), where transport support 1069 //WriteTransport interface.""" 1070 //# The reason to accept file-like object instead of just file descriptor 1071 //# is: we need to own pipe and close it at transport finishing 1072 //# Can got complicated errors if pass f.fileno(), 1073 //# close fd in pipe transport then close f and vise versa. 1074 Tuple!(Transport, Protocol) connectWritePipe(Protocol function() protocol_factory, 1075 Pipe pipe); 1076 1077 Tuple!(Transport, Protocol) processShell(Protocol function() protocol_factory, 1078 in char[] cmd, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE, 1079 File stderr = subprocess.PIPE); 1080 1081 Tuple!(Transport, Protocol) processProcess(Protocol function() protocol_factory, 1082 in char[][] args, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE, 1083 File stderr = subprocess.PIPE); 1084 1085 //# Ready-based callback registration methods. 1086 //# The add_*() methods return None. 1087 //# The remove_*() methods return True if something was removed, 1088 //# False if there was nothing to delete. 1089 1090 void addReader(int fd, void delegate() callback); 1091 1092 void removeReader(int fd); 1093 1094 void addWriter(int fd, void delegate() callback); 1095 1096 void removeWriter(int fd); 1097 1098 // # Completion based I/O methods returning Futures. 1099 1100 ptrdiff_t socketReceive(Socket sock, void[] buf); 1101 1102 ptrdiff_t socketSend(Socket sock, void[] buf); 1103 1104 Socket socketAccept(Socket sock); 1105 */ 1106 // Resolve host name 1107 1108 @Coroutine 1109 AddressInfo[] getAddressInfo(in char[] host, in char[] service, 1110 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 1111 SocketType socketType = UNSPECIFIED!SocketType, 1112 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 1113 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags); 1114 1115 1116 // Signal handling. 1117 version (Posix) 1118 { 1119 void addSignalHandler(int sig, void delegate() handler); 1120 1121 void removeSignalHandler(int sig); 1122 } 1123 1124 // Error handlers. 1125 1126 private ExceptionHandler exceptionHandler; 1127 1128 /** 1129 * Set handler as the new event loop exception handler. 1130 * 1131 * If handler is $(D_KEYWORD null), the default exception handler will be set. 1132 * 1133 * See_Also: $(D_PSYMBOL callExceptionHandler()). 1134 */ 1135 final void setExceptionHandler(ExceptionHandler exceptionHandler) 1136 { 1137 this.exceptionHandler = exceptionHandler; 1138 } 1139 1140 /** 1141 * Default exception handler. 1142 * 1143 * This is called when an exception occurs and no exception handler is set, 1144 * and can be called by a custom exception handler that wants to defer to 1145 * the default behavior. 1146 * The context parameter has the same meaning as in $(D_PSYMBOL 1147 * callExceptionHandler()). 1148 */ 1149 final void defaultExceptionHandler(ExceptionContext exceptionContext) 1150 { 1151 if (exceptionContext.message.empty) 1152 exceptionContext.message = "Unhandled exception in event loop"; 1153 1154 if (cast(Error) exceptionContext.throwable) 1155 throw new Error("Uncaught Error: %s".format(exceptionContext)); 1156 else 1157 throw new Exception("Uncaught Exception: %s".format(exceptionContext)); 1158 } 1159 1160 /** 1161 * Call the current event loop's exception handler. 1162 * 1163 * Note: New ExceptionContext members may be introduced in the future. 1164 */ 1165 final void callExceptionHandler(ExceptionContext exceptionContext) 1166 { 1167 if (exceptionHandler is null) 1168 { 1169 defaultExceptionHandler(exceptionContext); 1170 return; 1171 } 1172 1173 try 1174 { 1175 exceptionHandler(this, exceptionContext); 1176 } 1177 catch (Throwable throwable) 1178 { 1179 auto context = ExceptionContext("Unhandled error in exception handler", 1180 throwable); 1181 context.context = &exceptionContext; 1182 1183 defaultExceptionHandler(context); 1184 } 1185 } 1186 1187 override string toString() const 1188 { 1189 return "%s(%s)".format(typeid(this), this.state); 1190 } 1191 1192 protected: 1193 1194 void scheduleCallback(CallbackHandle callback); 1195 1196 void scheduleCallbackThreadSafe(CallbackHandle callback); 1197 1198 void scheduleCallback(Duration delay, CallbackHandle callback); 1199 1200 void scheduleCallback(SysTime when, CallbackHandle callback); 1201 1202 void socketConnect(Socket socket, Address address); 1203 1204 Transport makeSocketTransport(Socket socket, Protocol protocol, 1205 Waiter waiter); 1206 1207 DatagramTransport makeDatagramTransport(Socket socket, 1208 DatagramProtocol datagramProtocol, Address remoteAddress, 1209 Waiter waiter); 1210 1211 void startServing(ProtocolFactory protocolFactory, Socket socket, 1212 SslContext sslContext, ServerImpl server); 1213 1214 void stopServing(Socket socket); 1215 private: 1216 1217 @Coroutine 1218 auto createConnectionTransport(Socket socket, 1219 ProtocolFactory protocolFactory, SslContext sslContext, 1220 in char[] serverHostname = null) 1221 { 1222 Protocol protocol = protocolFactory(); 1223 Transport transport = null; 1224 auto waiter = new Waiter(this); 1225 1226 if (sslContext !is null) 1227 { 1228 assert(0, "SSL support not implemented yet"); 1229 //sslcontext = None if isinstance(ssl, bool) else ssl 1230 //transport = self._make_ssl_transport( 1231 // sock, protocol, sslcontext, waiter, 1232 // server_side = False, server_hostname = server_hostname) 1233 } 1234 else 1235 { 1236 transport = makeSocketTransport(socket, protocol, waiter); 1237 } 1238 1239 try 1240 { 1241 this.waitFor(waiter); 1242 } 1243 catch (Throwable throwable) 1244 { 1245 transport.close; 1246 throw throwable; 1247 } 1248 1249 return tuple!("transport", "protocol")(transport, protocol); 1250 } 1251 } 1252 1253 /** 1254 * Interface of policy for accessing the event loop. 1255 */ 1256 abstract class EventLoopPolicy 1257 { 1258 protected EventLoop eventLoop = null; 1259 protected bool setCalled = false; 1260 1261 /** 1262 * Get the event loop. 1263 * 1264 * This may be $(D null) or an instance of EventLoop. 1265 */ 1266 EventLoop getEventLoop() 1267 { 1268 if (eventLoop is null && !setCalled && thread_isMainThread) 1269 setEventLoop(newEventLoop); 1270 return eventLoop; 1271 } 1272 1273 /** 1274 * Set the event loop. 1275 */ 1276 void setEventLoop(EventLoop loop) 1277 { 1278 setCalled = true; 1279 eventLoop = loop; 1280 } 1281 1282 /** 1283 * Create a new event loop. 1284 * 1285 * You must call $(D setEventLoop()) to make this the current event loop. 1286 */ 1287 EventLoop newEventLoop(); 1288 1289 //version (Posix) 1290 //{ 1291 // ChildWatcher getChildWatcher(); 1292 1293 // void setChildWatcher(ChildWatcher watcher); 1294 //} 1295 } 1296 1297 /** 1298 * Default policy implementation for accessing the event loop. 1299 * 1300 * In this policy, each thread has its own event loop. However, we only 1301 * automatically create an event loop by default for the main thread; other 1302 * threads by default have no event loop. 1303 * 1304 * Other policies may have different rules (e.g. a single global event loop, or 1305 * automatically creating an event loop per thread, or using some other notion 1306 * of context to which an event loop is associated). 1307 */ 1308 1309 private __gshared EventLoopPolicy eventLoopPolicy; 1310 1311 import asynchronous.libasync.events; 1312 1313 alias DefaultEventLoopPolicy = LibasyncEventLoopPolicy; 1314 1315 /** 1316 * Get the current event loop policy. 1317 */ 1318 EventLoopPolicy getEventLoopPolicy() 1319 { 1320 if (eventLoopPolicy is null) 1321 { 1322 synchronized 1323 { 1324 if (eventLoopPolicy is null) 1325 eventLoopPolicy = new DefaultEventLoopPolicy; 1326 } 1327 } 1328 1329 return eventLoopPolicy; 1330 } 1331 1332 /** 1333 * Set the current event loop policy. 1334 * 1335 * If policy is $(D null), the default policy is restored. 1336 */ 1337 void setEventLoopPolicy(EventLoopPolicy policy) 1338 { 1339 eventLoopPolicy = policy; 1340 } 1341 1342 /** 1343 * Equivalent to calling $(D getEventLoopPolicy.getEventLoop). 1344 */ 1345 EventLoop getEventLoop() 1346 { 1347 return getEventLoopPolicy.getEventLoop; 1348 } 1349 1350 unittest 1351 { 1352 assert(getEventLoop !is null); 1353 } 1354 1355 /** 1356 * Equivalent to calling $(D getEventLoopPolicy.setEventLoop(loop)). 1357 */ 1358 void setEventLoop(EventLoop loop) 1359 { 1360 getEventLoopPolicy.setEventLoop(loop); 1361 } 1362 1363 /** 1364 * Equivalent to calling $(D getEventLoopPolicy.newEventLoop). 1365 */ 1366 EventLoop newEventLoop() 1367 { 1368 return getEventLoopPolicy.newEventLoop; 1369 }