1 /**
2  * libasync event loop integration.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.libasync.events;
9 
10 import std.algorithm;
11 import std.array;
12 import std.datetime;
13 import std.exception : enforce;
14 import std.process;
15 import std.socket;
16 import std.string;
17 import std.traits;
18 import std.typecons;
19 import libasync.events : EventLoop_ = EventLoop, NetworkAddress;
20 import libasync.signal : AsyncSignal;
21 import libasync.timer : AsyncTimer;
22 import libasync.tcp : AsyncTCPConnection, AsyncTCPListener, TCPEvent;
23 import libasync.threads : destroyAsyncThreads, gs_threads;
24 import libasync.udp : AsyncUDPSocket, UDPEvent;
25 import asynchronous.events;
26 import asynchronous.futures;
27 import asynchronous.protocols;
28 import asynchronous.tasks;
29 import asynchronous.transports;
30 import asynchronous.types;
31 
32 alias Protocol = asynchronous.protocols.Protocol;
33 
/// Shared module destructor (runs once at program termination): hands
/// control to libasync's `destroyAsyncThreads`.
shared static ~this()
{
    destroyAsyncThreads();
}
38 
/**
 * Event loop implementation that delegates I/O, timers and cross-thread
 * wake-ups to a libasync event loop.
 */
package class LibasyncEventLoop : EventLoop
{
    /// The underlying libasync loop.
    private EventLoop_ eventLoop;

    alias Timers = ResourcePool!(AsyncTimer, EventLoop_);

    /// Pool of libasync timers used for delayed callbacks.
    private Timers timers;

    // Two callback queues used alternately: `currentAppender` points at the
    // queue that receives newly scheduled callbacks, while `runForever`
    // drains the other one.
    private Appender!(CallbackHandle[]) nextCallbacks1;
    private Appender!(CallbackHandle[]) nextCallbacks2;
    private Appender!(CallbackHandle[])* currentAppender;

    // Callbacks queued through `scheduleCallbackThreadSafe`, guarded by
    // `synchronized (this)`; the signal asks the loop to pick them up.
    private Appender!(CallbackHandle[]) nextThreadSafeCallbacks;
    private shared AsyncSignal newThreadSafeCallbacks;

    /// Transports created by `socketConnect` and not yet claimed by
    /// `makeSocketTransport`.
    private LibasyncTransport[] pendingConnections;

    alias Listener = Tuple!(ServerImpl, "server", AsyncTCPListener, "listener");

    /// Listeners registered by `startServing` and not yet stopped.
    private Listener[] activeListeners;

    /// Creates the libasync loop, the timer pool and the thread-safe wake-up
    /// signal; newly scheduled callbacks initially go to `nextCallbacks1`.
    this()
    {
        auto loop = new EventLoop_;

        this.eventLoop = loop;
        this.timers = new Timers(loop);
        this.currentAppender = &this.nextCallbacks1;

        this.newThreadSafeCallbacks = new shared AsyncSignal(loop);
        this.newThreadSafeCallbacks.run(&this.scheduleThreadSafeCallbacks);
    }

    /**
     * Runs the loop until its state leaves `RUNNING`.
     *
     * Throws: `Exception` if the loop is not `STOPPED` on entry, or if it is
     *         found `CLOSED` while running.
     */
    override void runForever()
    {
        enforce(this.state == State.STOPPED,
            "Unexpected event loop state %s".format(this.state));

        this.state = State.RUNNING;

        for (;;)
        {
            final switch (this.state)
            {
            case State.CLOSED:
                throw new Exception("Event loop closed while running.");
            case State.STOPPED:
            case State.STOPPING:
                this.state = State.STOPPED;
                return;
            case State.RUNNING:
                // Only enter libasync's loop when no callback is queued yet.
                if (this.currentAppender.data.empty)
                    this.eventLoop.loop(-1.msecs);

                auto ready = this.currentAppender;

                // Swap queues: callbacks scheduled while draining `ready`
                // land in the other queue and run on the next iteration.
                this.currentAppender = (ready == &this.nextCallbacks1)
                    ? &this.nextCallbacks2
                    : &this.nextCallbacks1;

                assert(this.currentAppender.data.empty);

                foreach (handle; ready.data)
                {
                    if (handle.cancelled)
                        continue;

                    handle();
                }

                ready.clear;
                break;
            }
        }
    }

    /// Queues `callback` on the current queue unless it is already cancelled.
    override void scheduleCallback(CallbackHandle callback)
    {
        if (callback.cancelled)
            return;

        this.currentAppender.put(callback);
    }

    /// Handler of the wake-up signal: moves every callback queued by
    /// `scheduleCallbackThreadSafe` into the regular queue.
    private void scheduleThreadSafeCallbacks()
    {
        synchronized (this)
        {
            auto queued = this.nextThreadSafeCallbacks.data;

            foreach (handle; queued)
                scheduleCallback(handle);

            this.nextThreadSafeCallbacks.clear;
        }
    }

    /// Queues `callback` under the object's monitor and then triggers the
    /// wake-up signal.
    override void scheduleCallbackThreadSafe(CallbackHandle callback)
    {
        synchronized (this)
        {
            this.nextThreadSafeCallbacks.put(callback);
        }

        this.newThreadSafeCallbacks.trigger;
    }

    /**
     * Schedules `callback` after `delay`.
     *
     * A non-positive delay queues the callback immediately; otherwise a
     * pooled timer is armed, and the timer is returned to the pool when it
     * fires.
     */
    override void scheduleCallback(Duration delay, CallbackHandle callback)
    {
        if (delay > Duration.zero)
        {
            auto timer = this.timers.acquire();

            timer.duration(delay).run({
                scheduleCallback(callback);
                this.timers.release(timer);
            });
            return;
        }

        scheduleCallback(callback);
    }

    /// Schedules `callback` at the absolute time `when`, expressed as a delay
    /// relative to the loop's `time`.
    override void scheduleCallback(SysTime when, CallbackHandle callback)
    {
        scheduleCallback(when - time, callback);
    }

    /**
     * Starts connecting `socket` to `address` through libasync.
     *
     * The resulting transport is parked in `pendingConnections` until
     * `makeSocketTransport` claims it for the same socket.
     */
    @Coroutine
    override void socketConnect(Socket socket, Address address)
    {
        auto asyncTCPConnection = new AsyncTCPConnection(this.eventLoop,
            socket.handle);
        NetworkAddress peerAddress;

        // Byte-wise copy of the std.socket address into libasync's
        // representation.
        immutable nameLength = address.nameLen;
        auto source = (cast(byte*) address.name)[0 .. nameLength];
        auto target = (cast(byte*) peerAddress.sockAddr)[0 .. nameLength];

        target[] = source[];
        asyncTCPConnection.peer = peerAddress;

        auto pending = new LibasyncTransport(this, socket, asyncTCPConnection);

        asyncTCPConnection.run(&pending.handleTCPEvent);

        this.pendingConnections ~= pending;
    }

    /**
     * Claims the pending transport created by `socketConnect` for `socket`
     * and attaches `protocol` and `waiter` to it.
     *
     * Throws: `Exception` ("Internal error") if no pending transport matches.
     */
    override Transport makeSocketTransport(Socket socket, Protocol protocol,
        Waiter waiter)
    {
        auto position = this.pendingConnections.countUntil!(
            pending => pending.socket == socket);

        enforce(position >= 0, "Internal error");

        auto claimed = this.pendingConnections[position];

        this.pendingConnections = this.pendingConnections.remove(position);
        claimed.setProtocol(protocol, waiter);

        return claimed;
    }

    /**
     * Wraps `socket` in a libasync UDP socket and returns the datagram
     * transport bound to `datagramProtocol`.
     *
     * `remoteAddress` is not used by this implementation. A non-null
     * `waiter` is completed on a later loop iteration.
     */
    override DatagramTransport makeDatagramTransport(Socket socket,
        DatagramProtocol datagramProtocol, Address remoteAddress, Waiter waiter)
    {
        auto udp = new AsyncUDPSocket(this.eventLoop, socket.handle);
        auto transport = new LibasyncDatagramTransport(this, socket, udp);

        udp.run(&transport.handleUDPEvent);

        transport.setProtocol(datagramProtocol, waiter);

        if (waiter !is null)
            this.callSoon(&waiter.setResultUnlessCancelled);

        return transport;
    }

    /// Resolves `host`/`service` by calling `std.socket.getAddressInfo`
    /// directly.
    @Coroutine
    override AddressInfo[] getAddressInfo(in char[] host, in char[] service,
        AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
        SocketType socketType = UNSPECIFIED!SocketType,
        ProtocolType protocolType = UNSPECIFIED!ProtocolType,
        AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags)
    {
        // libasync offers no asynchronous resolver yet, so fall back to the
        // std.socket.getAddressInfo implementation.
        return std.socket.getAddressInfo(host, service, addressFamily,
            socketType, protocolType, addressInfoFlags);
    }

    /**
     * Starts accepting connections on `socket`.
     *
     * Every accepted connection gets its own transport and a protocol built
     * by `protocolFactory`. `sslContext` is not used by this implementation.
     */
    override void startServing(ProtocolFactory protocolFactory, Socket socket,
            SslContext sslContext, ServerImpl server)
    {
        auto asyncTCPListener = new AsyncTCPListener(this.eventLoop,
            socket.handle);
        NetworkAddress localAddress;
        Address boundAddress = socket.localAddress;

        // Byte-wise copy of the bound address into libasync's representation.
        immutable nameLength = boundAddress.nameLen;
        auto source = (cast(byte*) boundAddress.name)[0 .. nameLength];
        auto target = (cast(byte*) localAddress.sockAddr)[0 .. nameLength];

        target[] = source[];
        asyncTCPListener.local(localAddress);

        this.activeListeners ~= Listener(server, asyncTCPListener);

        asyncTCPListener.run((AsyncTCPConnection connection) {
            auto acceptedSocket = new Socket(cast(socket_t) connection.socket,
                socket.addressFamily);
            auto acceptedTransport = new LibasyncTransport(this,
                acceptedSocket, connection);

            acceptedTransport.setProtocol(protocolFactory());
            return &acceptedTransport.handleTCPEvent;
        });
        server.attach;
    }

    /// Stops the listener registered for `socket` and detaches its server
    /// shortly afterwards.
    override void stopServing(Socket socket)
    {
        // `tail` is the slice of `activeListeners` that starts at the match.
        auto tail = this.activeListeners.find!(
            entry => entry.listener.socket == socket.handle);

        assert(!tail.empty);

        auto stoppedServer = tail[0].server;
        auto stoppedListener = tail[0].listener;

        // Swap-remove: move the last entry into the freed slot, then shrink.
        tail[0] = tail[$ - 1];
        tail[$ - 1] = Listener(null, null);
        --this.activeListeners.length;

        stoppedListener.kill;

        // Ideally detach would run on the listening socket's close event, but
        // libasync has no such event, so it is simply called 10 milliseconds
        // later.
        scheduleCallback(10.msecs, callback(this, &stoppedServer.detach));
    }

    version (Posix)
    {
        /// Not implemented: always fails an `assert(0)`.
        override void addSignalHandler(int sig, void delegate() handler)
        {
            assert(0, "addSignalHandler not implemented yet");
        }

        /// Not implemented: always fails an `assert(0)`.
        override void removeSignalHandler(int sig)
        {
            assert(0, "removeSignalHandler not implemented yet");
        }
    }
}
284 
/// Event loop policy whose loops are `LibasyncEventLoop` instances.
class LibasyncEventLoopPolicy : EventLoopPolicy
{
    /// Returns a newly constructed `LibasyncEventLoop`.
    override EventLoop newEventLoop()
    {
        EventLoop freshLoop = new LibasyncEventLoop;

        return freshLoop;
    }

