1 /**
2  * Event loop and event loop policy.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.events;
9 
10 import core.thread;
11 import std.algorithm;
12 import std.array;
13 import std.datetime;
14 import std.exception;
15 import std.process;
16 import std.socket;
17 import std.string;
18 import std.traits;
19 import std.typecons;
20 import asynchronous.futures;
21 import asynchronous.protocols;
22 import asynchronous.tasks;
23 import asynchronous.transports;
24 import asynchronous.types;
25 
// Bind the unqualified name `Protocol` to the asynchronous.protocols symbol.
// NOTE(review): presumably needed because std.socket (imported above) also
// declares a `Protocol` — confirm.
alias Protocol = asynchronous.protocols.Protocol;
27 
/**
 * Type-erased handle of a scheduled callback.
 *
 * Lets code cancel or invoke a callback without knowing the delegate and
 * argument types it was created with.
 */
interface CallbackHandle
{
    /**
     * Cancel the pending call. Does nothing if the callback has already been
     * cancelled or executed.
     */
    void cancel();

    /**
     * Returns: $(D_KEYWORD true) if the callback was cancelled.
     */
    bool cancelled() const;

    // Package-level entry point: forwards to the implementation hook below.
    package void opCall()
    {
        this.opCallImpl();
    }

    // Hook supplied by implementing classes; performs the actual call.
    protected void opCallImpl();
}
48 
49 /**
50  * A callback wrapper object returned by $(D_PSYMBOL EventLoop.callSoon),
51  * $(D_PSYMBOL EventLoop.callSoonThreadSafe),
52  * $(D_PSYMBOL EventLoop.callLater), and $(D_PSYMBOL EventLoop.callAt).
53  */
class Callback(Dg, Args...) : CallbackHandle
if (isDelegate!Dg)
{
    private bool _cancelled;

    private EventLoop eventLoop;

    private Dg dg;

    private Args args;

    alias ResultType = ReturnType!Dg;

    this(EventLoop eventLoop, Dg dg, Args args)
    {
        this.eventLoop = eventLoop;
        this.dg = dg;
        this._cancelled = false;

        // Only touch the argument tuple when there is something to store.
        static if (Args.length > 0)
            this.args = args;
    }

    /**
     * Cancel the pending call and drop the references to the delegate and
     * its arguments. Does nothing more if already cancelled or executed.
     */
    override void cancel()
    {
        _cancelled = true;
        dg = null;
        args = Args.init;
    }

    /**
     * Returns: $(D_KEYWORD true) if the callback was cancelled.
     */
    override bool cancelled() const
    {
        return _cancelled;
    }

    // Invokes the delegate with the stored arguments. Anything thrown,
    // including the CancelledException raised for a cancelled callback, is
    // forwarded to the event loop's exception handler instead of propagating.
    protected override void opCallImpl()
    {
        try
        {
            enforceEx!CancelledException(!_cancelled, "Callback cancelled");

            dg(args);
        }
        catch (Throwable error)
        {
            // Fall back to the current event loop when none was given.
            if (eventLoop is null)
                eventLoop = getEventLoop;

            auto context = ExceptionContext("Exception on calling " ~ toString, error);
            context.callback = this;
            eventLoop.callExceptionHandler(context);
        }
    }

    override string toString() const
    {
        return format("%s(dg: %s, cancelled: %s)", typeid(this),
            __traits(identifier, dg), _cancelled);
    }
}
122 
/**
 * Convenience factory: builds a $(D_PSYMBOL Callback) with the delegate and
 * argument types inferred from the call. The callback is only created, not
 * scheduled.
 */
auto callback(Dg, Args...)(EventLoop eventLoop, Dg dg, Args args)
{
    alias Handle = Callback!(Dg, Args);

    return new Handle(eventLoop, dg, args);
}
127 
/// Signature of an exception handler: receives an event loop and the
/// $(D_PSYMBOL ExceptionContext) describing the failure.
alias ExceptionHandler = void delegate(EventLoop, ExceptionContext);
129 
130 /**
 * Exception context for event exceptions
132  */
struct ExceptionContext
{
    string message;            /// Error message.
    Throwable throwable;       /// (optional) Throwable object.
    FutureHandle future;       /// (optional) Future instance.
    CallbackHandle callback;   /// (optional): CallbackHandle instance.
    Protocol protocol;         /// (optional): Protocol instance.
    Transport transport;       /// (optional): Transport instance.
    Socket socket;             /// (optional): Socket instance.
    ExceptionContext* context; /// (optional): Chained context.

    /**
     * Render the context as `<type>(message: "...", <field>: <value>, ...)`.
     * Only the optional fields that are set are included, in declaration
     * order.
     */
    string toString() const
    {
        import std.array : appender;
        import std.conv : to;

        auto details = appender("message: \"" ~ message ~ "\"");

        if (throwable !is null)
        {
            details.put(", throwable: ");
            details.put((cast(Throwable) throwable).to!string);
        }
        if (future !is null)
        {
            details.put(", future: ");
            details.put(future.to!string);
        }
        if (callback !is null)
        {
            details.put(", callback: ");
            details.put(callback.to!string);
        }
        if (protocol !is null)
        {
            details.put(", protocol: ");
            details.put(protocol.to!string);
        }
        if (transport !is null)
        {
            details.put(", transport: ");
            details.put(transport.to!string);
        }
        if (socket !is null)
        {
            details.put(", socket: ");
            details.put((cast(Socket) socket).to!string);
        }
        if (context !is null)
        {
            // Chained contexts are rendered recursively.
            details.put(", context: ");
            details.put((*context).to!string);
        }

        return format("%s(%s)", typeid(this).to!string, details.data);
    }
}
169 
unittest
{
    // The "future" field must only show up in the rendering once it is set.
    auto ctx = ExceptionContext("foo");
    assert(!canFind(ctx.toString, "future"));

    ctx.future = new Future!int;
    assert(canFind(ctx.toString, "future"));
}
177 
/**
 * SSL/TLS configuration handle accepted by the connection and server
 * factories. The interface currently declares no members.
 */
