1 /** 2 * Event loop and event loop policy. 3 */ 4 module asynchronous.events; 5 6 import core.thread; 7 import std.algorithm; 8 import std.array; 9 import std.datetime; 10 import std.exception; 11 import std.process; 12 import std.socket; 13 import std.string; 14 import std.traits; 15 import std.typecons; 16 import asynchronous.futures; 17 import asynchronous.protocols; 18 import asynchronous.tasks; 19 import asynchronous.transports; 20 import asynchronous.types; 21 22 alias Protocol = asynchronous.protocols.Protocol; 23 24 interface CallbackHandle 25 { 26 /** 27 * Cancel the call. If the callback is already canceled or executed, this 28 * method has no effect. 29 */ 30 void cancel(); 31 32 /** 33 * Return $(D_KEYWORD true) if the callback was cancelled. 34 */ 35 bool cancelled() const; 36 37 package void opCall() 38 { 39 opCallImpl; 40 } 41 42 protected void opCallImpl(); 43 } 44 45 /** 46 * A callback wrapper object returned by $(D_PSYMBOL EventLoop.callSoon), 47 * $(D_PSYMBOL EventLoop.callSoonThreadsafe), 48 * $(D_PSYMBOL EventLoop.callLater), and $(D_PSYMBOL EventLoop.callAt). 49 */ 50 class Callback(Dg, Args...) 
: CallbackHandle 51 if (isDelegate!Dg) 52 { 53 private bool _cancelled; 54 55 private EventLoop eventLoop; 56 57 private Dg dg; 58 59 private Args args; 60 61 alias ResultType = ReturnType!Dg; 62 63 this(EventLoop eventLoop, Dg dg, Args args) 64 { 65 this.eventLoop = eventLoop; 66 this._cancelled = false; 67 this.dg = dg; 68 static if (args.length > 0) 69 { 70 this.args = args; 71 } 72 } 73 74 override void cancel() 75 { 76 this._cancelled = true; 77 this.dg = null; 78 this.args = Args.init; 79 } 80 81 override bool cancelled() const 82 { 83 return this._cancelled; 84 } 85 86 protected override void opCallImpl() 87 { 88 try 89 { 90 enforceEx!CancelledException(!this._cancelled, "Callback cancelled"); 91 92 this.dg(this.args); 93 } 94 catch (Throwable throwable) 95 { 96 if (this.eventLoop is null) 97 this.eventLoop = getEventLoop; 98 99 auto context = ExceptionContext("Exception on calling " ~ toString, throwable); 100 context.callback = this; 101 eventLoop.callExceptionHandler(context); 102 } 103 } 104 105 override string toString() const 106 { 107 return "%s(dg: %s, cancelled: %s)".format(typeid(this), 108 __traits(identifier, dg), _cancelled); 109 } 110 } 111 112 auto callback(Dg, Args...)(EventLoop eventLoop, Dg dg, Args args) 113 { 114 return new Callback!(Dg, Args)(eventLoop, dg, args); 115 } 116 117 alias ExceptionHandler = void delegate(EventLoop, ExceptionContext); 118 119 /** 120 * Exception conxtext for event exceptions 121 */ 122 struct ExceptionContext 123 { 124 string message; /// Error message. 125 Throwable throwable; /// (optional) Throwable object. 126 FutureHandle future; /// (optional) Future instance. 127 CallbackHandle callback; /// (optional): CallbackHandle instance. 128 Protocol protocol; /// (optional): Protocol instance. 129 Transport transport; /// (optional): Transport instance. 130 Socket socket; /// (optional): Socket instance. 131 ExceptionContext* context; /// (optional): Chained context. 
132 133 string toString() const 134 { 135 import std.array : appender; 136 import std.conv : to; 137 138 auto result = appender("message: \"" ~ message ~ "\""); 139 140 if (throwable !is null) 141 result ~= ", throwable: " ~ (cast(Throwable) throwable).to!string; 142 if (future !is null) 143 result ~= ", future: " ~ future.to!string; 144 if (callback !is null) 145 result ~= ", callback: " ~ callback.to!string; 146 if (protocol !is null) 147 result ~= ", protocol: " ~ protocol.to!string; 148 if (transport !is null) 149 result ~= ", transport: " ~ transport.to!string; 150 if (socket !is null) 151 result ~= ", socket: " ~ (cast(Socket) socket).to!string; 152 if (context !is null) 153 result ~= ", context: " ~ (*context).to!string; 154 155 return "%s(%s)".format(typeid(this).to!string, result.data); 156 } 157 } 158 159 unittest 160 { 161 auto exceptionContext = ExceptionContext("foo"); 162 assert(!exceptionContext.toString.canFind("future")); 163 exceptionContext.future = new Future!int; 164 assert(exceptionContext.toString.canFind("future")); 165 } 166 167 interface SslContext 168 { 169 } 170 171 /** 172 * Interface server returned by $(D createServer()). 173 */ 174 interface Server 175 { 176 /** 177 * Stop serving. This leaves existing connections open. 178 */ 179 void close(); 180 181 /** 182 * Coroutine to wait until service is closed. 
183 */ 184 @Coroutine 185 void waitClosed(); 186 } 187 188 package class ServerImpl : Server 189 { 190 private EventLoop eventLoop; 191 192 private Socket[] sockets; 193 194 private size_t activeCount; 195 196 private Waiter[] waiters; 197 198 this(EventLoop eventLoop, Socket[] sockets) 199 { 200 this.eventLoop = eventLoop; 201 this.sockets = sockets; 202 } 203 204 package void attach() 205 in 206 { 207 assert(!this.sockets.empty); 208 } 209 body 210 { 211 ++this.activeCount; 212 } 213 214 package void detach() 215 in 216 { 217 assert(this.activeCount > 0); 218 } 219 body 220 { 221 --activeCount; 222 223 if (this.activeCount == 0 && this.sockets.empty) 224 wakeup; 225 } 226 227 override void close() 228 { 229 if (this.sockets.empty) 230 return; 231 232 Socket[] stopSockets = this.sockets; 233 234 this.sockets = null; 235 236 foreach (socket; stopSockets) 237 this.eventLoop.stopServing(socket); 238 239 if (this.activeCount == 0) 240 wakeup; 241 } 242 243 private void wakeup() 244 { 245 Waiter[] doneWaiters = this.waiters; 246 247 this.waiters = null; 248 249 foreach (waiter; doneWaiters) 250 { 251 if (!waiter.done) 252 waiter.setResult; 253 } 254 } 255 256 @Coroutine 257 override void waitClosed() 258 { 259 if (this.sockets.empty && this.activeCount == 0) 260 return; 261 262 Waiter waiter = new Waiter(this.eventLoop); 263 264 this.waiters ~= waiter; 265 266 this.eventLoop.waitFor(waiter); 267 } 268 } 269 270 /** 271 * Interface of event loop. 272 */ 273 abstract class EventLoop 274 { 275 protected enum State 276 { 277 STOPPED, 278 RUNNING, 279 STOPPING, 280 CLOSED, 281 } 282 283 protected State state = State.STOPPED; 284 285 // Run an event loop 286 /** 287 * Run until $(D_PSYMBOL stop()) is called. 288 */ 289 void runForever(); 290 291 /** 292 * Run until $(PARAM future) is done. 293 * 294 * If the argument is a $(I coroutine object), it is wrapped by $(D_PSYMBOL 295 * task()). 296 * 297 * Returns: the Future's result, or throws its exception. 
298 */ 299 final T runUntilComplete(T)(Future!T future) 300 { 301 future.addDoneCallback(&stop); 302 runForever; 303 assert(future.done); 304 if (future.exception) 305 throw future.exception; 306 307 static if (!is(T == void)) 308 { 309 return future.result; 310 } 311 } 312 313 /** 314 * Returns: running status of event loop. 315 */ 316 final bool isRunning() 317 { 318 return this.state == State.RUNNING || this.state == State.STOPPING; 319 } 320 321 /** 322 * Stop running the event loop. 323 * 324 * Every callback scheduled before $(D_PSYMBOL stop()) is called will run. 325 * Callbacks scheduled after $(D_PSYMBOL stop()) is called will not run. 326 * Howerver, those callbacks will run if $(D_PSYMBOL runForever()) is 327 * called again later. 328 */ 329 final void stop() 330 { 331 final switch (this.state) 332 { 333 case State.STOPPED: 334 case State.STOPPING: 335 break; 336 case State.RUNNING: 337 this.state = State.STOPPING; 338 break; 339 case State.CLOSED: 340 throw new Exception("Cannot stop a closed event loop"); 341 } 342 } 343 344 /** 345 * Returns: $(D_KEYWORD true) if the event loop was closed. 346 */ 347 final bool isClosed() 348 { 349 return this.state == State.CLOSED; 350 } 351 352 /** 353 * Close the event loop. The loop must not be running. 354 * 355 * This clears the queues and shuts down the executor, but does not wait 356 * for the executor to finish. 357 * 358 * This is idempotent and irreversible. No other methods should be called 359 * after this one. 360 */ 361 final void close() 362 { 363 final switch (this.state) 364 { 365 case State.STOPPED: 366 this.state = State.CLOSED; 367 break; 368 case State.RUNNING: 369 case State.STOPPING: 370 throw new Exception("Cannot close a running event loop"); 371 case State.CLOSED: 372 break; 373 } 374 } 375 376 // Calls 377 378 /** 379 * Arrange for a callback to be called as soon as possible. 380 * 381 * This operates as a FIFO queue, callbacks are called in the order in 382 * which they are registered. 
Each callback will be called exactly once. 383 * 384 * Any positional arguments after the callback will be passed to the 385 * callback when it is called. 386 * 387 * Returns: an instance of $(D_PSYMBOL Callback). 388 */ 389 final auto callSoon(Dg, Args...)(Dg dg, Args args) 390 { 391 auto callback = new Callback!(Dg, Args)(this, dg, args); 392 393 scheduleCallback(callback); 394 395 return callback; 396 } 397 398 /+ 399 /** 400 * Like $(D_PSYMBOL call_soon()), but thread safe. 401 */ 402 final auto callSoonThreadsafe(alias callback, Args...)(Args args) 403 { 404 return callLater!callback(0, args); 405 } 406 +/ 407 408 // Delayed calls 409 410 /** 411 * Arrange for the callback to be called after the given delay. 412 * 413 * An instance of $(D_PSYMBOL Callback) is returned. 414 * 415 * $(D_PSYMBOL dg) delegate will be called exactly once per call to 416 * $(D_PSYMBOL callLater()). If two callbacks are scheduled for exactly 417 * the same time, it is undefined which will be called first. 418 * 419 * The optional positional args will be passed to the callback when it is 420 * called. 421 */ 422 final auto callLater(Dg, Args...)(Duration delay, Dg dg, Args args) 423 { 424 auto callback = new Callback!(Dg, Args)(this, dg, args); 425 426 scheduleCallback(delay, callback); 427 428 return callback; 429 } 430 431 /** 432 * Arrange for the callback to be called at the given absolute timestamp 433 * when (an int or float), using the same time reference as time(). 434 * 435 * This method’s behavior is the same as $(D_PSYMBOL callLater()). 436 */ 437 final auto callAt(Dg, Args...)(SysTime when, Dg dg, Args args) 438 { 439 auto callback = new Callback!(Dg, Args)(this, dg, args); 440 441 scheduleCallback(when, callback); 442 443 return callback; 444 } 445 446 /** 447 * Return the current time according to the event loop’s internal clock. 448 * 449 * See_Also: $(D_PSYMBOL sleep()). 
450 */ 451 SysTime time() 452 { 453 return Clock.currTime; 454 } 455 456 457 // Fibers 458 459 /** 460 * Schedule the execution of a fiber object: wrap it in a future. 461 * 462 * Third-party event loops can use their own subclass of Task for 463 * interoperability. In this case, the result type is a subclass of Task. 464 * 465 * Returns: a $(D_PSYMBOL Task) object. 466 * 467 * See_Also: $(D_PSYMBOL task()). 468 */ 469 final auto createTask(Coroutine, Args...)(Coroutine coroutine, Args args) 470 if (isDelegate!Coroutine) 471 { 472 return new Task!(Coroutine, Args)(this, coroutine, args); 473 } 474 /* 475 // Methods for interacting with threads. 476 477 ReturnType!callback runInExecutor(alias callback, Args...)(executor, Args args); 478 479 void setDefaultExecutor(executor); 480 481 // Network I/O methods returning Futures. 482 483 AddressInfo[] getAddressInfo(T...)(in char[] node, T options); 484 485 // def getNameInfo(sockaddr, flags = 0); 486 */ 487 // Creating connections 488 489 /** 490 * Create a streaming transport connection to a given Internet host and 491 * port: socket family $(D_PSYMBOL AddressFamily.INET) or $(D_PSYMBOL 492 * AddressFamily.INET6) depending on host (or family if specified), socket 493 * type $(D_PSYMBOL SocketType.STREAM). 494 * 495 * This method is a coroutine which will try to establish the connection in 496 * the background. When successful, the coroutine returns a (transport, 497 * protocol) tuple. 498 * 499 * The chronological synopsis of the underlying operation is as follows: 500 * 501 * The connection is established, and a transport is created to represent 502 * it. $(D_PSYMBOL protocolFactory) is called without arguments and must 503 * return a protocol instance. 504 * The protocol instance is tied to the transport, and its $(D_PSYMBOL 505 * connectionMade()) method is called. 506 * The coroutine returns successfully with the $(D_PSYMBOL (transport, 507 * protocol)) tuple. 
508 * 509 * The created transport is an implementation-dependent bidirectional 510 * stream. 511 * 512 * Options allowing to change how the connection is created: 513 * Params: 514 * protocolFactory = is a callable returning a protocol instance 515 * 516 * host = if empty then the $(D_PSYMBOL socket) parameter should be 517 * specified. 518 * 519 * service = service name or port number. 520 * 521 * sslContext = if not $(D_KEYWORD null), a SSL/TLS transport is created 522 * (by default a plain TCP transport is created). 523 * 524 * serverHostname = is only for use together with ssl, and sets or 525 * overrides the hostname that the target server’s certificate will be 526 * matched against. By default the value of the host argument is used. 527 * If host is empty, there is no default and you must pass a value for 528 * $(D_PSYMBOL serverHostname). If $(D_PSYMBOL serverHostname) is an 529 * empty, hostname matching is disabled (which is a serious security 530 * risk, allowing for man-in-the-middle-attacks). 531 * 532 * addressFamily = optional adress family. 533 * 534 * protocolType = optional protocol. 535 * 536 * addressInfoFlags = optional flags. 537 * 538 * socket = if not $(D_KEYWORD null), should be an existing, already 539 * connected $(D_PSYMBOL Socket) object to be used by the transport. If 540 * $(D_PSYMBOL socket) is given, none of $(D_PSYMBOL host), $(D_PSYMBOL 541 * service), $(D_PSYMBOL addressFamily), $(D_PSYMBOL protocolType), 542 * $(D_PSYMBOL addressInfoFlags) and $(D_PSYMBOL localAddress) should 543 * be specified. 544 * 545 * localHost = if given, together with $(D_PSYMBOL localService) is used to 546 * bind the socket to locally. The $(D_PSYMBOL localHost) and 547 * $(D_PSYMBOL localService) are looked up using $(D_PSYMBOL 548 * getAddressInfo()), similarly to host and service. 549 * 550 * localService = see $(D_PSYMBOL localHost). 
551 * 552 * Returns: Tuple!(Transport, "transport", Protocol, "protocol") 553 */ 554 @Coroutine 555 auto createConnection(ProtocolFactory protocolFactory, 556 in char[] host = null, in char[] service = null, 557 SslContext sslContext = null, 558 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 559 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 560 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags, 561 Socket socket = null, in char[] localHost = null, 562 in char[] localService = null, in char[] serverHostname = null) 563 { 564 enforce(serverHostname.empty || sslContext !is null, 565 "serverHostname is only meaningful with SSL"); 566 enforce(serverHostname.empty || sslContext is null || !host.empty, 567 "You must set serverHostname when using SSL without a host"); 568 569 if (!host.empty || !service.empty) 570 { 571 enforce(socket is null, 572 "host/service and socket can not be specified at the same time"); 573 574 Future!(AddressInfo[])[] fs = [ 575 createTask(&this.getAddressInfo, host, service, addressFamily, 576 SocketType.STREAM, protocolType, addressInfoFlags) 577 ]; 578 579 if (!localHost.empty) 580 fs ~= createTask(&this.getAddressInfo, localHost, localService, 581 addressFamily, SocketType.STREAM, protocolType, 582 addressInfoFlags); 583 584 this.wait(fs); 585 586 auto addressInfos = fs.map!"a.result"; 587 588 enforceEx!SocketOSException(addressInfos.all!(a => !a.empty), 589 "getAddressInfo() returned empty list"); 590 591 SocketOSException[] exceptions = null; 592 bool connected = false; 593 594 foreach (addressInfo; addressInfos[0]) 595 { 596 try 597 { 598 socket = new Socket(addressInfo); 599 socket.blocking(false); 600 601 if (addressInfos.length > 1) 602 { 603 bool bound = false; 604 605 foreach (localAddressInfo; addressInfos[1]) 606 { 607 try 608 { 609 socket.bind(localAddressInfo.address); 610 bound = true; 611 break; 612 } 613 catch (SocketOSException socketOSException) 614 { 615 exceptions ~= new SocketOSException( 
616 "error while attempting to bind on address '%s'" 617 .format(localAddressInfo.address), 618 socketOSException); 619 } 620 } 621 if (!bound) 622 { 623 socket.close; 624 socket = null; 625 continue; 626 } 627 } 628 629 socketConnect(socket, addressInfo.address); 630 connected = true; 631 break; 632 } 633 catch (SocketOSException socketOSException) 634 { 635 if (socket !is null) 636 socket.close; 637 638 exceptions ~= socketOSException; 639 } 640 catch (Throwable throwable) 641 { 642 if (socket !is null) 643 socket.close; 644 645 throw throwable; 646 } 647 } 648 649 if (!connected) 650 { 651 assert(!exceptions.empty); 652 653 if (exceptions.length == 1) 654 { 655 throw exceptions[0]; 656 } 657 else 658 { 659 if (exceptions.all!(a => a.msg == exceptions[0].msg)) 660 throw exceptions[0]; 661 662 throw new SocketOSException( 663 "Multiple exceptions: %(%s, %)".format(exceptions)); 664 } 665 } 666 } 667 else 668 { 669 enforce(socket !is null, 670 "host and port was not specified and no socket specified"); 671 672 socket.blocking(false); 673 } 674 675 return createConnectionTransport(socket, protocolFactory, sslContext, 676 serverHostname.empty ? serverHostname : host); 677 } 678 679 /** 680 * Create datagram connection: socket family $(D_PSYMBOL AddressFamily.INET) 681 * or $(D_PSYMBOL AddressFamily.INET6) depending on host (or family if 682 * specified), socket type $(D_PSYMBOL SocketType.DGRAM). 683 * 684 * This method is a coroutine which will try to establish the connection in 685 * the background. 686 * 687 * See the create_connection() method for parameters. 
688 * 689 * Returns: Tuple!(DatagramTransport, "datagramTransport", 690 * DatagramProtocol, "datagramProtocol") 691 */ 692 @Coroutine 693 auto createDatagramEndpoint(DatagramProtocolFactory datagramProtocolFactory, 694 in char[] localHost = null, in char[] localService = null, 695 in char[] remoteHost = null, in char[] remoteService = null, 696 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 697 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 698 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags) 699 { 700 alias AddressPairInfo = Tuple!(AddressFamily, "addressFamily", 701 ProtocolType, "protocolType", Address, "localAddress", 702 Address, "remoteAddress"); 703 704 AddressPairInfo[] addressPairInfos = null; 705 706 707 if (localHost.empty && remoteHost.empty) 708 { 709 enforce(addressFamily != UNSPECIFIED!AddressFamily, 710 "Unexpected address family"); 711 addressPairInfos ~= AddressPairInfo(addressFamily, protocolType, 712 null, null); 713 } 714 else 715 { 716 enforce(remoteHost.empty, 717 "Remote host parameter not supportored yet"); 718 719 auto addressInfos = getAddressInfo(localHost, localService, 720 addressFamily, SocketType.DGRAM, protocolType, 721 addressInfoFlags); 722 723 enforceEx!SocketOSException(!addressInfos.empty, 724 "getAddressInfo() returned empty list"); 725 726 foreach (addressInfo; addressInfos) 727 { 728 addressPairInfos ~= AddressPairInfo(addressInfo.family, 729 addressInfo.protocol, addressInfo.address, null); 730 } 731 } 732 733 Socket socket = null; 734 Address remoteAddress = null; 735 SocketOSException[] exceptions = null; 736 737 foreach (addressPairInfo; addressPairInfos) 738 { 739 try 740 { 741 socket = new Socket(addressPairInfo.addressFamily, 742 SocketType.DGRAM, addressPairInfo.protocolType); 743 socket.setOption(SocketOptionLevel.SOCKET, 744 SocketOption.REUSEADDR, 1); 745 socket.blocking(false); 746 747 if (addressPairInfo.localAddress) 748 socket.bind(addressPairInfo.localAddress); 749 750 751 
remoteAddress = addressPairInfo.remoteAddress; 752 enforce(remoteAddress is null, 753 "remote connect not supported yet"); 754 755 break; 756 } 757 catch (SocketOSException socketOSException) 758 { 759 if (socket !is null) 760 socket.close; 761 762 exceptions ~= socketOSException; 763 } 764 catch (Throwable throwable) 765 { 766 if (socket !is null) 767 socket.close; 768 769 throw throwable; 770 } 771 } 772 773 auto protocol = datagramProtocolFactory(); 774 auto waiter = new Waiter(this); 775 auto transport = makeDatagramTransport(socket, protocol, remoteAddress, 776 waiter); 777 try 778 { 779 this.waitFor(waiter); 780 } 781 catch (Throwable throwable) 782 { 783 transport.close; 784 throw throwable; 785 } 786 787 return tuple!("datagramTransport", "datagramProtocol")(transport, 788 protocol); 789 } 790 791 /** 792 * Create UNIX connection: socket family $(D_PSYMBOL AddressFamily.UNIX), 793 * socket type $(D_PSYMBOL SocketType.STREAM). The UNIX socket family is 794 * used to communicate between processes on the same machine efficiently. 795 * 796 * This method is a coroutine which will try to establish the connection in 797 * the background. When successful, the coroutine returns a $(D_PSYMBOL 798 * Tuple!(Transport, "transport", Protocol, "protocol")) 799 * 800 * See the $(D_PSYMBOL EventLoop.createConnection()) method for parameters. 
801 */ 802 version (Posix) 803 @Coroutine 804 auto createUnixConnection(ProtocolFactory protocolFactory, 805 in char[] path = null, SslContext sslContext = null, 806 Socket socket = null, in char[] serverHostname = null) 807 { 808 if (sslContext is null) 809 enforce(serverHostname.empty, 810 "serverHostname is only meaningful with ssl"); 811 else 812 enforce(!serverHostname.empty, 813 "you have to pass server_hostname when using ssl"); 814 815 if (!path.empty) 816 { 817 enforce(socket is null, 818 "path and socket can not be specified at the same time"); 819 820 try 821 { 822 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM); 823 socket.blocking(false); 824 socketConnect(socket, new UnixAddress(path)); 825 } 826 catch (Throwable throwable) 827 { 828 if (socket !is null) 829 socket.close; 830 831 throw throwable; 832 } 833 } 834 else 835 { 836 enforce(socket !is null, "no path and sock were specified"); 837 enforce(socket.addressFamily == AddressFamily.UNIX, 838 "A UNIX Domain Socket was expected, got %s".format(socket)); 839 socket.blocking(false); 840 } 841 842 return createConnectionTransport(socket, protocolFactory, sslContext, 843 serverHostname); 844 } 845 846 847 //"""A coroutine which creates a UNIX Domain Socket server. 848 849 //The return value is a Server object, which can be used to stop 850 //the service. 851 852 //path is a str, representing a file systsem path to bind the 853 //server socket to. 854 855 //sock can optionally be specified in order to use a preexisting 856 //socket object. 857 858 //backlog is the maximum number of queued connections passed to 859 //listen() (defaults to 100). 860 861 //ssl can be set to an SSLContext to enable SSL over the 862 //accepted connections. 863 //""" 864 865 /** 866 * A coroutine which creates a TCP server bound to host and port. 867 * 868 * Params: 869 * protocolFactory = is a callable returning a protocol instance. 
870 * 871 * host = if empty then all interfaces are assumed and a list of multiple 872 * sockets will be returned (most likely one for IPv4 and another one 873 * for IPv6). 874 * 875 * service = service name or port number. 876 * 877 * addressFamily = can be set to either $(D_PSYMBOL AddressFamily.INET) or 878 * $(D_PSYMBOL AddressFamily.INET6) to force the socket to use IPv4 or 879 * IPv6. If not set it will be determined from host (defaults to 880 * $(D_PSYMBOL AddressFamily.UNSPEC)). 881 * 882 * addressInfoFlags = a bitmask for getAddressInfo(). 883 * 884 * socket = can optionally be specified in order to use a preexisting 885 * socket object. 886 * 887 * backlog = the maximum number of queued connections passed to listen() 888 * (defaults to 100). 889 * 890 * sslContext = can be set to an SSLContext to enable SSL over the accepted 891 * connections. 892 * 893 * reuseAddress = tells the kernel to reuse a local socket in TIME_WAIT 894 * state, without waiting for its natural timeout to expire. If not 895 * specified will automatically be set to $(D_KEYWORD true) on UNIX. 896 * 897 * Returns: a Server object which can be used to stop the service. 
898 */ 899 @Coroutine 900 Server createServer(ProtocolFactory protocolFactory, 901 in char[] host = null, in char[] service = null, 902 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 903 AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE, 904 Socket socket = null, int backlog = 100, SslContext sslContext = null, 905 bool reuseAddress = true) 906 { 907 enforce(sslContext is null, "SSL support not implemented yet"); 908 909 Socket[] sockets; 910 911 scope (failure) 912 { 913 foreach (socket1; sockets) 914 socket1.close; 915 } 916 917 if (!host.empty || !service.empty) 918 { 919 enforce(socket is null, 920 "host/service and socket can not be specified at the same time"); 921 922 AddressInfo[] addressInfos = getAddressInfo(host, service, 923 addressFamily, SocketType.STREAM, UNSPECIFIED!ProtocolType, 924 addressInfoFlags); 925 926 enforceEx!SocketOSException(!addressInfos.empty, 927 "getAddressInfo() returned empty list"); 928 929 foreach (addressInfo; addressInfos) 930 { 931 try 932 { 933 socket = new Socket(addressInfo); 934 } 935 catch (SocketOSException socketOSException) 936 { 937 continue; 938 } 939 940 sockets ~= socket; 941 942 if (reuseAddress) 943 socket.setOption(SocketOptionLevel.SOCKET, 944 SocketOption.REUSEADDR, true); 945 946 if (addressInfo.family == AddressFamily.INET6) 947 socket.setOption(SocketOptionLevel.IPV6, 948 SocketOption.IPV6_V6ONLY, true); 949 950 try 951 { 952 socket.bind(addressInfo.address); 953 } 954 catch (SocketException socketException) 955 { 956 throw new SocketException( 957 "error while attempting to bind to address %s: %s" 958 .format(addressInfo.address, socketException.msg)); 959 } 960 } 961 } 962 else 963 { 964 enforce(socket !is null, 965 "Neither host/service nor socket were specified"); 966 967 sockets ~= socket; 968 } 969 970 auto server = new ServerImpl(this, sockets); 971 972 foreach (socket1; sockets) 973 { 974 socket1.listen(backlog); 975 socket1.blocking(false); 976 startServing(protocolFactory, 
socket1, sslContext, server); 977 } 978 979 return server; 980 } 981 982 /** 983 * Similar to $(D_PSYMBOL EventLoop.createServer()), but specific to the 984 * socket family $(D_PSYMBOL AddressFamily.UNIX). 985 */ 986 version (Posix) 987 @Coroutine 988 Server createUnixServer(ProtocolFactory protocolFactory, in char[] path, 989 Socket socket = null, int backlog = 100, SslContext sslContext = null) 990 { 991 if (!path.empty) 992 { 993 enforce(socket is null, 994 "path and socket can not be specified at the same time"); 995 996 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM); 997 998 try 999 { 1000 socket.bind(new UnixAddress(path)); 1001 } 1002 catch (SocketOSException socketOSException) 1003 { 1004 import core.stdc.errno : EADDRINUSE; 1005 1006 socket.close; 1007 if (socketOSException.errorCode == EADDRINUSE) 1008 throw new SocketOSException("Address %s is already in use" 1009 .format(path), socketOSException.errorCode); 1010 else 1011 throw socketOSException; 1012 } 1013 catch (Throwable throwable) 1014 { 1015 socket.close; 1016 throw throwable; 1017 } 1018 } 1019 else 1020 { 1021 enforce(socket !is null, 1022 "path was not specified, and no socket specified"); 1023 enforce(socket.addressFamily == AddressFamily.UNIX, 1024 "A UNIX Domain Socket was expected, got %s".format(socket)); 1025 } 1026 1027 auto server = new ServerImpl(this, [socket]); 1028 1029 socket.listen(backlog); 1030 socket.blocking(false); 1031 startServing(protocolFactory, socket, sslContext, server); 1032 1033 return server; 1034 } 1035 /* 1036 // Pipes and subprocesses. 1037 1038 //"""Register read pipe in event loop. 1039 1040 //protocol_factory should instantiate object with Protocol interface. 1041 //pipe is file-like object already switched to nonblocking. 
1042 //Return pair (transport, protocol), where transport support 1043 //ReadTransport interface.""" 1044 //# The reason to accept file-like object instead of just file descriptor 1045 //# is: we need to own pipe and close it at transport finishing 1046 //# Can got complicated errors if pass f.fileno(), 1047 //# close fd in pipe transport then close f and vise versa. 1048 Tuple!(Transport, Protocol) connectReadPipe(Protocol function() protocol_factory, 1049 Pipe pipe); 1050 1051 //"""Register write pipe in event loop. 1052 1053 //protocol_factory should instantiate object with BaseProtocol interface. 1054 //Pipe is file-like object already switched to nonblocking. 1055 //Return pair (transport, protocol), where transport support 1056 //WriteTransport interface.""" 1057 //# The reason to accept file-like object instead of just file descriptor 1058 //# is: we need to own pipe and close it at transport finishing 1059 //# Can got complicated errors if pass f.fileno(), 1060 //# close fd in pipe transport then close f and vise versa. 1061 Tuple!(Transport, Protocol) connectWritePipe(Protocol function() protocol_factory, 1062 Pipe pipe); 1063 1064 Tuple!(Transport, Protocol) processShell(Protocol function() protocol_factory, 1065 in char[] cmd, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE, 1066 File stderr = subprocess.PIPE); 1067 1068 Tuple!(Transport, Protocol) processProcess(Protocol function() protocol_factory, 1069 in char[][] args, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE, 1070 File stderr = subprocess.PIPE); 1071 1072 //# Ready-based callback registration methods. 1073 //# The add_*() methods return None. 1074 //# The remove_*() methods return True if something was removed, 1075 //# False if there was nothing to delete. 
1076 1077 void addReader(int fd, void delegate() callback); 1078 1079 void removeReader(int fd); 1080 1081 void addWriter(int fd, void delegate() callback); 1082 1083 void removeWriter(int fd); 1084 1085 // # Completion based I/O methods returning Futures. 1086 1087 ptrdiff_t socketReceive(Socket sock, void[] buf); 1088 1089 ptrdiff_t socketSend(Socket sock, void[] buf); 1090 1091 Socket socketAccept(Socket sock); 1092 */ 1093 // Resolve host name 1094 1095 @Coroutine 1096 AddressInfo[] getAddressInfo(in char[] host, in char[] service, 1097 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 1098 SocketType socketType = UNSPECIFIED!SocketType, 1099 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 1100 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags); 1101 1102 1103 // Signal handling. 1104 version (Posix) 1105 { 1106 void addSignalHandler(int sig, void delegate() handler); 1107 1108 void removeSignalHandler(int sig); 1109 } 1110 1111 // Error handlers. 1112 1113 private ExceptionHandler exceptionHandler; 1114 1115 /** 1116 * Set handler as the new event loop exception handler. 1117 * 1118 * If handler is $(D_KEYWORD null), the default exception handler will be set. 1119 * 1120 * See_Also: $(D_PSYMBOL callExceptionHandler()). 1121 */ 1122 final void setExceptionHandler(ExceptionHandler exceptionHandler) 1123 { 1124 this.exceptionHandler = exceptionHandler; 1125 } 1126 1127 /** 1128 * Default exception handler. 1129 * 1130 * This is called when an exception occurs and no exception handler is set, 1131 * and can be called by a custom exception handler that wants to defer to 1132 * the default behavior. 1133 * The context parameter has the same meaning as in $(D_PSYMBOL 1134 * callExceptionHandler()). 
*/
    final void defaultExceptionHandler(ExceptionContext exceptionContext)
    {
        import std.stdio : stderr;

        if (exceptionContext.message.empty)
            exceptionContext.message = "Unhandled exception in event loop";

        // An Error is escalated instead of merely reported. The original
        // Error is chained as `next` so that its own stack trace stays
        // reachable from the Error thrown here.
        if (cast(Error) exceptionContext.throwable)
        {
            throw new Error("Uncaught Error: %s".format(exceptionContext),
                exceptionContext.throwable);
        }

        stderr.writefln("Uncaught Exception: %s", exceptionContext);
    }

    /**
     * Call the current event loop's exception handler.
     *
     * When no handler has been installed, the default exception handler is
     * used. When the installed handler itself throws, the default exception
     * handler is invoked with a new context that wraps the thrown object and
     * points back at the context that was being handled.
     *
     * Note: New ExceptionContext members may be introduced in the future.
     */
    final void callExceptionHandler(ExceptionContext exceptionContext)
    {
        if (exceptionHandler is null)
        {
            defaultExceptionHandler(exceptionContext);
            return;
        }

        try
        {
            exceptionHandler(this, exceptionContext);
        }
        catch (Throwable throwable)
        {
            auto context = ExceptionContext("Unhandled error in exception handler",
                throwable);
            // exceptionContext is this call's by-value parameter, so the
            // pointer refers to storage owned by this stack frame.
            context.context = &exceptionContext;

            defaultExceptionHandler(context);
        }
    }

    /**
     * Returns: the dynamic type of this loop followed by its state, formatted
     * as $(D "%s(%s)").
     */
    override string toString() const
    {
        return "%s(%s)".format(typeid(this), this.state);
    }