    // Child watcher support is disabled for now:
    //version (Posix)
    //{
    //    override ChildWatcher getChildWatcher()
    //    {
    //        assert(0, "Not implemented");
    //    }

    //    override void setChildWatcher(ChildWatcher watcher)
    //    {
    //        assert(0, "Not implemented");
    //    }
    //}
}
305 
/**
 * Stream transport on top of a libasync `AsyncTCPConnection`.
 *
 * libasync delivers connection events to `handleTCPEvent`; the transport
 * forwards them to the attached `Protocol` through the event loop and buffers
 * outgoing data that the socket does not accept immediately.
 */
private final class LibasyncTransport : AbstractBaseTransport, Transport
{
    /// Connection life cycle. `EOF` is accepted by `onWrite`'s contract but
    /// is not assigned anywhere in this class.
    private enum State : ubyte
    {
        CONNECTING,
        CONNECTED,
        EOF,
        DISCONNECTED,
    }

    private EventLoop eventLoop;
    private Socket _socket;
    private AsyncTCPConnection connection;   // set to null once closed/failed
    private Waiter waiter = null;            // completed on connect, if set
    private Protocol protocol;
    private State state;
    private bool readingPaused = false;
    private bool writingPaused = false;      // true after protocol.pauseWriting
    private void[] writeBuffer;              // data not yet accepted by send()
    private BufferLimits writeBufferLimits;
    private Duration writeRescheduleInterval = 1.msecs;