interface SslContext
{
}
181 
182 /**
183  * Interface server returned by $(D createServer()).
184  */
interface Server
{
    /**
     * Stop serving. This leaves existing connections open.
     */
    void close();

    /**
     * Coroutine to wait until service is closed.
     *
     * NOTE(review): marked $(D @Coroutine), so it is presumably meant to be
     * called from within a task; confirm against the attribute's definition.
     */
    @Coroutine
    void waitClosed();
}
198 
/**
 * $(D_PSYMBOL Server) implementation that tracks its listening sockets and a
 * count of attached users, and wakes up $(D_PSYMBOL waitClosed()) callers
 * once the sockets are released and the count has dropped to zero.
 */
final class ServerImpl : Server
{
    private EventLoop eventLoop;

    private Socket[] sockets;

    private size_t activeCount;

    private Waiter[] waiters;

    package this(EventLoop eventLoop, Socket[] sockets)
    {
        this.sockets = sockets;
        this.eventLoop = eventLoop;
    }

    /// Register one more active user. Requires the server to still own sockets.
    void attach()
    in
    {
        assert(!sockets.empty);
    }
    body
    {
        activeCount += 1;
    }

    /// Unregister one active user; the last one leaving a closed server wakes
    /// up the waiters.
    void detach()
    in
    {
        assert(activeCount > 0);
    }
    body
    {
        activeCount -= 1;

        if (activeCount != 0 || !sockets.empty)
            return;

        wakeup();
    }

    override void close()
    {
        // Already closed: nothing to do.
        if (sockets.empty)
            return;

        // Detach the list first so the server counts as closed while the
        // event loop is asked to stop serving each socket.
        auto closing = sockets;
        sockets = null;

        foreach (listener; closing)
            eventLoop.stopServing(listener);

        if (activeCount == 0)
            wakeup();
    }

    // Completes every pending waiter that is not done yet and clears the list.
    private void wakeup()
    {
        auto pending = waiters;
        waiters = null;

        foreach (waiter; pending)
        {
            if (waiter.done)
                continue;

            waiter.setResult;
        }
    }