protected:

    // The declarations below have no body here; presumably they are supplied
    // by the concrete event loop implementation (confirm against subclasses).

    void scheduleCallback(CallbackHandle callback);

    void scheduleCallback(Duration delay, CallbackHandle callback);

    void scheduleCallback(SysTime when, CallbackHandle callback);

    void socketConnect(Socket socket, Address address);

    Transport makeSocketTransport(Socket socket, Protocol protocol,
        Waiter waiter);

    DatagramTransport makeDatagramTransport(Socket socket,
        DatagramProtocol datagramProtocol, Address remoteAddress,
        Waiter waiter);

    void startServing(ProtocolFactory protocolFactory, Socket socket,
        SslContext sslContext, ServerImpl server);

    void stopServing(Socket socket);
private:

    /**
     * Create a protocol with protocolFactory, attach it to socket through a
     * socket transport and wait on the transport's waiter.
     *
     * Params:
     *  socket          = socket handed to $(D makeSocketTransport).
     *  protocolFactory = called once to create the protocol instance.
     *  sslContext      = must be $(D_KEYWORD null); SSL is not implemented
     *                    and a non-null value hits $(D assert(0)).
     *  serverHostname  = currently unused.
     *
     * Returns: a tuple with the named fields $(D transport) and $(D protocol).
     *
     * Throws: whatever waiting on the waiter throws; the transport is closed
     * before the throwable is propagated.
     */
    @Coroutine
    auto createConnectionTransport(Socket socket,
        ProtocolFactory protocolFactory, SslContext sslContext,
        in char[] serverHostname = null)
    {
        Protocol protocol = protocolFactory();
        Transport transport = null;
        auto waiter = new Waiter(this);

        if (sslContext !is null)
        {
            assert(0, "SSL support not implemented yet");
            //sslcontext = None if isinstance(ssl, bool) else ssl
            //transport = self._make_ssl_transport(
            //    sock, protocol, sslcontext, waiter,
            //    server_side = False, server_hostname = server_hostname)
        }
        else
        {
            transport = makeSocketTransport(socket, protocol, waiter);
        }

        try
        {
            this.waitFor(waiter);
        }
        catch (Throwable throwable)
        {
            // Do not leave a half-established transport open on failure.
            transport.close;
            throw throwable;
        }

        return tuple!("transport", "protocol")(transport, protocol);
    }
}