    /// Creates a transport in the `CONNECTING` state with default write
    /// buffer limits. All arguments must be non-null.
    this(EventLoop eventLoop, Socket socket, AsyncTCPConnection connection)
    in
    {
        assert(eventLoop !is null);
        assert(socket !is null);
        assert(connection !is null);
    }
    body
    {
        this.state = State.CONNECTING;
        this.eventLoop = eventLoop;
        this._socket = socket;
        this.connection = connection;
        setWriteBufferLimits;
    }

    /// The std.socket `Socket` this transport was created for.
    @property
    Socket socket()
    {
        return this._socket;
    }

    /// Attaches the protocol (and optionally a waiter to complete on
    /// connect). May only be called once, while still `CONNECTING`.
    void setProtocol(Protocol protocol, Waiter waiter = null)
    in
    {
        assert(this.state == State.CONNECTING);
        assert(protocol !is null);
        assert(this.protocol is null);
        assert(this.waiter is null);
    }
    body
    {
        this.protocol = protocol;
        this.waiter = waiter;
    }

    /// CONNECT event: switches to `CONNECTED`, completes the waiter (if any)
    /// and schedules `protocol.connectionMade`.
    private void onConnect()
    in
    {
        assert(this.state == State.CONNECTING);
        assert(this.connection.isConnected);
        assert(this._socket.handle == this.connection.socket);
        assert(this.protocol !is null);
    }
    body
    {
        state = State.CONNECTED;

        if (this.waiter !is null)
        {
            this.eventLoop.callSoon(&this.waiter.setResultUnlessCancelled);
            this.waiter = null;
        }

        this.eventLoop.callSoon(&this.protocol.connectionMade, this);
    }