    @Coroutine
    override void waitClosed()
    {
        // Wait only while sockets are still owned or users are still attached.
        if (!sockets.empty || activeCount != 0)
        {
            auto waiter = new Waiter(eventLoop);

            waiters ~= waiter;

            eventLoop.waitFor(waiter);
        }
    }
}
280 
281 /**
282  * Interface of event loop.
283  */
284 abstract class EventLoop
285 {
    // Lifecycle states of the event loop.
    protected enum State : ubyte
    {
        STOPPED,  // initial state; the only state close() moves to CLOSED
        RUNNING,  // reported as running; stop() moves it to STOPPING
        STOPPING, // stop requested; isRunning() still reports true
        CLOSED,   // set by close(); stop() throws in this state
    }

    // Current lifecycle state; a new loop starts out stopped.
    protected State state = State.STOPPED;
295 
296     // Run an event loop
297     /**
298      * Run until $(D_PSYMBOL stop()) is called.
299      */
    // Declared without a body here. runUntilComplete() calls it and expects
    // it to return after stop() has been requested.
    void runForever();
301 
302     /**
303      * Run until $(PARAM future) is done.
304      *
305      * If the argument is a $(I coroutine object), it is wrapped by $(D_PSYMBOL
306      * task()).
307      *
308      * Returns: the Future's result, or throws its exception.
309      */
310     final T runUntilComplete(T)(Future!T future)
311     {
312         auto callback = (FutureHandle _) {
313             stop;
314         };
315 
316         future.addDoneCallback(callback);
317         runForever;
318         future.removeDoneCallback(callback);
319 
320         enforce(future.done, "Event loop stopped before Future completed");
321         return future.result;
322     }
323 
324     /**
325      * Returns: running status of event loop.
326      */
327     final bool isRunning()
328     {
329         return this.state == State.RUNNING || this.state == State.STOPPING;
330     }
331 
332     /**
333      * Stop running the event loop.
334      *
335      * Every callback scheduled before $(D_PSYMBOL stop()) is called will run.
336      * Callbacks scheduled after $(D_PSYMBOL stop()) is called will not run.
337      * However, those callbacks will run if $(D_PSYMBOL runForever()) is
338      * called again later.
339      */
340     final void stop()
341     {
342         final switch (this.state)
343         {
344         case State.STOPPED:
345         case State.STOPPING:
346             break;
347         case State.RUNNING:
348             this.state = State.STOPPING;
349             break;
350         case State.CLOSED:
351             throw new Exception("Cannot stop a closed event loop");
352         }
353     }
354 
355     /**
356      * Returns: $(D_KEYWORD true) if the event loop was closed.
357      */
358     final bool isClosed()
359     {
360         return this.state == State.CLOSED;
361     }
362 
363     /**
364      * Close the event loop. The loop must not be running.
365      *
366      * This clears the queues and shuts down the executor, but does not wait
367      * for the executor to finish.
368      *
369      * This is idempotent and irreversible. No other methods should be called
370      * after this one.
371      */
372     final void close()
373     {
374         final switch (this.state)
375         {
376         case State.STOPPED:
377             this.state = State.CLOSED;
378             break;
379         case State.RUNNING:
380         case State.STOPPING:
381             throw new Exception("Cannot close a running event loop");
382         case State.CLOSED:
383             break;
384         }
385     }
386 
387     // Calls
388 
389     /**
390      * Arrange for a callback to be called as soon as possible.
391      *
392      * This operates as a FIFO queue, callbacks are called in the order in
393      * which they are registered. Each callback will be called exactly once.
394      *
395      * Any positional arguments after the callback will be passed to the
396      * callback when it is called.
397      *
398      * Returns: an instance of $(D_PSYMBOL Callback).
399      */
400     final auto callSoon(Dg, Args...)(Dg dg, Args args)
401     {
402         auto callback = new Callback!(Dg, Args)(this, dg, args);
403 
404         scheduleCallback(callback);
405 
406         return callback;
407     }
408 
409     /**
410      * Like $(D_PSYMBOL callSoon()), but thread safe.
411      */
412     final auto callSoonThreadSafe(Dg, Args...)(Dg dg, Args args)
413     {
414         auto callback = new Callback!(Dg, Args)(this, dg, args);
415 
416         scheduleCallbackThreadSafe(callback);
417 
418         return callback;
419     }
420 
421     // Delayed calls
422 
423     /**
424      * Arrange for the callback to be called after the given delay.
425      *
426      * An instance of $(D_PSYMBOL Callback) is returned.
427      *
428      * $(D_PSYMBOL dg) delegate will be called exactly once per call to
429      * $(D_PSYMBOL callLater()). If two callbacks are scheduled for exactly
430      * the same time, it is undefined which will be called first.
431      *
432      * The optional positional args will be passed to the callback when it is
433      * called.
434      */
435     final auto callLater(Dg, Args...)(Duration delay, Dg dg, Args args)
436     {
437         auto callback = new Callback!(Dg, Args)(this, dg, args);
438 
439         scheduleCallback(delay, callback);
440 
441         return callback;
442     }
443 
444     /**
     * Arrange for the callback to be called at the given absolute timestamp
     * $(D_PSYMBOL when), using the same time reference as $(D_PSYMBOL time()).
447      *
448      * This method’s behavior is the same as $(D_PSYMBOL callLater()).
449      */
450     final auto callAt(Dg, Args...)(SysTime when, Dg dg, Args args)
451     {
452         auto callback = new Callback!(Dg, Args)(this, dg, args);
453 
454         scheduleCallback(when, callback);
455 
456         return callback;
457     }
458 
459     /**
460      * Return the current time according to the event loop’s internal clock.
461      *
462      * See_Also: $(D_PSYMBOL sleep()).
463      */
464     SysTime time()
465     {
466         return Clock.currTime;
467     }
468 
469 
470     // Fibers
471 
472     /**
473      * Schedule the execution of a fiber object: wrap it in a future.
474      *
475      * Third-party event loops can use their own subclass of Task for
476      * interoperability. In this case, the result type is a subclass of Task.
477      *
478      * Returns: a $(D_PSYMBOL Task) object.
479      *
480      * See_Also: $(D_PSYMBOL task()).
481      */
482     final auto createTask(Coroutine, Args...)(Coroutine coroutine, Args args)
483     if (isDelegate!Coroutine)
484     {
485         return new Task!(Coroutine, Args)(this, coroutine, args);
486     }
487 /*
488     // Methods for interacting with threads.
489 
490     ReturnType!callback runInExecutor(alias callback, Args...)(executor, Args args);
491 
492     void setDefaultExecutor(executor);
493 
494     // Network I/O methods returning Futures.
495 
496     AddressInfo[] getAddressInfo(T...)(in char[] node, T options);
497 
498     // def getNameInfo(sockaddr, flags = 0);
499 */
500     // Creating connections
501 
502     /**
503      * Create a streaming transport connection to a given Internet host and
504      * port: socket family $(D_PSYMBOL AddressFamily.INET) or $(D_PSYMBOL
505      * AddressFamily.INET6) depending on host (or family if specified), socket
506      * type $(D_PSYMBOL SocketType.STREAM).
507      *
508      * This method is a coroutine which will try to establish the connection in
509      * the background. When successful, the coroutine returns a (transport,
510      * protocol) tuple.
511      *
512      * The chronological synopsis of the underlying operation is as follows:
513      *
514      * The connection is established, and a transport is created to represent
515      * it. $(D_PSYMBOL protocolFactory) is called without arguments and must
516      * return a protocol instance.
517      * The protocol instance is tied to the transport, and its $(D_PSYMBOL
518      * connectionMade()) method is called.
519      * The coroutine returns successfully with the $(D_PSYMBOL (transport,
520      * protocol)) tuple.
521      *
522      * The created transport is an implementation-dependent bidirectional
523      * stream.
524      *
525      * Options allowing to change how the connection is created:
526      * Params:
527      *  protocolFactory = is a callable returning a protocol instance
528      *
529      *  host = if empty then the $(D_PSYMBOL socket) parameter should be
530      *      specified.
531      *
532      *  service = service name or port number.
533      *
534      *  sslContext = if not $(D_KEYWORD null), a SSL/TLS transport is created
535      *      (by default a plain TCP transport is created).
536      *
537      *  serverHostname = is only for use together with ssl, and sets or
538      *      overrides the hostname that the target server’s certificate will be
539      *      matched against. By default the value of the host argument is used.
540      *      If host is empty, there is no default and you must pass a value for
541      *      $(D_PSYMBOL serverHostname). If $(D_PSYMBOL serverHostname) is
542      *      empty, hostname matching is disabled (which is a serious security
543      *      risk, allowing for man-in-the-middle-attacks).
544      *
     *  addressFamily = optional address family.
546      *
547      *  protocolType = optional protocol.
548      *
549      *  addressInfoFlags = optional flags.
550      *
551      *  socket = if not $(D_KEYWORD null), should be an existing, already
552      *      connected $(D_PSYMBOL Socket) object to be used by the transport. If
553      *      $(D_PSYMBOL socket) is given, none of $(D_PSYMBOL host), $(D_PSYMBOL
554      *      service), $(D_PSYMBOL addressFamily), $(D_PSYMBOL protocolType),
555      *      $(D_PSYMBOL addressInfoFlags) and $(D_PSYMBOL localAddress) should
556      *      be specified.
557      *
558      *  localHost = if given, together with $(D_PSYMBOL localService) is used to
559      *      bind the socket locally. The $(D_PSYMBOL localHost) and
560      *      $(D_PSYMBOL localService) are looked up using $(D_PSYMBOL
561      *      getAddressInfo()), similarly to host and service.
562      *
563      *  localService = see $(D_PSYMBOL localHost).
564      *
565      * Returns: Tuple!(Transport, "transport", Protocol, "protocol")
566      */
567     @Coroutine
568     auto createConnection(ProtocolFactory protocolFactory,
569         in char[] host = null, in char[] service = null,
570         SslContext sslContext = null,
571         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
572         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
573         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags,
574         Socket socket = null, in char[] localHost = null,
575         in char[] localService = null, in char[] serverHostname = null)
576     {
577         enforce(serverHostname.empty || sslContext !is null,
578             "serverHostname is only meaningful with SSL");
579         enforce(serverHostname.empty || sslContext is null || !host.empty,
580             "You must set serverHostname when using SSL without a host");
581 
582         if (!host.empty || !service.empty)
583         {
584             enforce(socket is null,
585                 "host/service and socket can not be specified at the same time");
586 
587             Future!(AddressInfo[])[] fs = [
588                 createTask(&this.getAddressInfo, host, service, addressFamily,
589                     SocketType.STREAM, protocolType, addressInfoFlags)
590             ];
591 
592             if (!localHost.empty)
593                 fs ~= createTask(&this.getAddressInfo, localHost, localService,
594                     addressFamily, SocketType.STREAM, protocolType,
595                     addressInfoFlags);
596 
597             this.wait(fs);
598 
599             auto addressInfos = fs.map!"a.result";
600 
601             enforceEx!SocketOSException(addressInfos.all!(a => !a.empty),
602                 "getAddressInfo() returned empty list");
603 
604             SocketOSException[] exceptions = null;
605             bool connected = false;
606 
607             foreach (addressInfo; addressInfos[0])
608             {
609                 try
610                 {
611                     socket = new Socket(addressInfo);
612                     socket.blocking(false);
613 
614                     if (addressInfos.length > 1)
615                     {
616                         bool bound = false;
617 
618                         foreach (localAddressInfo; addressInfos[1])
619                         {
620                             try
621                             {
622                                 socket.bind(localAddressInfo.address);
623                                 bound = true;
624                                 break;
625                             }
626                             catch (SocketOSException socketOSException)
627                             {
628                                 exceptions ~= new SocketOSException(
629                                     "error while attempting to bind on address '%s'"
630                                         .format(localAddressInfo.address),
631                                     socketOSException);
632                             }
633                         }
634                         if (!bound)
635                         {
636                             socket.close;
637                             socket = null;
638                             continue;
639                         }
640                     }
641 
642                     socketConnect(socket, addressInfo.address);
643                     connected = true;
644                     break;
645                 }
646                 catch (SocketOSException socketOSException)
647                 {
648                     if (socket !is null)
649                         socket.close;
650 
651                     exceptions ~= socketOSException;
652                 }
653                 catch (Throwable throwable)
654                 {
655                     if (socket !is null)
656                         socket.close;
657 
658                     throw throwable;
659                 }
660             }
661 
662             if (!connected)
663             {
664                 assert(!exceptions.empty);
665 
666                 if (exceptions.length == 1)
667                 {
668                     throw exceptions[0];
669                 }
670                 else
671                 {
672                     if (exceptions.all!(a => a.msg == exceptions[0].msg))
673                         throw exceptions[0];
674 
675                     throw new SocketOSException(
676                         "Multiple exceptions: %(%s, %)".format(exceptions));
677                 }
678             }
679         }
680         else
681         {
682             enforce(socket !is null,
683                 "host and port was not specified and no socket specified");
684 
685             socket.blocking(false);
686         }
687 
688         return createConnectionTransport(socket, protocolFactory, sslContext,
689             serverHostname.empty ? serverHostname : host);
690     }
691 
692     /**
693      * Create datagram connection: socket family $(D_PSYMBOL AddressFamily.INET)
694      * or $(D_PSYMBOL AddressFamily.INET6) depending on host (or family if
695      * specified), socket type $(D_PSYMBOL SocketType.DGRAM).
696      *
697      * This method is a coroutine which will try to establish the connection in
698      * the background.
699      *
     * See the $(D_PSYMBOL EventLoop.createConnection()) method for parameters.
701      *
702      * Returns: Tuple!(DatagramTransport, "datagramTransport",
703      *      DatagramProtocol, "datagramProtocol")
704      */
705     @Coroutine
706     auto createDatagramEndpoint(DatagramProtocolFactory datagramProtocolFactory,
707         in char[] localHost = null, in char[] localService = null,
708         in char[] remoteHost = null, in char[] remoteService = null,
709         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
710         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
711         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags)
712     {
713         alias AddressPairInfo = Tuple!(AddressFamily, "addressFamily",
714             ProtocolType, "protocolType", Address, "localAddress",
715             Address, "remoteAddress");
716 
717         AddressPairInfo[] addressPairInfos = null;
718 
719 
720         if (localHost.empty && remoteHost.empty)
721         {
722             enforce(addressFamily != UNSPECIFIED!AddressFamily,
723                 "Unexpected address family");
724             addressPairInfos ~= AddressPairInfo(addressFamily, protocolType,
725                 null, null);
726         }
727         else
728         {
729             enforce(remoteHost.empty,
730                 "Remote host parameter not supported yet");
731 
732             auto addressInfos = getAddressInfo(localHost, localService,
733                 addressFamily, SocketType.DGRAM, protocolType,
734                 addressInfoFlags);
735 
736             enforceEx!SocketOSException(!addressInfos.empty,
737                 "getAddressInfo() returned empty list");
738 
739             foreach (addressInfo; addressInfos)
740             {
741                 addressPairInfos ~= AddressPairInfo(addressInfo.family,
742                     addressInfo.protocol, addressInfo.address, null);
743             }
744         }
745 
746         Socket socket = null;
747         Address remoteAddress = null;
748         SocketOSException[] exceptions = null;
749 
750         foreach (addressPairInfo; addressPairInfos)
751         {
752             try
753             {
754                 socket = new Socket(addressPairInfo.addressFamily,
755                     SocketType.DGRAM, addressPairInfo.protocolType);
756                 socket.setOption(SocketOptionLevel.SOCKET,
757                     SocketOption.REUSEADDR, 1);
758                 socket.blocking(false);
759 
760                 if (addressPairInfo.localAddress)
761                     socket.bind(addressPairInfo.localAddress);
762 
763 
764                 remoteAddress = addressPairInfo.remoteAddress;
765                 enforce(remoteAddress is null,
766                     "remote connect not supported yet");
767 
768                 break;
769             }
770             catch (SocketOSException socketOSException)
771             {
772                 if (socket !is null)
773                     socket.close;
774 
775                 exceptions ~= socketOSException;
776             }
777             catch (Throwable throwable)
778             {
779                 if (socket !is null)
780                     socket.close;
781 
782                 throw throwable;
783             }
784         }
785 
786         auto protocol = datagramProtocolFactory();
787         auto waiter = new Waiter(this);
788         auto transport = makeDatagramTransport(socket, protocol, remoteAddress,
789             waiter);
790         try
791         {
792             this.waitFor(waiter);
793         }
794         catch (Throwable throwable)
795         {
796             transport.close;
797             throw throwable;
798         }
799 
800         return tuple!("datagramTransport", "datagramProtocol")(transport,
801             protocol);
802     }
803 
804     /**
805      * Create UNIX connection: socket family $(D_PSYMBOL AddressFamily.UNIX),
806      * socket type $(D_PSYMBOL SocketType.STREAM). The UNIX socket family is
807      * used to communicate between processes on the same machine efficiently.
808      *
809      * This method is a coroutine which will try to establish the connection in
810      * the background. When successful, the coroutine returns a $(D_PSYMBOL
811      * Tuple!(Transport, "transport", Protocol, "protocol"))
812      *
813      * See the $(D_PSYMBOL EventLoop.createConnection()) method for parameters.
814      */
815     version (Posix)
816     @Coroutine
817     auto createUnixConnection(ProtocolFactory protocolFactory,
818         in char[] path = null, SslContext sslContext = null,
819         Socket socket = null, in char[] serverHostname = null)
820     {
821         if (sslContext is null)
822             enforce(serverHostname.empty,
823                 "serverHostname is only meaningful with ssl");
824         else
825             enforce(!serverHostname.empty,
826                 "you have to pass server_hostname when using ssl");
827 
828         if (!path.empty)
829         {
830             enforce(socket is null,
831                 "path and socket can not be specified at the same time");
832 
833             try
834             {
835                 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
836                 socket.blocking(false);
837                 socketConnect(socket, new UnixAddress(path));
838             }
839             catch (Throwable throwable)
840             {
841                 if (socket !is null)
842                     socket.close;
843 
844                 throw throwable;
845             }
846         }
847         else
848         {
849             enforce(socket !is null, "no path and socket were specified");
850             enforce(socket.addressFamily == AddressFamily.UNIX,
851                 "A UNIX Domain Socket was expected, got %s".format(socket));
852             socket.blocking(false);
853         }
854 
855         return createConnectionTransport(socket, protocolFactory, sslContext,
856             serverHostname);
857     }
858 
859 
860     //"""A coroutine which creates a UNIX Domain Socket server.
861 
862     //The return value is a Server object, which can be used to stop
863     //the service.
864 
    //path is a str, representing a file system path to bind the
866     //server socket to.
867 
868     //socket can optionally be specified in order to use a preexisting
869     //socket object.
870 
871     //backlog is the maximum number of queued connections passed to
872     //listen() (defaults to 100).
873 
874     //ssl can be set to an SSLContext to enable SSL over the
875     //accepted connections.
876     //"""
877 
878     /**
879      * A coroutine which creates a TCP server bound to host and port.
880      *
881      * Params:
882      *  protocolFactory = is a callable returning a protocol instance.
883      *
884      *  host = if empty then all interfaces are assumed and a list of multiple
885      *      sockets will be returned (most likely one for IPv4 and another one
886      *      for IPv6).
887      *
888      *  service = service name or port number.
889      *
890      *  addressFamily = can be set to either $(D_PSYMBOL AddressFamily.INET) or
891      *      $(D_PSYMBOL AddressFamily.INET6) to force the socket to use IPv4 or
892      *      IPv6. If not set it will be determined from host (defaults to
893      *      $(D_PSYMBOL AddressFamily.UNSPEC)).
894      *
895      *  addressInfoFlags = a bitmask for getAddressInfo().
896      *
897      *  socket = can optionally be specified in order to use a preexisting
898      *      socket object.
899      *
900      *  backlog = the maximum number of queued connections passed to listen()
901      *      (defaults to 100).
902      *
903      *  sslContext = can be set to an SSLContext to enable SSL over the accepted
904      *      connections.
905      *
906      *  reuseAddress = tells the kernel to reuse a local socket in TIME_WAIT
907      *      state, without waiting for its natural timeout to expire. If not
908      *      specified will automatically be set to $(D_KEYWORD true) on UNIX.
909      *
910      * Returns: a Server object which can be used to stop the service.
911      */
912     @Coroutine
913     Server createServer(ProtocolFactory protocolFactory,
914         in char[] host = null, in char[] service = null,
915         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
916         AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE,
917         Socket socket = null, int backlog = 100, SslContext sslContext = null,
918         bool reuseAddress = true)
919     {
920         enforce(sslContext is null, "SSL support not implemented yet");
921 
922         Socket[] sockets;
923 
924         scope (failure)
925         {
926             foreach (socket1; sockets)
927                 socket1.close;
928         }
929 
930         if (!host.empty || !service.empty)
931         {
932             enforce(socket is null,
933                 "host/service and socket can not be specified at the same time");
934 
935             AddressInfo[] addressInfos = getAddressInfo(host, service,
936                 addressFamily, SocketType.STREAM, UNSPECIFIED!ProtocolType,
937                 addressInfoFlags);
938 
939             enforceEx!SocketOSException(!addressInfos.empty,
940                 "getAddressInfo() returned empty list");
941 
942             foreach (addressInfo; addressInfos)
943             {
944                 try
945                 {
946                     socket = new Socket(addressInfo);
947                 }
948                 catch (SocketOSException socketOSException)
949                 {
950                     continue;
951                 }
952 
953                 sockets ~= socket;
954 
955                 if (reuseAddress)
956                     socket.setOption(SocketOptionLevel.SOCKET,
957                         SocketOption.REUSEADDR, true);
958 
959                 if (addressInfo.family == AddressFamily.INET6)
960                     socket.setOption(SocketOptionLevel.IPV6,
961                         SocketOption.IPV6_V6ONLY, true);
962 
963                 try
964                 {
965                     socket.bind(addressInfo.address);
966                 }
967                 catch (SocketException socketException)
968                 {
969                     throw new SocketException(
970                         "error while attempting to bind to address %s: %s"
971                             .format(addressInfo.address, socketException.msg));
972                 }
973             }
974         }
975         else
976         {
977             enforce(socket !is null,
978                 "Neither host/service nor socket were specified");
979 
980             sockets ~= socket;
981         }
982 
983         auto server = new ServerImpl(this, sockets);
984 
985         foreach (socket1; sockets)
986         {
987             socket1.listen(backlog);
988             socket1.blocking(false);
989             startServing(protocolFactory, socket1, sslContext, server);
990         }
991 
992         return server;
993     }
994 
995     /**
996      * Similar to $(D_PSYMBOL EventLoop.createServer()), but specific to the
997      * socket family $(D_PSYMBOL AddressFamily.UNIX).
998      */
999     version (Posix)
1000     @Coroutine
1001     Server createUnixServer(ProtocolFactory protocolFactory, in char[] path,
1002         Socket socket = null, int backlog = 100, SslContext sslContext = null)
1003     {
1004         if (!path.empty)
1005         {
1006             enforce(socket is null,
1007                 "path and socket can not be specified at the same time");
1008 
1009             socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
1010 
1011             try
1012             {
1013                 socket.bind(new UnixAddress(path));
1014             }
1015             catch (SocketOSException socketOSException)
1016             {
1017                 import core.stdc.errno : EADDRINUSE;
1018 
1019                 socket.close;
1020                 if (socketOSException.errorCode == EADDRINUSE)
1021                     throw new SocketOSException("Address %s is already in use"
1022                             .format(path), socketOSException.errorCode);
1023                 else
1024                     throw socketOSException;
1025             }
1026             catch (Throwable throwable)
1027             {
1028                 socket.close;
1029                 throw throwable;
1030             }
1031         }
1032         else
1033         {
1034             enforce(socket !is null,
1035                 "path was not specified, and no socket specified");
1036             enforce(socket.addressFamily == AddressFamily.UNIX,
1037                 "A UNIX Domain Socket was expected, got %s".format(socket));
1038         }
1039 
1040         auto server = new ServerImpl(this, [socket]);
1041 
1042         socket.listen(backlog);
1043         socket.blocking(false);
1044         startServing(protocolFactory, socket, sslContext, server);
1045 
1046         return server;
1047     }
1048 /*
1049     // Pipes and subprocesses.
1050 
1051     //"""Register read pipe in event loop.
1052 
1053     //protocol_factory should instantiate object with Protocol interface.
1054     //pipe is file-like object already switched to nonblocking.
1055     //Return pair (transport, protocol), where transport support
1056     //ReadTransport interface."""
    //# The reason to accept a file-like object instead of just a file
    //# descriptor is that we need to own the pipe and close it when the
    //# transport finishes. Passing f.fileno() can cause complicated errors:
    //# the pipe transport closes the fd and then f is closed, or vice versa.
1061     Tuple!(Transport, Protocol) connectReadPipe(Protocol function() protocol_factory,
1062         Pipe pipe);
1063 
1064     //"""Register write pipe in event loop.
1065 
1066     //protocol_factory should instantiate object with BaseProtocol interface.
1067     //Pipe is file-like object already switched to nonblocking.
1068     //Return pair (transport, protocol), where transport support
1069     //WriteTransport interface."""
    //# The reason to accept a file-like object instead of just a file
    //# descriptor is that we need to own the pipe and close it when the
    //# transport finishes. Passing f.fileno() can cause complicated errors:
    //# the pipe transport closes the fd and then f is closed, or vice versa.
1074     Tuple!(Transport, Protocol) connectWritePipe(Protocol function() protocol_factory,
1075         Pipe pipe);
1076 
1077     Tuple!(Transport, Protocol) processShell(Protocol function() protocol_factory,
1078         in char[] cmd, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE,
1079         File stderr = subprocess.PIPE);
1080 
1081     Tuple!(Transport, Protocol) processProcess(Protocol function() protocol_factory,
1082         in char[][] args, File stdin = subprocess.PIPE, File stdout = subprocess.PIPE,
1083         File stderr = subprocess.PIPE);
1084 
1085     //# Ready-based callback registration methods.
1086     //# The add_*() methods return None.
1087     //# The remove_*() methods return True if something was removed,
1088     //# False if there was nothing to delete.
1089 
1090     void addReader(int fd, void delegate() callback);
1091 
1092     void removeReader(int fd);
1093 
1094     void addWriter(int fd, void delegate() callback);
1095 
1096     void removeWriter(int fd);
1097 
1098     // # Completion based I/O methods returning Futures.
1099 
1100     ptrdiff_t socketReceive(Socket sock, void[] buf);
1101 
1102     ptrdiff_t socketSend(Socket sock, void[] buf);
1103 
1104     Socket socketAccept(Socket sock);
1105 */
    // Resolve host name

    /**
     * Resolve $(D host) and $(D service) into a list of
     * $(D_PSYMBOL AddressInfo) entries.
     *
     * Only the declaration is given here; the body is not part of this
     * class definition. $(D createServer()) calls it to obtain the addresses
     * it creates and binds its listening sockets for.
     *
     * Params:
     *  host = host name or address to resolve.
     *  service = service name or port to resolve.
     *  addressFamily = address family filter (defaults to unspecified).
     *  socketType = socket type filter (defaults to unspecified).
     *  protocolType = protocol filter (defaults to unspecified).
     *  addressInfoFlags = lookup flags (defaults to unspecified).
     *
     * Returns: the resolved address information entries.
     */
    @Coroutine
    AddressInfo[] getAddressInfo(in char[] host, in char[] service,
        AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
        SocketType socketType = UNSPECIFIED!SocketType,
        ProtocolType protocolType = UNSPECIFIED!ProtocolType,
        AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags);
1114 
1115 
    // Signal handling. These members are only declared on Posix targets.
    version (Posix)
    {
        /**
         * Register $(D handler) for the signal number $(D sig).
         *
         * NOTE(review): only the declaration is visible here; the exact
         * semantics (e.g. whether an existing handler is replaced) depend
         * on the implementation - confirm there.
         */
        void addSignalHandler(int sig, void delegate() handler);

        /**
         * Remove the handler registered for the signal number $(D sig).
         *
         * NOTE(review): behavior when no handler is registered is not
         * visible here - confirm against the implementation.
         */
        void removeSignalHandler(int sig);
    }
1123 
    // Error handlers.

    // Custom handler invoked by callExceptionHandler(); while it is null,
    // callExceptionHandler() falls back to defaultExceptionHandler().
    private ExceptionHandler exceptionHandler;

    /**
     * Set handler as the new event loop exception handler.
     *
     * If handler is $(D_KEYWORD null), the default exception handler will be
     * used by $(D_PSYMBOL callExceptionHandler()).
     *
     * Params:
     *  exceptionHandler = the handler to install, or $(D_KEYWORD null) to
     *      fall back to $(D_PSYMBOL defaultExceptionHandler()).
     *
     * See_Also: $(D_PSYMBOL callExceptionHandler()).
     */
    final void setExceptionHandler(ExceptionHandler exceptionHandler)
    {
        this.exceptionHandler = exceptionHandler;
    }
1139 
1140     /**
1141      * Default exception handler.
1142      *
1143      * This is called when an exception occurs and no exception handler is set,
1144      * and can be called by a custom exception handler that wants to defer to
1145      * the default behavior.
1146      * The context parameter has the same meaning as in $(D_PSYMBOL
1147      * callExceptionHandler()).
1148      */
1149     final void defaultExceptionHandler(ExceptionContext exceptionContext)
1150     {
1151         if (exceptionContext.message.empty)
1152             exceptionContext.message = "Unhandled exception in event loop";
1153 
1154         if (cast(Error) exceptionContext.throwable)
1155             throw new Error("Uncaught Error: %s".format(exceptionContext));
1156         else
1157             throw new Exception("Uncaught Exception: %s".format(exceptionContext));
1158     }
1159 
1160     /**
1161      * Call the current event loop's exception handler.
1162      *
1163      * Note: New ExceptionContext members may be introduced in the future.
1164      */
1165     final void callExceptionHandler(ExceptionContext exceptionContext)
1166     {
1167         if (exceptionHandler is null)
1168         {
1169             defaultExceptionHandler(exceptionContext);
1170             return;
1171         }
1172 
1173         try
1174         {
1175             exceptionHandler(this, exceptionContext);
1176         }
1177         catch (Throwable throwable)
1178         {
1179             auto context = ExceptionContext("Unhandled error in exception handler",
1180                 throwable);
1181             context.context = &exceptionContext;
1182 
1183             defaultExceptionHandler(context);
1184         }
1185     }
1186 
1187     override string toString() const
1188     {
1189         return "%s(%s)".format(typeid(this), this.state);
1190     }
1191 
1192 protected:
1193 
    // Hooks declared without a body in this class. The notes below state
    // only what the calls visible in this class show; everything else is
    // marked for review.

    /// Schedule callback for execution.
    /// NOTE(review): presumably backs callSoon() - confirm.
    void scheduleCallback(CallbackHandle callback);

    /// Variant of scheduleCallback() for use from other threads.
    /// NOTE(review): presumably backs callSoonThreadSafe() - confirm.
    void scheduleCallbackThreadSafe(CallbackHandle callback);

    /// Schedule callback to run after delay.
    /// NOTE(review): presumably backs callLater() - confirm.
    void scheduleCallback(Duration delay, CallbackHandle callback);

    /// Schedule callback to run at the time when.
    /// NOTE(review): presumably backs callAt() - confirm.
    void scheduleCallback(SysTime when, CallbackHandle callback);

    /// Connect socket to address.
    /// NOTE(review): blocking/coroutine behavior is not visible here.
    void socketConnect(Socket socket, Address address);

    /// Create the transport for a stream socket. createConnectionTransport()
    /// calls it and afterwards waits on the waiter it passed in.
    Transport makeSocketTransport(Socket socket, Protocol protocol,
        Waiter waiter);

    /// Datagram counterpart of makeSocketTransport().
    /// NOTE(review): the role of remoteAddress is not visible here.
    DatagramTransport makeDatagramTransport(Socket socket,
        DatagramProtocol datagramProtocol, Address remoteAddress,
        Waiter waiter);

    /// Start serving on a listening socket. createServer() and
    /// createUnixServer() call it after listen() and blocking(false).
    void startServing(ProtocolFactory protocolFactory, Socket socket,
        SslContext sslContext, ServerImpl server);

    /// Stop serving on socket.
    /// NOTE(review): callers are outside this section - confirm usage.
    void stopServing(Socket socket);
1215 private:
1216 
1217     @Coroutine
1218     auto createConnectionTransport(Socket socket,
1219         ProtocolFactory protocolFactory, SslContext sslContext,
1220         in char[] serverHostname = null)
1221     {
1222         Protocol protocol = protocolFactory();
1223         Transport transport = null;
1224         auto waiter = new Waiter(this);
1225 
1226         if (sslContext !is null)
1227         {
1228             assert(0, "SSL support not implemented yet");
1229             //sslcontext = None if isinstance(ssl, bool) else ssl
1230             //transport = self._make_ssl_transport(
1231             //    sock, protocol, sslcontext, waiter,
1232             //    server_side = False, server_hostname = server_hostname)
1233         }
1234         else
1235         {
1236             transport = makeSocketTransport(socket, protocol, waiter);
1237         }
1238 
1239         try
1240         {
1241             this.waitFor(waiter);
1242         }
1243         catch (Throwable throwable)
1244         {
1245             transport.close;
1246             throw throwable;
1247         }
1248 
1249         return tuple!("transport", "protocol")(transport, protocol);
1250     }
1251 }
1252 
1253 /**
1254  * Interface of policy for accessing the event loop.
1255  */
1256 abstract class EventLoopPolicy
1257 {
1258     protected EventLoop eventLoop = null;
1259     protected bool setCalled = false;
1260 
1261     /**
1262      * Get the event loop.
1263      *
1264      * This may be $(D null) or an instance of EventLoop.
1265      */
1266     EventLoop getEventLoop()
1267     {
1268         if (eventLoop is null && !setCalled && thread_isMainThread)
1269             setEventLoop(newEventLoop);
1270         return eventLoop;
1271     }
1272 
1273     /**
1274      * Set the event loop.
1275      */
1276     void setEventLoop(EventLoop loop)
1277     {
1278         setCalled = true;
1279         eventLoop = loop;
1280     }
1281 
1282     /**
1283      * Create a new event loop.
1284      *
1285      * You must call $(D setEventLoop()) to make this the current event loop.
1286      */
1287     EventLoop newEventLoop();
1288 
1289     //version (Posix)
1290     //{
1291     //    ChildWatcher getChildWatcher();
1292 
1293     //    void setChildWatcher(ChildWatcher watcher);
1294     //}
1295 }
1296 
1297 /**
1298  * Default policy implementation for accessing the event loop.
1299  *
1300  * In this policy, each thread has its own event loop.  However, we only
1301  * automatically create an event loop by default for the main thread; other
1302  * threads by default have no event loop.
1303  *
1304  * Other policies may have different rules (e.g. a single global event loop, or
1305  * automatically creating an event loop per thread, or using some other notion
1306  * of context to which an event loop is associated).
1307  */
1308 
// Current policy, stored in a single __gshared variable. It stays null
// until getEventLoopPolicy() creates the default policy or
// setEventLoopPolicy() installs one.
private __gshared EventLoopPolicy eventLoopPolicy;

