1 /**
2  * Event loop and event loop policy.
3  */
4 module asynchronous.events;
5 
6 import core.thread;
7 import std.algorithm;
8 import std.array;
9 import std.datetime;
10 import std.exception;
11 import std.process;
12 import std.socket;
13 import std.string;
14 import std.traits;
15 import std.typecons;
16 import asynchronous.futures;
17 import asynchronous.protocols;
18 import asynchronous.tasks;
19 import asynchronous.transports;
20 import asynchronous.types;
21 
22 alias Protocol = asynchronous.protocols.Protocol;
23 
24 interface CallbackHandle
25 {
26     /**
27      * Cancel the call. If the callback is already canceled or executed, this
28      * method has no effect.
29      */
30     void cancel();
31 
32     /**
33      * Return $(D_KEYWORD true) if the callback was cancelled.
34      */
35     bool cancelled() const;
36 
37     package void opCall()
38     {
39         opCallImpl;
40     }
41 
42     protected void opCallImpl();
43 }
44 
45 /**
46  * A callback wrapper object returned by $(D_PSYMBOL EventLoop.callSoon),
47  * $(D_PSYMBOL EventLoop.callSoonThreadsafe),
48  * $(D_PSYMBOL EventLoop.callLater), and $(D_PSYMBOL EventLoop.callAt).
49  */
50 class Callback(Dg, Args...) : CallbackHandle
51 if (isDelegate!Dg)
52 {
53     private bool _cancelled;
54 
55     private EventLoop eventLoop;
56 
57     private Dg dg;
58 
59     private Args args;
60 
61     alias ResultType = ReturnType!Dg;
62 
63     this(EventLoop eventLoop, Dg dg, Args args)
64     {
65         this.eventLoop = eventLoop;
66         this._cancelled = false;
67         this.dg = dg;
68         static if (args.length > 0)
69         {
70             this.args = args;
71         }
72     }
73 
74     override void cancel()
75     {
76         this._cancelled = true;
77         this.dg = null;
78         this.args = Args.init;
79     }
80 
81     override bool cancelled() const
82     {
83         return this._cancelled;
84     }
85 
86     protected override void opCallImpl()
87     {
88         try
89         {
90             enforceEx!CancelledException(!this._cancelled, "Callback cancelled");
91 
92             this.dg(this.args);
93         }
94         catch (Throwable throwable)
95         {
96             if (this.eventLoop is null)
97                 this.eventLoop = getEventLoop;
98 
99             auto context = ExceptionContext("Exception on calling " ~ toString, throwable);
100             context.callback = this;
101             eventLoop.callExceptionHandler(context);
102         }
103     }
104 
105     override string toString() const
106     {
107         return "%s(dg: %s, cancelled: %s)".format(typeid(this),
108             __traits(identifier, dg), _cancelled);
109     }
110 }
111 
112 auto callback(Dg, Args...)(EventLoop eventLoop, Dg dg, Args args)
113 {
114     return new Callback!(Dg, Args)(eventLoop, dg, args);
115 }
116 
117 alias ExceptionHandler = void delegate(EventLoop, ExceptionContext);
118 
119 /**
120  * Exception conxtext for event exceptions
121  */
122 struct ExceptionContext
123 {
124     string message;            /// Error message.
125     Throwable throwable;       /// (optional) Throwable object.
126     FutureHandle future;       /// (optional) Future instance.
127     CallbackHandle callback;   /// (optional): CallbackHandle instance.
128     Protocol protocol;         /// (optional): Protocol instance.
129     Transport transport;       /// (optional): Transport instance.
130     Socket socket;             /// (optional): Socket instance.
131     ExceptionContext* context; /// (optional): Chained context.
132 
133     string toString() const
134     {
135         import std.array : appender;
136         import std.conv : to;
137 
138         auto result = appender("message: \"" ~ message ~ "\"");
139 
140         if (throwable !is null)
141             result ~= ", throwable: " ~ (cast(Throwable) throwable).to!string;
142         if (future !is null)
143             result ~= ", future: " ~ future.to!string;
144         if (callback !is null)
145             result ~= ", callback: " ~ callback.to!string;
146         if (protocol !is null)
147             result ~= ", protocol: " ~ protocol.to!string;
148         if (transport !is null)
149             result ~= ", transport: " ~ transport.to!string;
150         if (socket !is null)
151             result ~= ", socket: " ~ (cast(Socket) socket).to!string;
152         if (context !is null)
153             result ~= ", context: " ~ (*context).to!string;
154 
155         return "%s(%s)".format(typeid(this).to!string, result.data);
156     }
157 }
158 
159 unittest
160 {
161     auto exceptionContext = ExceptionContext("foo");
162     assert(!exceptionContext.toString.canFind("future"));
163     exceptionContext.future = new Future!int;
164     assert(exceptionContext.toString.canFind("future"));
165 }
166 
167 interface SslContext
168 {
169 }
170 
171 /**
172  * Interface server returned by $(D createServer()).
173  */
174 interface Server
175 {
176     /**
177      * Stop serving. This leaves existing connections open.
178      */
179     void close();
180 
181     /**
182      * Coroutine to wait until service is closed.
183      */
184     @Coroutine
185     void waitClosed();
186 }
187 
188 package class ServerImpl : Server
189 {
190     private EventLoop eventLoop;
191 
192     private Socket[] sockets;
193 
194     private size_t activeCount;
195 
196     private Waiter[] waiters;
197 
198     this(EventLoop eventLoop, Socket[] sockets)
199     {
200         this.eventLoop = eventLoop;
201         this.sockets = sockets;
202     }
203 
204     package void attach()
205     in
206     {
207         assert(!this.sockets.empty);
208     }
209     body
210     {
211         ++this.activeCount;
212     }
213 
214     package void detach()
215     in
216     {
217         assert(this.activeCount > 0);
218     }
219     body
220     {
221         --activeCount;
222 
223         if (this.activeCount == 0 && this.sockets.empty)
224             wakeup;
225     }
226 
227     override void close()
228     {
229         if (this.sockets.empty)
230             return;
231 
232         Socket[] stopSockets = this.sockets;
233 
234         this.sockets = null;
235 
236         foreach (socket; stopSockets)
237             this.eventLoop.stopServing(socket);
238 
239         if (this.activeCount == 0)
240             wakeup;
241     }
242 
243     private void wakeup()
244     {
245         Waiter[] doneWaiters = this.waiters;
246 
247         this.waiters = null;
248 
249         foreach (waiter; doneWaiters)
250         {
251             if (!waiter.done)
252                 waiter.setResult;
253         }
254     }
255 
256     @Coroutine
257     override void waitClosed()
258     {
259         if (this.sockets.empty && this.activeCount == 0)
260             return;
261 
262         Waiter waiter = new Waiter(this.eventLoop);
263 
264         this.waiters ~= waiter;
265 
266         this.eventLoop.waitFor(waiter);
267     }
268 }
269 
270 /**
271  * Interface of event loop.
272  */
273 abstract class EventLoop
274 {
275     protected enum State
276     {
277         STOPPED,
278         RUNNING,
279         STOPPING,
280         CLOSED,
281     }
282 
283     protected State state = State.STOPPED;
284 
285     // Run an event loop
286     /**
287      * Run until $(D_PSYMBOL stop()) is called.
288      */
289     void runForever();
290 
291     /**
292      * Run until $(PARAM future) is done.
293      *
294      * If the argument is a $(I coroutine object), it is wrapped by $(D_PSYMBOL
295      * task()).
296      *
297      * Returns: the Future's result, or throws its exception.
298      */
299     final T runUntilComplete(T)(Future!T future)
300     {
301         future.addDoneCallback(&stop);
302         runForever;
303         assert(future.done);
304         if (future.exception)
305             throw future.exception;
306 
307         static if (!is(T == void))
308         {
309             return future.result;
310         }
311     }
312 
313     /**
314      * Returns: running status of event loop.
315      */
316     final bool isRunning()
317     {
318         return this.state == State.RUNNING || this.state == State.STOPPING;
319     }
320 
321     /**
322      * Stop running the event loop.
323      *
324      * Every callback scheduled before $(D_PSYMBOL stop()) is called will run.
325      * Callbacks scheduled after $(D_PSYMBOL stop()) is called will not run.
326      * Howerver, those callbacks will run if $(D_PSYMBOL runForever()) is
327      * called again later.
328      */
329     final void stop()
330     {
331         final switch (this.state)
332         {
333         case State.STOPPED:
334         case State.STOPPING:
335             break;
336         case State.RUNNING:
337             this.state = State.STOPPING;
338             break;
339         case State.CLOSED:
340             throw new Exception("Cannot stop a closed event loop");
341         }
342     }
343 
344     /**
345      * Returns: $(D_KEYWORD true) if the event loop was closed.
346      */
347     final bool isClosed()
348     {
349         return this.state == State.CLOSED;
350     }
351 
352     /**
353      * Close the event loop. The loop must not be running.
354      *
355      * This clears the queues and shuts down the executor, but does not wait
356      * for the executor to finish.
357      *
358      * This is idempotent and irreversible. No other methods should be called
359      * after this one.
360      */
361     final void close()
362     {
363         final switch (this.state)
364         {
365         case State.STOPPED:
366             this.state = State.CLOSED;
367             break;
368         case State.RUNNING:
369         case State.STOPPING:
370             throw new Exception("Cannot close a running event loop");
371         case State.CLOSED:
372             break;
373         }
374     }
375 
376     // Calls
377 
378     /**
379      * Arrange for a callback to be called as soon as possible.
380      *
381      * This operates as a FIFO queue, callbacks are called in the order in
382      * which they are registered. Each callback will be called exactly once.
383      *
384      * Any positional arguments after the callback will be passed to the
385      * callback when it is called.
386      *
387      * Returns: an instance of $(D_PSYMBOL Callback).
388      */
389     final auto callSoon(Dg, Args...)(Dg dg, Args args)
390     {
391         auto callback = new Callback!(Dg, Args)(this, dg, args);
392 
393         scheduleCallback(callback);
394 
395         return callback;
396     }
397 
398     /+
399     /**
400      * Like $(D_PSYMBOL call_soon()), but thread safe.
401      */
402     final auto callSoonThreadsafe(alias callback, Args...)(Args args)
403     {
404         return callLater!callback(0, args);
405     }
406     +/
407 
408     // Delayed calls
409 
410     /**
411      * Arrange for the callback to be called after the given delay.
412      *
413      * An instance of $(D_PSYMBOL Callback) is returned.
414      *
415      * $(D_PSYMBOL dg) delegate will be called exactly once per call to
416      * $(D_PSYMBOL callLater()). If two callbacks are scheduled for exactly
417      * the same time, it is undefined which will be called first.
418      *
419      * The optional positional args will be passed to the callback when it is
420      * called.
421      */
422     final auto callLater(Dg, Args...)(Duration delay, Dg dg, Args args)
423     {
424         auto callback = new Callback!(Dg, Args)(this, dg, args);
425 
426         scheduleCallback(delay, callback);
427 
428         return callback;
429     }
430 
431     /**
432      * Arrange for the callback to be called at the given absolute timestamp
433      * when (an int or float), using the same time reference as time().
434      *
435      * This method’s behavior is the same as $(D_PSYMBOL callLater()).
436      */
437     final auto callAt(Dg, Args...)(SysTime when, Dg dg, Args args)
438     {
439         auto callback = new Callback!(Dg, Args)(this, dg, args);
440 
441         scheduleCallback(when, callback);
442 
443         return callback;
444     }
445 
446     /**
447      * Return the current time according to the event loop’s internal clock.
448      *
449      * See_Also: $(D_PSYMBOL sleep()).
450      */
451     SysTime time()
452     {
453         return Clock.currTime;
454     }
455 
456 
457     // Fibers
458 
459     /**
460      * Schedule the execution of a fiber object: wrap it in a future.
461      *
462      * Third-party event loops can use their own subclass of Task for
463      * interoperability. In this case, the result type is a subclass of Task.
464      *
465      * Returns: a $(D_PSYMBOL Task) object.
466      *
467      * See_Also: $(D_PSYMBOL task()).
468      */
469     final auto createTask(Coroutine, Args...)(Coroutine coroutine, Args args)
470     if (isDelegate!Coroutine)
471     {
472         return new Task!(Coroutine, Args)(this, coroutine, args);
473     }
474 /*
475     // Methods for interacting with threads.
476 
477     ReturnType!callback runInExecutor(alias callback, Args...)(executor, Args args);
478 
479     void setDefaultExecutor(executor);
480 
481     // Network I/O methods returning Futures.
482 
483     AddressInfo[] getAddressInfo(T...)(in char[] node, T options);
484 
485     // def getNameInfo(sockaddr, flags = 0);
486 */
487     // Creating connections
488 
489     /**
490      * Create a streaming transport connection to a given Internet host and
491      * port: socket family $(D_PSYMBOL AddressFamily.INET) or $(D_PSYMBOL
492      * AddressFamily.INET6) depending on host (or family if specified), socket
493      * type $(D_PSYMBOL SocketType.STREAM).
494      *
495      * This method is a coroutine which will try to establish the connection in
496      * the background. When successful, the coroutine returns a (transport,
497      * protocol) tuple.
498      *
499      * The chronological synopsis of the underlying operation is as follows:
500      *
501      * The connection is established, and a transport is created to represent
502      * it. $(D_PSYMBOL protocolFactory) is called without arguments and must
503      * return a protocol instance.
504      * The protocol instance is tied to the transport, and its $(D_PSYMBOL
505      * connectionMade()) method is called.
506      * The coroutine returns successfully with the $(D_PSYMBOL (transport,
507      * protocol)) tuple.
508      *
509      * The created transport is an implementation-dependent bidirectional
510      * stream.
511      *
512      * Options allowing to change how the connection is created:
513      * Params:
514      *  protocolFactory = is a callable returning a protocol instance
515      *
516      *  host = if empty then the $(D_PSYMBOL socket) parameter should be
517      *      specified.
518      *
519      *  service = service name or port number.
520      *
521      *  sslContext = if not $(D_KEYWORD null), a SSL/TLS transport is created
522      *      (by default a plain TCP transport is created).
523      *
524      *  serverHostname = is only for use together with ssl, and sets or
525      *      overrides the hostname that the target server’s certificate will be
526      *      matched against. By default the value of the host argument is used.
527      *      If host is empty, there is no default and you must pass a value for
528      *      $(D_PSYMBOL serverHostname). If $(D_PSYMBOL serverHostname) is an
529      *      empty, hostname matching is disabled (which is a serious security
530      *      risk, allowing for man-in-the-middle-attacks).
531      *
532      *  addressFamily = optional adress family.
533      *
534      *  protocolType = optional protocol.
535      *
536      *  addressInfoFlags = optional flags.
537      *
538      *  socket = if not $(D_KEYWORD null), should be an existing, already
539      *      connected $(D_PSYMBOL Socket) object to be used by the transport. If
540      *      $(D_PSYMBOL socket) is given, none of $(D_PSYMBOL host), $(D_PSYMBOL
541      *      service), $(D_PSYMBOL addressFamily), $(D_PSYMBOL protocolType),
542      *      $(D_PSYMBOL addressInfoFlags) and $(D_PSYMBOL localAddress) should
543      *      be specified.
544      *
545      *  localHost = if given, together with $(D_PSYMBOL localService) is used to
546      *      bind the socket to locally. The $(D_PSYMBOL localHost) and
547      *      $(D_PSYMBOL localService) are looked up using $(D_PSYMBOL
548      *      getAddressInfo()), similarly to host and service.
549      *
550      *  localService = see $(D_PSYMBOL localHost).
551      *
552      * Returns: Tuple!(Transport, "transport", Protocol, "protocol")
553      */
554     @Coroutine
555     auto createConnection(ProtocolFactory protocolFactory,
556         in char[] host = null, in char[] service = null,
557         SslContext sslContext = null,
558         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
559         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
560         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags,
561         Socket socket = null, in char[] localHost = null,
562         in char[] localService = null, in char[] serverHostname = null)
563     {
564         enforce(serverHostname.empty || sslContext !is null,
565             "serverHostname is only meaningful with SSL");
566         enforce(serverHostname.empty || sslContext is null || !host.empty,
567             "You must set serverHostname when using SSL without a host");
568 
569         if (!host.empty || !service.empty)
570         {
571             enforce(socket is null,
572                 "host/service and socket can not be specified at the same time");
573 
574             Future!(AddressInfo[])[] fs = [
575                 createTask(&this.getAddressInfo, host, service, addressFamily,
576                     SocketType.STREAM, protocolType, addressInfoFlags)
577             ];
578 
579             if (!localHost.empty)
580                 fs ~= createTask(&this.getAddressInfo, localHost, localService,
581                     addressFamily, SocketType.STREAM, protocolType,
582                     addressInfoFlags);
583 
584             this.wait(fs);
585 
586             auto addressInfos = fs.map!"a.result";
587 
588             enforceEx!SocketOSException(addressInfos.all!(a => !a.empty),
589                 "getAddressInfo() returned empty list");
590 
591             SocketOSException[] exceptions = null;
592             bool connected = false;
593 
594             foreach (addressInfo; addressInfos[0])
595             {
596                 try
597                 {
598                     socket = new Socket(addressInfo);
599                     socket.blocking(false);
600 
601                     if (addressInfos.length > 1)
602                     {
603                         bool bound = false;
604 
605                         foreach (localAddressInfo; addressInfos[1])
606                         {
607                             try
608                             {
609                                 socket.bind(localAddressInfo.address);
610                                 bound = true;
611                                 break;
612                             }
613                             catch (SocketOSException socketOSException)
614                             {
615                                 exceptions ~= new SocketOSException(
616                                     "error while attempting to bind on address '%s'"
617                                         .format(localAddressInfo.address),
618                                     socketOSException);
619                             }
620                         }
621                         if (!bound)
622                         {
623                             socket.close;
624                             socket = null;
625                             continue;
626                         }
627                     }
628 
629                     socketConnect(socket, addressInfo.address);
630                     connected = true;
631                     break;
632                 }
633                 catch (SocketOSException socketOSException)
634                 {
635                     if (socket !is null)
636                         socket.close;
637 
638                     exceptions ~= socketOSException;
639                 }
640                 catch (Throwable throwable)
641                 {
642                     if (socket !is null)
643                         socket.close;
644 
645                     throw throwable;
646                 }
647             }
648 
649             if (!connected)
650             {
651                 assert(!exceptions.empty);
652 
653                 if (exceptions.length == 1)
654                 {
655                     throw exceptions[0];
656                 }
657                 else
658                 {
659                     if (exceptions.all!(a => a.msg == exceptions[0].msg))
660                         throw exceptions[0];
661 
662                     throw new SocketOSException(
663                         "Multiple exceptions: %(%s, %)".format(exceptions));
664                 }
665             }
666         }
667         else
668         {
669             enforce(socket !is null,
670                 "host and port was not specified and no socket specified");
671 
672             socket.blocking(false);
673         }
674 
675         return createConnectionTransport(socket, protocolFactory, sslContext,
676             serverHostname.empty ? serverHostname : host);
677     }
678 
679     /**
680      * Create datagram connection: socket family $(D_PSYMBOL AddressFamily.INET)
681      * or $(D_PSYMBOL AddressFamily.INET6) depending on host (or family if
682      * specified), socket type $(D_PSYMBOL SocketType.DGRAM).
683      *
684      * This method is a coroutine which will try to establish the connection in
685      * the background.
686      *
687      * See the create_connection() method for parameters.
688      *
689      * Returns: Tuple!(DatagramTransport, "datagramTransport",
690      *      DatagramProtocol, "datagramProtocol")
691      */
692     @Coroutine
693     auto createDatagramEndpoint(DatagramProtocolFactory datagramProtocolFactory,
694         in char[] localHost = null, in char[] localService = null,
695         in char[] remoteHost = null, in char[] remoteService = null,
696         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
697         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
698         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags)
699     {
700         alias AddressPairInfo = Tuple!(AddressFamily, "addressFamily",
701             ProtocolType, "protocolType", Address, "localAddress",
702             Address, "remoteAddress");
703 
704         AddressPairInfo[] addressPairInfos = null;
705 
706 
707         if (localHost.empty && remoteHost.empty)
708         {
709             enforce(addressFamily != UNSPECIFIED!AddressFamily,
710                 "Unexpected address family");
711             addressPairInfos ~= AddressPairInfo(addressFamily, protocolType,
712                 null, null);
713         }
714         else
715         {
716             enforce(remoteHost.empty,
717                 "Remote host parameter not supportored yet");
718 
719             auto addressInfos = getAddressInfo(localHost, localService,
720                 addressFamily, SocketType.DGRAM, protocolType,
721                 addressInfoFlags);
722 
723             enforceEx!SocketOSException(!addressInfos.empty,
724                 "getAddressInfo() returned empty list");
725 
726             foreach (addressInfo; addressInfos)
727             {
728                 addressPairInfos ~= AddressPairInfo(addressInfo.family,
729                     addressInfo.protocol, addressInfo.address, null);
730             }
731         }
732 
733         Socket socket = null;
734         Address remoteAddress = null;
735         SocketOSException[] exceptions = null;
736 
737         foreach (addressPairInfo; addressPairInfos)
738         {
739             try
740             {
741                 socket = new Socket(addressPairInfo.addressFamily,
742                     SocketType.DGRAM, addressPairInfo.protocolType);
743                 socket.setOption(SocketOptionLevel.SOCKET,
744                     SocketOption.REUSEADDR, 1);
745                 socket.blocking(false);
746 
747                 if (addressPairInfo.localAddress)
748                     socket.bind(addressPairInfo.localAddress);
749 
750 
751                 remoteAddress = addressPairInfo.remoteAddress;
752                 enforce(remoteAddress is null,
753                     "remote connect not supported yet");
754 
755                 break;
756             }
757             catch (SocketOSException socketOSException)
758             {
759                 if (socket !is null)
760                     socket.close;
761 
762                 exceptions ~= socketOSException;
763             }
764             catch (Throwable throwable)
765             {
766                 if (socket !is null)
767                     socket.close;
768 
769                 throw throwable;
770             }
771         }
772 
773         auto protocol = datagramProtocolFactory();
774         auto waiter = new Waiter(this);
775         auto transport = makeDatagramTransport(socket, protocol, remoteAddress,
776             waiter);
777         try
778         {
779             this.waitFor(waiter);
780         }
781         catch (Throwable throwable)
782         {
783             transport.close;
784             throw throwable;
785         }
786 
787         return tuple!("datagramTransport", "datagramProtocol")(transport,
788             protocol);
789     }
790 
791     /**
792      * Create UNIX connection: socket family $(D_PSYMBOL AddressFamily.UNIX),
793      * socket type $(D_PSYMBOL SocketType.STREAM). The UNIX socket family is
794      * used to communicate between processes on the same machine efficiently.
795      *
796      * This method is a coroutine which will try to establish the connection in
797      * the background. When successful, the coroutine returns a $(D_PSYMBOL
798      * Tuple!(Transport, "transport", Protocol, "protocol"))
799      *
800      * See the $(D_PSYMBOL EventLoop.createConnection()) method for parameters.
801      */
802     version (Posix)
803     @Coroutine
804     auto createUnixConnection(ProtocolFactory protocolFactory,
805         in char[] path = null, SslContext sslContext = null,
806         Socket socket = null, in char[] serverHostname = null)
807     {
808         if (sslContext is null)
809             enforce(serverHostname.empty,
810                 "serverHostname is only meaningful with ssl");
811         else
812             enforce(!serverHostname.empty,
813                 "you have to pass server_hostname when using ssl");
814 
815         if (!path.empty)
816         {
817             enforce(socket is null,
818                 "path and socket can not be specified at the same time");
819 
820             try
821             {
822                 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
823                 socket.blocking(false);
824                 socketConnect(socket, new UnixAddress(path));
825             }
826             catch (Throwable throwable)
827             {
828                 if (socket !is null)
829                     socket.close;
830 
831                 throw throwable;
832             }
833         }
834         else
835         {
836             enforce(socket !is null, "no path and sock were specified");
837             enforce(socket.addressFamily == AddressFamily.UNIX,
838                 "A UNIX Domain Socket was expected, got %s".format(socket));
839             socket.blocking(false);
840         }
841 
842         return createConnectionTransport(socket, protocolFactory, sslContext,
843             serverHostname);
844     }
845 
846 
847     //"""A coroutine which creates a UNIX Domain Socket server.
848 
849     //The return value is a Server object, which can be used to stop
850     //the service.
851 
852     //path is a str, representing a file systsem path to bind the
853     //server socket to.
854 
855     //sock can optionally be specified in order to use a preexisting
856     //socket object.
857 
858     //backlog is the maximum number of queued connections passed to
859     //listen() (defaults to 100).
860 
861     //ssl can be set to an SSLContext to enable SSL over the
862     //accepted connections.
863     //"""
864 
865     /**
866      * A coroutine which creates a TCP server bound to host and port.
867      *
868      * Params:
869      *  protocolFactory = is a callable returning a protocol instance.
870      *
871      *  host = if empty then all interfaces are assumed and a list of multiple
872      *      sockets will be returned (most likely one for IPv4 and another one
873      *      for IPv6).
874      *
875      *  service = service name or port number.
876      *
877      *  addressFamily = can be set to either $(D_PSYMBOL AddressFamily.INET) or
878      *      $(D_PSYMBOL AddressFamily.INET6) to force the socket to use IPv4 or
879      *      IPv6. If not set it will be determined from host (defaults to
880      *      $(D_PSYMBOL AddressFamily.UNSPEC)).
881      *
882      *  addressInfoFlags = a bitmask for getAddressInfo().
883      *
884      *  socket = can optionally be specified in order to use a preexisting
885      *      socket object.
886      *
887      *  backlog = the maximum number of queued connections passed to listen()
888      *      (defaults to 100).
889      *
890      *  sslContext = can be set to an SSLContext to enable SSL over the accepted
891      *      connections.
892      *
893      *  reuseAddress = tells the kernel to reuse a local socket in TIME_WAIT
894      *      state, without waiting for its natural timeout to expire. If not
895      *      specified will automatically be set to $(D_KEYWORD true) on UNIX.
896      *
897      * Returns: a Server object which can be used to stop the service.
898      */
899     @Coroutine
900     Server createServer(ProtocolFactory protocolFactory,
901         in char[] host = null, in char[] service = null,
902         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
903         AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE,
904         Socket socket = null, int backlog = 100, SslContext sslContext = null,
905         bool reuseAddress = true)
906     {
907         enforce(sslContext is null, "SSL support not implemented yet");
908 
909         Socket[] sockets;
910 
911         scope (failure)
912         {
913             foreach (socket1; sockets)
914                 socket1.close;
915         }
916 
917         if (!host.empty || !service.empty)
918         {
919             enforce(socket is null,
920                 "host/service and socket can not be specified at the same time");
921 
922             AddressInfo[] addressInfos = getAddressInfo(host, service,
923                 addressFamily, SocketType.STREAM, UNSPECIFIED!ProtocolType,
924                 addressInfoFlags);
925 
926             enforceEx!SocketOSException(!addressInfos.empty,
927                 "getAddressInfo() returned empty list");
928 
929             foreach (addressInfo; addressInfos)
930             {
931                 try
932                 {
933                     socket = new Socket(addressInfo);
934                 }
935                 catch (SocketOSException socketOSException)
936                 {
937                     continue;
938                 }
939 
940                 sockets ~= socket;
941 
942                 if (reuseAddress)
943                     socket.setOption(SocketOptionLevel.SOCKET,
944                         SocketOption.REUSEADDR, true);
945 
946                 if (addressInfo.family == AddressFamily.INET6)
947                     socket.setOption(SocketOptionLevel.IPV6,
948                         SocketOption.IPV6_V6ONLY, true);
949 
950                 try
951                 {
952                     socket.bind(addressInfo.address);
953                 }
954                 catch (SocketException socketException)
955                 {
956                     throw new SocketException(
957                         "error while attempting to bind to address %s: %s"
958                             .format(addressInfo.address, socketException.msg));
959                 }
960             }
961         }
962         else
963         {
964             enforce(socket !is null,
965                 "Neither host/service nor socket were specified");
966 
967             sockets ~= socket;
968         }
969 
970         auto server = new ServerImpl(this, sockets);
971 
972         foreach (socket1; sockets)
973         {
974             socket1.listen(backlog);
975             socket1.blocking(false);
976             startServing(protocolFactory, socket1, sslContext, server);
977         }
978 
979         return server;
980     }
981 
982     /**
983      * Similar to $(D_PSYMBOL EventLoop.createServer()), but specific to the
984      * socket family $(D_PSYMBOL AddressFamily.UNIX).
985      */
986     version (Posix)
987     @Coroutine
988     Server createUnixServer(ProtocolFactory protocolFactory, in char[] path,
989         Socket socket = null, int backlog = 100, SslContext sslContext = null)
990     {
991         if (!path.empty)
992         {
993             enforce(socket is null,
994                 "path and socket can not be specified at the same time");
995 
996             socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
997 
998             try
999             {
1000                 socket.bind(new UnixAddress(path));
1001             }
1002             catch (SocketOSException socketOSException)
1003             {
1004                 import core.stdc.errno : EADDRINUSE;
1005 
1006                 socket.close;
1007                 if (socketOSException.errorCode == EADDRINUSE)
1008                     throw new SocketOSException("Address %s is already in use"
1009                             .format(path), socketOSException.errorCode);
1010                 else
1011                     throw socketOSException;
1012             }
1013             catch (Throwable throwable)
1014             {
1015                 socket.close;
1016                 throw throwable;
1017             }
1018         }
1019         else
1020         {
1021             enforce(socket !is null,
1022                 "path was not specified, and no socket specified");
1023             enforce(socket.addressFamily == AddressFamily.UNIX,
1024                 "A UNIX Domain Socket was expected, got %s".format(socket));
1025         }
1026 
1027         auto server = new ServerImpl(this, [socket]);
1028 
1029         socket.listen(backlog);
1030         socket.blocking(false);
1031         startServing(protocolFactory, socket, sslContext, server);
1032 
1033         return server;
1034     }
1035 /*
1036     // Pipes and subprocesses.
1037 
1038     //"""Register read pipe in event loop.
1039 
1040     //protocol_factory should instantiate object with Protocol interface.
1041     //pipe is file-like object already switched to nonblocking.
1042     //Return pair (transport, protocol), where transport support
1043     //ReadTransport interface."""
1044     //# The reason to accept file-like object instead of just file descriptor
1045     //# is: we need to own pipe and close it at transport finishing
1046     //# Can got complicated errors if pass f.fileno(),
1047     //# close fd in pipe transport then close f and vise versa.
1048     Tuple!(Transport, Protocol) connectReadPipe(Protocol function() protocol_factory,
1049         Pipe pipe);
1050 
1051     //"""Register write pipe in event loop.
1052 
1053     //protocol_factory should instantiate object with BaseProtocol interface.
1054     //Pipe is file-like object already switched to nonblocking.
1055     //Return pair (transport, protocol), where transport support
1056     //WriteTransport interface."""
1057     //# The reason to accept file-like object instead of just file descriptor
1058     //# is: we need to own pipe and close it at transport finishing
1059     //# Can got complicated errors if pass f.fileno(),
1060     //# close fd in pipe transport then close f and vise versa.
1061     Tuple!(Transport, Protocol) connectWritePipe(Protocol function() protocol_factory,
1062         Pipe pipe);
1063 
1064     Tuple!(Transport, Protocol) processShell(Protocol function() protocol_factory,
1065         in char[] cmd, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE,
1066         File stderr = subprocess.PIPE);
1067 
1068     Tuple!(Transport, Protocol) processProcess(Protocol function() protocol_factory,
1069         in char[][] args, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE,
1070         File stderr = subprocess.PIPE);
1071 
1072     //# Ready-based callback registration methods.
1073     //# The add_*() methods return None.
1074     //# The remove_*() methods return True if something was removed,
1075     //# False if there was nothing to delete.
1076 
1077     void addReader(int fd, void delegate() callback);
1078 
1079     void removeReader(int fd);
1080 
1081     void addWriter(int fd, void delegate() callback);
1082 
1083     void removeWriter(int fd);
1084 
1085     // # Completion based I/O methods returning Futures.
1086 
1087     ptrdiff_t socketReceive(Socket sock, void[] buf);
1088 
1089     ptrdiff_t socketSend(Socket sock, void[] buf);
1090 
1091     Socket socketAccept(Socket sock);
1092 */
1093     // Resolve host name
1094 
1095     @Coroutine
1096     AddressInfo[] getAddressInfo(in char[] host, in char[] service,
1097         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
1098         SocketType socketType = UNSPECIFIED!SocketType,
1099         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
1100         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags);
1101 
1102 
1103     // Signal handling.
1104     version (Posix)
1105     {
1106         void addSignalHandler(int sig, void delegate() handler);
1107 
1108         void removeSignalHandler(int sig);
1109     }
1110 
1111     // Error handlers.
1112 
1113     private ExceptionHandler exceptionHandler;
1114 
1115     /**
1116      * Set handler as the new event loop exception handler.
1117      *
1118      * If handler is $(D_KEYWORD null), the default exception handler will be set.
1119      *
1120      * See_Also: $(D_PSYMBOL callExceptionHandler()).
1121      */
1122     final void setExceptionHandler(ExceptionHandler exceptionHandler)
1123     {
1124         this.exceptionHandler = exceptionHandler;
1125     }
1126 
1127     /**
1128      * Default exception handler.
1129      *
1130      * This is called when an exception occurs and no exception handler is set,
1131      * and can be called by a custom exception handler that wants to defer to
1132      * the default behavior.
1133      * The context parameter has the same meaning as in $(D_PSYMBOL
1134      * callExceptionHandler()).
1135      */
1136     final void defaultExceptionHandler(ExceptionContext exceptionContext)
1137     {
1138         import std.stdio : stderr;
1139 
1140         if (exceptionContext.message.empty)
1141             exceptionContext.message = "Unhandled exception in event loop";
1142 
1143         if (cast(Error) exceptionContext.throwable)
1144             throw new Error("Uncaught Error: %s".format(exceptionContext));
1145 
1146         stderr.writefln("Uncaught Exception: %s", exceptionContext);
1147     }
1148 
1149     /**
1150      * Call the current event loop's exception handler.
1151      *
1152      * Note: New ExceptionContext members may be introduced in the future.
1153      */
1154     final void callExceptionHandler(ExceptionContext exceptionContext)
1155     {
1156         if (exceptionHandler is null)
1157         {
1158             defaultExceptionHandler(exceptionContext);
1159             return;
1160         }
1161 
1162         try
1163         {
1164             exceptionHandler(this, exceptionContext);
1165         }
1166         catch (Throwable throwable)
1167         {
1168             auto context = ExceptionContext("Unhandled error in exception handler",
1169                 throwable);
1170             context.context = &exceptionContext;
1171 
1172             defaultExceptionHandler(context);
1173         }
1174     }
1175 
1176     override string toString() const
1177     {
1178         return "%s(%s)".format(typeid(this), this.state);
1179     }
1180 
1181 protected:
1182 
1183     void scheduleCallback(CallbackHandle callback);
1184 
1185     void scheduleCallback(Duration delay, CallbackHandle callback);
1186 
1187     void scheduleCallback(SysTime when, CallbackHandle callback);
1188 
1189     void socketConnect(Socket socket, Address address);
1190 
1191     Transport makeSocketTransport(Socket socket, Protocol protocol,
1192         Waiter waiter);
1193 
1194     DatagramTransport makeDatagramTransport(Socket socket,
1195         DatagramProtocol datagramProtocol, Address remoteAddress,
1196         Waiter waiter);
1197 
1198     void startServing(ProtocolFactory protocolFactory, Socket socket,
1199         SslContext sslContext, ServerImpl server);
1200 
1201     void stopServing(Socket socket);
1202 private:
1203 
1204     @Coroutine
1205     auto createConnectionTransport(Socket socket,
1206         ProtocolFactory protocolFactory, SslContext sslContext,
1207         in char[] serverHostname = null)
1208     {
1209         Protocol protocol = protocolFactory();
1210         Transport transport = null;
1211         auto waiter = new Waiter(this);
1212 
1213         if (sslContext !is null)
1214         {
1215             assert(0, "SSL support not implemented yet");
1216             //sslcontext = None if isinstance(ssl, bool) else ssl
1217             //transport = self._make_ssl_transport(
1218             //    sock, protocol, sslcontext, waiter,
1219             //    server_side = False, server_hostname = server_hostname)
1220         }
1221         else
1222         {
1223             transport = makeSocketTransport(socket, protocol, waiter);
1224         }
1225 
1226         try
1227         {
1228             this.waitFor(waiter);
1229         }
1230         catch (Throwable throwable)
1231         {
1232             transport.close;
1233             throw throwable;
1234         }
1235 
1236         return tuple!("transport", "protocol")(transport, protocol);
1237     }
1238 }
1239 
1240 /**
1241  * Interface of policy for accessing the event loop.
1242  */
1243 abstract class EventLoopPolicy
1244 {
1245     protected EventLoop eventLoop = null;
1246     protected bool setCalled = false;
1247 
1248     /**
1249      * Get the event loop.
1250      *
1251      * This may be $(D null) or an instance of EventLoop.
1252      */
1253     EventLoop getEventLoop()
1254     {
1255         if (eventLoop is null && !setCalled && thread_isMainThread)
1256             setEventLoop(newEventLoop);
1257         return eventLoop;
1258     }
1259 
1260     /**
1261      * Set the event loop.
1262      */
1263     void setEventLoop(EventLoop loop)
1264     {
1265         setCalled = true;
1266         eventLoop = loop;
1267     }
1268 
1269     /**
1270      * Create a new event loop.
1271      *
1272      * You must call $(D setEventLoop()) to make this the current event loop.
1273      */
1274     EventLoop newEventLoop();
1275 
1276     //version (Posix)
1277     //{
1278     //    ChildWatcher getChildWatcher();
1279 
1280     //    void setChildWatcher(ChildWatcher watcher);
1281     //}
1282 }
1283 
1284 /**
1285  * Default policy implementation for accessing the event loop.
1286  *
1287  * In this policy, each thread has its own event loop.  However, we only
1288  * automatically create an event loop by default for the main thread; other
1289  * threads by default have no event loop.
1290  *
1291  * Other policies may have different rules (e.g. a single global event loop, or
1292  * automatically creating an event loop per thread, or using some other notion
1293  * of context to which an event loop is associated).
1294  */
1295 
1296 private __gshared EventLoopPolicy eventLoopPolicy;
1297 
1298 import asynchronous.libasync.events;
1299 
1300 alias DefaultEventLoopPolicy = LibasyncEventLoopPolicy;
1301 
1302 /**
1303  * Get the current event loop policy.
1304  */
1305 EventLoopPolicy getEventLoopPolicy()
1306 {
1307     if (eventLoopPolicy is null)
1308     {
1309         synchronized
1310         {
1311             if (eventLoopPolicy is null)
1312                 eventLoopPolicy = new DefaultEventLoopPolicy;
1313         }
1314     }
1315 
1316     return eventLoopPolicy;
1317 }
1318 
1319 /**
1320  * Set the current event loop policy.
1321  *
1322  * If policy is $(D null), the default policy is restored.
1323  */
1324 void setEventLoopPolicy(EventLoopPolicy policy)
1325 {
1326     eventLoopPolicy = policy;
1327 }
1328 
1329 /**
1330  * Equivalent to calling $(D getEventLoopPolicy.getEventLoop).
1331  */
1332 EventLoop getEventLoop()
1333 {
1334     return getEventLoopPolicy.getEventLoop;
1335 }
1336 
1337 unittest
1338 {
1339     assert(getEventLoop !is null);
1340 }
1341 
1342 /**
1343  * Equivalent to calling $(D getEventLoopPolicy.setEventLoop(loop)).
1344  */
1345 void setEventLoop(EventLoop loop)
1346 {
1347     getEventLoopPolicy.setEventLoop(loop);
1348 }
1349 
1350 /**
1351  * Equivalent to calling $(D getEventLoopPolicy.newEventLoop).
1352  */
1353 EventLoop newEventLoop()
1354 {
1355     return getEventLoopPolicy.newEventLoop;
1356 }