    /// READ event: drains the connection and schedules
    /// `protocol.dataReceived` with everything read. Does nothing while
    /// reading is paused.
    private void onRead()
    in
    {
        assert(this.state == State.CONNECTED);
        assert(this.protocol !is null);
    }
    body
    {
        if (this.readingPaused)
            return;

        // Scratch buffer reused across calls; the received bytes are copied
        // into `receivedData` before being handed to the protocol.
        static ubyte[] readBuffer = new ubyte[64 * 1024];
        Appender!(ubyte[]) receivedData;

        while (true)
        {
            auto length = this.connection.recv(readBuffer);

            receivedData ~= readBuffer[0 .. length];

            // A short read ends the loop; a full buffer means try again.
            if (length < readBuffer.length)
                break;
        }

        if (!receivedData.data.empty)
            this.eventLoop.callSoon(&this.protocol.dataReceived,
                receivedData.data);
    }

    /// WRITE event (also called from `rescheduleOnWrite`): tries to flush
    /// the write buffer and resumes the protocol once the buffer has fallen
    /// to the low-water mark.
    private void onWrite()
    in
    {
        assert(this.state == State.CONNECTED || this.state == State.EOF);
        assert(this.protocol !is null);
    }
    body
    {
        if (!this.writeBuffer.empty)
        {
            auto sent = this.connection.send(cast(ubyte[]) this.writeBuffer);
            this.writeBuffer = this.writeBuffer[sent .. $];
        }

        if (this.writingPaused &&
            this.writeBuffer.length <= this.writeBufferLimits.low)
        {
            this.protocol.resumeWriting;
            this.writingPaused = false;
        }
    }