import asynchronous.libasync.events;

// The policy type getEventLoopPolicy() instantiates when none is set.
alias DefaultEventLoopPolicy = LibasyncEventLoopPolicy;
1314 
1315 /**
1316  * Get the current event loop policy.
1317  */
1318 EventLoopPolicy getEventLoopPolicy()
1319 {
1320     if (eventLoopPolicy is null)
1321     {
1322         synchronized
1323         {
1324             if (eventLoopPolicy is null)
1325                 eventLoopPolicy = new DefaultEventLoopPolicy;
1326         }
1327     }
1328 
1329     return eventLoopPolicy;
1330 }
1331 
1332 /**
1333  * Set the current event loop policy.
1334  *
1335  * If policy is $(D null), the default policy is restored.
1336  */
1337 void setEventLoopPolicy(EventLoopPolicy policy)
1338 {
1339     eventLoopPolicy = policy;
1340 }
1341 
1342 /**
1343  * Equivalent to calling $(D getEventLoopPolicy.getEventLoop).
1344  */
1345 EventLoop getEventLoop()
1346 {
1347     return getEventLoopPolicy.getEventLoop;
1348 }
1349 
unittest
{
    // The module-level accessor is expected to hand out a loop here.
    auto loop = getEventLoop();
    assert(loop !is null);
}
1354 
1355 /**
1356  * Equivalent to calling $(D getEventLoopPolicy.setEventLoop(loop)).
1357  */
1358 void setEventLoop(EventLoop loop)
1359 {
1360     getEventLoopPolicy.setEventLoop(loop);
1361 }
1362 
1363 /**
1364  * Equivalent to calling $(D getEventLoopPolicy.newEventLoop).
1365  */
1366 EventLoop newEventLoop()
1367 {
1368     return getEventLoopPolicy.newEventLoop;
1369 }