/**
 * Interface of policy for accessing the event loop.
 *
 * NOTE(review): the current loop is held in a single instance member of the
 * policy object; any per-thread behaviour would have to come from how policy
 * instances are stored or from subclasses -- confirm.
 */
abstract class EventLoopPolicy
{
    /// Current event loop, or $(D null) when none has been set or created.
    protected EventLoop eventLoop = null;

    /// Becomes $(D true) on the first call to $(D setEventLoop); suppresses
    /// the automatic creation performed by $(D getEventLoop).
    protected bool setCalled = false;

    /**
     * Get the event loop.
     *
     * A new loop is created and installed on demand only when no loop is
     * held, $(D setEventLoop) has never been called, and the caller runs on
     * the main thread.
     *
     * This may be $(D null) or an instance of EventLoop.
     */
    EventLoop getEventLoop()
    {
        if (eventLoop is null && !setCalled && thread_isMainThread)
            setEventLoop(newEventLoop);
        return eventLoop;
    }

    /**
     * Set the event loop.
     *
     * Also records that a loop was set explicitly, so $(D getEventLoop) no
     * longer creates one automatically, even when loop is $(D null).
     */
    void setEventLoop(EventLoop loop)
    {
        setCalled = true;
        eventLoop = loop;
    }

    /**
     * Create a new event loop.
     *
     * You must call $(D setEventLoop()) to make this the current event loop.
     */
    EventLoop newEventLoop();