    /// CLOSE event: marks the transport disconnected and schedules
    /// `protocol.connectionLost(null)`. Ignored if already torn down.
    private void onClose()
    in
    {
        assert(this.state != State.CONNECTING);
        assert(this.protocol !is null);
    }
    body
    {
        if (this.connection is null)
            return;

        this.connection = null;
        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.protocol.connectionLost, null);
    }

    /// ERROR event: while connecting the error goes to the waiter (if any);
    /// afterwards it is reported through `protocol.connectionLost`. Either
    /// way the transport ends up disconnected.
    private void onError()
    in
    {
        assert(this.protocol !is null);
    }
    body
    {
        if (this.connection is null)
            return;

        if (this.state == State.CONNECTING)
        {
            if (this.waiter)
                waiter.setException(new SocketOSException(connection.error()));
        }
        else
        {
            this.eventLoop.callSoon(&this.protocol.connectionLost,
                new SocketOSException(connection.error()));
        }

        this.connection = null;
        this.state = State.DISCONNECTED;
    }

    /// Event handler registered with libasync; dispatches to the `on*`
    /// methods.
    void handleTCPEvent(TCPEvent event)
    {
        final switch (event)
        {
        case TCPEvent.CONNECT:
            onConnect;
            break;
        case TCPEvent.READ:
            onRead;
            break;
        case TCPEvent.WRITE:
            onWrite;
            break;
        case TCPEvent.CLOSE:
            onClose;
            break;
        case TCPEvent.ERROR:
            onError;
            break;
        }
    }

    /**
     * Transport interface
     */
    override Socket getExtraInfoSocket()
    {
        return socket;
    }

    /// Marks the transport disconnected and schedules a non-forced
    /// `connection.kill`. No-op if already disconnected.
    void close()
    {
        if (this.state == State.DISCONNECTED)
            return;

        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.connection.kill, false);
    }

    /// Stops delivering received data to the protocol.
    /// Throws: `Exception` if not connected or already paused.
    void pauseReading()
    {
        if (this.state != State.CONNECTED)
            throw new Exception("Cannot pauseReading() when closing");
        if (this.readingPaused)
            throw new Exception("Reading is already paused");

        this.readingPaused = true;
    }

    /// Re-enables delivery of received data.
    /// Throws: `Exception` if reading is not paused.
    void resumeReading()
    {
        // NOTE(review): READ events that arrived while paused were dropped
        // by onRead; this only clears the flag and does not trigger a read
        // itself - confirm libasync re-signals pending data.
        if (!this.readingPaused)
            throw new Exception("Reading is not paused");
        this.readingPaused = false;
    }

    /// Like `close`, but schedules a forced `connection.kill`.
    void abort()
    {
        if (this.state == State.DISCONNECTED)
            return;

        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.connection.kill, true);
    }

    /// Always `true`; see `writeEof`.
    bool canWriteEof()
    {
        return true;
    }

    /// Number of bytes currently waiting in the write buffer.
    size_t getWriteBufferSize()
    {
        return writeBuffer.length;
    }

    /// Current high/low water marks of the write buffer.
    BufferLimits getWriteBufferLimits()
    {
        return writeBufferLimits;
    }

    /**
     * Sets the write buffer water marks.
     *
     * Params:
     *  high = high-water mark; defaults to 64 KiB, or to `4 * low` when only
     *         `low` is given
     *  low  = low-water mark; defaults to `high / 4` and is clamped to
     *         `high`
     */
    void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(),
        Nullable!size_t low = Nullable!size_t())
    {
        if (high.isNull)
        {
            if (low.isNull)
                high = 64 * 1024;
            else
                high = 4 * low;
        }

        if (low.isNull)
            low = high / 4;

        if (high < low)
            low = high;

        this.writeBufferLimits.high = high;
        this.writeBufferLimits.low = low;
    }

    /**
     * Sends `data`, buffering whatever the socket does not accept at once.
     *
     * Whenever data ends up in the write buffer - after a partial direct
     * send as well as when appending to an already non-empty buffer - the
     * high-water mark is checked (pausing the protocol if reached) and a
     * flush is scheduled through `rescheduleOnWrite`.
     *
     * Throws: `Exception` ("Disconnected transport") if the connection is
     *         gone.
     */
    void write(const(void)[] data)
    in
    {
        assert(this.protocol !is null);
        assert(!this.writingPaused);
    }
    body
    {
        enforce(this.connection !is null, "Disconnected transport");

        if (this.writeBuffer.empty)
        {
            // Fast path: nothing is queued, so try the socket directly.
            auto sent = this.connection.send(cast(const ubyte[]) data);

            if (sent >= data.length)
                return;

            // Partial send: keep a private copy of the unsent remainder.
            this.writeBuffer = data[sent .. $].dup;
        }
        else
        {
            // Preserve ordering: earlier data is still waiting.
            this.writeBuffer ~= data;
        }

        if (!this.writingPaused && this.writeBuffer.length >= this.writeBufferLimits.high)
        {
            this.protocol.pauseWriting;
            this.writingPaused = true;
        }

        rescheduleOnWrite;
    }

    /// Flush helper: retries `onWrite` and re-arms itself until the write
    /// buffer is empty. Retries immediately while progress is made, and
    /// otherwise backs off exponentially (capped at 2 seconds).
    private void rescheduleOnWrite()
    {
        // onWrite's contract only allows CONNECTED/EOF; in any other state
        // (disconnected, or still connecting) there is nothing to flush to.
        if (this.state != State.CONNECTED && this.state != State.EOF)
            return;

        if (this.writeBuffer.empty)
            return;

        auto bufferLength = this.writeBuffer.length;

        onWrite;
        if (this.writeBuffer.empty)
            return;

        if (this.writeBuffer.length < bufferLength)
        {
            // Progress was made: reset the back-off and retry right away.
            this.writeRescheduleInterval = 1.msecs;
            this.eventLoop.callSoon(&this.rescheduleOnWrite);
        }
        else
        {
            this.writeRescheduleInterval = min(this.writeRescheduleInterval * 2, 2.seconds);
            this.eventLoop.callLater(this.writeRescheduleInterval, &this.rescheduleOnWrite);
        }
    }

    /// Implemented as a full `close` of the connection.
    void writeEof()
    {
        close;
    }
}
637 
/**
 * Datagram transport on top of a libasync `AsyncUDPSocket`.
 *
 * libasync delivers socket events to `handleUDPEvent`; received datagrams and
 * errors are forwarded to the attached `DatagramProtocol` through the event
 * loop.
 */