    //version (Posix)
    //{
    //    ChildWatcher getChildWatcher();

    //    void setChildWatcher(ChildWatcher watcher);
    //}
}

/**
 * Default policy implementation for accessing the event loop.
 *
 * In this policy, each thread has its own event loop.
However, we only
 * automatically create an event loop by default for the main thread; other
 * threads by default have no event loop.
 *
 * Other policies may have different rules (e.g. a single global event loop, or
 * automatically creating an event loop per thread, or using some other notion
 * of context to which an event loop is associated).
 */

private __gshared EventLoopPolicy eventLoopPolicy;

import asynchronous.libasync.events;

alias DefaultEventLoopPolicy = LibasyncEventLoopPolicy;

/**
 * Return the policy currently in effect.
 *
 * When no policy is installed, a $(D DefaultEventLoopPolicy) is created
 * inside a $(D_KEYWORD synchronized) block and installed first.
 *
 * NOTE(review): the first null check reads the $(D __gshared) variable
 * outside the $(D_KEYWORD synchronized) block -- confirm this is acceptable
 * for the supported targets.
 */
EventLoopPolicy getEventLoopPolicy()
{
    // Common case: a policy is already installed.
    if (eventLoopPolicy !is null)
        return eventLoopPolicy;

    synchronized
    {
        // Check again under the lock before creating the default policy.
        if (eventLoopPolicy is null)
            eventLoopPolicy = new DefaultEventLoopPolicy;
    }

    return eventLoopPolicy;
}

/**
 * Replace the current event loop policy.
 *
 * Passing $(D null) clears the installed policy; the next call to
 * $(D getEventLoopPolicy) then creates the default policy again.
 */
void setEventLoopPolicy(EventLoopPolicy policy)
{
    eventLoopPolicy = policy;
}

/**
 * Shorthand for $(D getEventLoopPolicy.getEventLoop).
 */
EventLoop getEventLoop()
{
    return getEventLoopPolicy().getEventLoop();
}

unittest
{
    auto loop = getEventLoop();
    assert(loop !is null);
}

/**
 * Shorthand for $(D getEventLoopPolicy.setEventLoop(loop)).
 */
void setEventLoop(EventLoop loop)
{
    getEventLoopPolicy().setEventLoop(loop);
}

/**
 * Shorthand for $(D getEventLoopPolicy.newEventLoop).
 */
EventLoop newEventLoop()
{
    return getEventLoopPolicy().newEventLoop();
}