private final class LibasyncDatagramTransport : AbstractBaseTransport,  DatagramTransport
{
    private EventLoop eventLoop;
    private Socket _socket;
    private AsyncUDPSocket udpSocket;
    private Waiter waiter = null;   // stored by setProtocol; not used here
    private DatagramProtocol datagramProtocol = null;

    /// Creates the transport. All arguments must be non-null and `udpSocket`
    /// must wrap the same handle as `socket`.
    this(EventLoop eventLoop, Socket socket, AsyncUDPSocket udpSocket)
    in
    {
        assert(eventLoop !is null);
        assert(socket !is null);
        assert(udpSocket !is null);
        assert(socket.handle == udpSocket.socket);
    }
    body
    {
        this.eventLoop = eventLoop;
        this._socket = socket;
        this.udpSocket = udpSocket;
    }

    /// The std.socket `Socket` this transport was created for.
    @property
    Socket socket()
    {
        return this._socket;
    }

    /// Attaches the protocol (and optionally a waiter). May only be called
    /// once.
    void setProtocol(DatagramProtocol datagramProtocol, Waiter waiter = null)
    in
    {
        assert(datagramProtocol !is null);
        assert(this.datagramProtocol is null);
        assert(this.waiter is null);
    }
    body
    {
        this.datagramProtocol = datagramProtocol;
        this.waiter = waiter;
    }

    /**
     * READ event: receives one datagram and schedules
     * `datagramProtocol.datagramReceived` with its payload and sender.
     *
     * Throws: `Exception` if the datagram fills the whole 1501-byte buffer,
     *         i.e. is larger than 1500 bytes.
     */
    private void onRead()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        ubyte[] readBuffer = new ubyte[1501];

        // The sender address lives on the GC heap: the Address object built
        // below only references this storage, and it is consumed by the
        // protocol on a later loop iteration, after this function has
        // returned. A stack-local NetworkAddress would leave it pointing at
        // a dead stack frame.
        auto networkAddress = new NetworkAddress;

        auto length = this.udpSocket.recvFrom(readBuffer, *networkAddress);

        enforce(length < readBuffer.length,
            "Unexpected UDP package size > 1500 bytes");

        Address address = new UnknownAddressReference(
            cast(typeof(Address.init.name)) networkAddress.sockAddr,
            cast(uint) networkAddress.sockAddrLen);

        this.eventLoop.callSoon(&this.datagramProtocol.datagramReceived,
            readBuffer[0 .. length], address);
    }

    /// WRITE event: nothing to do, `sendTo` writes directly to the socket.
    private void onWrite()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
    }

    /// ERROR event: schedules `datagramProtocol.errorReceived` with the
    /// socket's error wrapped in a `SocketOSException`.
    private void onError()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        this.eventLoop.callSoon(&this.datagramProtocol.errorReceived,
            new SocketOSException(udpSocket.error()));
    }

    /// Event handler registered with libasync; dispatches to the `on*`
    /// methods.
    void handleUDPEvent(UDPEvent event)
    {
        final switch (event)
        {
        case UDPEvent.READ:
            onRead;
            break;
        case UDPEvent.WRITE:
            onWrite;
            break;
        case UDPEvent.ERROR:
            onError;
            break;
        }
    }

    /**
     * DatagramTransport interface
     */

    override Socket getExtraInfoSocket()
    {
        return socket;
    }

    /// Schedules `udpSocket.kill` on the event loop.
    void close()
    {
        this.eventLoop.callSoon(&this.udpSocket.kill);
    }

    /**
     * Sends `data` to `address`.
     *
     * Throws: `Exception` if `address` is null - a default remote address is
     *         not supported yet.
     */
    void sendTo(const(void)[] data, Address address = null)
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        enforce(address !is null, "default remote not supported yet");

        NetworkAddress networkAddress;

        // Byte-wise copy of the std.socket address into libasync's
        // representation.
        // NOTE(review): assumes address.nameLen fits into NetworkAddress's
        // storage - confirm for address families other than IPv4/IPv6.
        (cast(byte*) networkAddress.sockAddr)[0 .. address.nameLen] =
            (cast(byte*) address.name)[0 .. address.nameLen];

        this.udpSocket.sendTo(cast(const ubyte[]) data, networkAddress);
    }

    /// Same as `close`: schedules `udpSocket.kill` on the event loop.
    void abort()
    {
        this.eventLoop.callSoon(&this.udpSocket.kill);
    }
}