1 module asynchronous.libasync.events;
2 
3 import std.algorithm;
4 import std.array;
5 import std.datetime;
6 import std.process;
7 import std.socket;
8 import std.string;
9 import std.traits;
10 import std.typecons;
11 import libasync.events : EventLoop_ = EventLoop, NetworkAddress;
12 import libasync.timer : AsyncTimer;
13 import libasync.tcp : AsyncTCPConnection, AsyncTCPListener, TCPEvent;
14 import libasync.threads : destroyAsyncThreads, gs_threads;
15 import libasync.udp : AsyncUDPSocket, UDPEvent;
16 import asynchronous.events;
17 import asynchronous.futures;
18 import asynchronous.protocols;
19 import asynchronous.tasks;
20 import asynchronous.transports;
21 import asynchronous.types;
22 
23 alias Protocol = asynchronous.protocols.Protocol;
24 
25 shared static ~this()
26 {
27     destroyAsyncThreads;
28 }
29 
30 package class LibasyncEventLoop : EventLoop
31 {
32     private EventLoop_ eventLoop;
33 
34     alias Timers = ResourcePool!(AsyncTimer, EventLoop_);
35     private Timers timers;
36 
37     private Appender!(CallbackHandle[]) nextCallbacks1;
38     private Appender!(CallbackHandle[]) nextCallbacks2;
39     private Appender!(CallbackHandle[])* currentAppender;
40 
41     private LibasyncTransport[] pendingConnections;
42 
43     alias Listener = Tuple!(ServerImpl, "server", AsyncTCPListener, "listener");
44 
45     private Listener[] activeListeners;
46 
47     this()
48     {
49         this.eventLoop = new EventLoop_;
50         this.timers = new Timers(this.eventLoop);
51         this.currentAppender = &nextCallbacks1;
52     }
53 
54     override void runForever()
55     {
56         enforce(this.state == State.STOPPED,
57             "Unexpected event loop state %s".format(this.state));
58 
59         this.state = State.RUNNING;
60 
61         while (true)
62         {
63             final switch (this.state)
64             {
65             case State.STOPPED:
66             case State.STOPPING:
67                 this.state = State.STOPPED;
68                 return;
69             case State.RUNNING:
70                 auto callbacks = this.currentAppender;
71 
72                 if (this.currentAppender == &this.nextCallbacks1)
73                     this.currentAppender = &this.nextCallbacks2;
74                 else
75                     this.currentAppender = &this.nextCallbacks1;
76 
77                 assert(this.currentAppender.data.empty);
78 
79                 if (callbacks.data.empty)
80                 {
81                     this.eventLoop.loop(-1.msecs);
82                 }
83                 else
84                 {
85                     foreach (callback; callbacks.data.filter!(a => !a.cancelled))
86                         callback();
87 
88                     callbacks.clear;
89                 }
90                 break;
91             case State.CLOSED:
92                 throw new Exception("Event loop closed while running.");
93             }
94         }
95     }
96 
97     override void scheduleCallback(CallbackHandle callback)
98     {
99         if (!callback.cancelled)
100             currentAppender.put(callback);
101     }
102 
103     override void scheduleCallback(Duration delay, CallbackHandle callback)
104     {
105         if (delay <= Duration.zero)
106         {
107             scheduleCallback(callback);
108             return;
109         }
110 
111         auto timer = this.timers.acquire();
112         timer.duration(delay).run({
113             scheduleCallback(callback);
114             this.timers.release(timer);
115         });
116     }
117 
118     override void scheduleCallback(SysTime when, CallbackHandle callback)
119     {
120         Duration duration = when - time;
121 
122         scheduleCallback(duration, callback);
123     }
124 
125     @Coroutine
126     override void socketConnect(Socket socket, Address address)
127     {
128         auto asyncTCPConnection = new AsyncTCPConnection(this.eventLoop,
129             socket.handle);
130         NetworkAddress peerAddress;
131 
132         (cast(byte*) peerAddress.sockAddr)[0 .. address.nameLen] =
133             (cast(byte*) address.name)[0 .. address.nameLen];
134         asyncTCPConnection.peer = peerAddress;
135 
136         auto connection = new LibasyncTransport(this, socket,
137             asyncTCPConnection);
138 
139         asyncTCPConnection.run(&connection.handleTCPEvent);
140 
141         this.pendingConnections ~= connection;
142     }
143 
144     override Transport makeSocketTransport(Socket socket, Protocol protocol,
145         Waiter waiter)
146     {
147         auto index = this.pendingConnections.countUntil!(
148             a => a.socket == socket);
149 
150         enforce(index >= 0, "Internal error");
151 
152         auto transport = this.pendingConnections[index];
153 
154         if (this.pendingConnections.length > 1)
155         {
156             this.pendingConnections[index] = this.pendingConnections[$ - 1];
157         }
158         this.pendingConnections.length -= 1;
159 
160         transport.setProtocol(protocol, waiter);
161 
162         return transport;
163     }
164 
165     override DatagramTransport makeDatagramTransport(Socket socket,
166         DatagramProtocol datagramProtocol, Address remoteAddress, Waiter waiter)
167     {
168         auto asyncUDPSocket = new AsyncUDPSocket(this.eventLoop, socket.handle);
169 
170         auto datagramTransport = new LibasyncDatagramTransport(this, socket,
171             asyncUDPSocket);
172 
173         asyncUDPSocket.run(&datagramTransport.handleUDPEvent);
174 
175         datagramTransport.setProtocol(datagramProtocol, waiter);
176 
177         if (waiter !is null)
178             this.callSoon(&waiter.setResultUnlessCancelled);
179 
180         return datagramTransport;
181     }
182 
183     @Coroutine
184     override AddressInfo[] getAddressInfo(in char[] host, in char[] service,
185         AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
186         SocketType socketType = UNSPECIFIED!SocketType,
187         ProtocolType protocolType = UNSPECIFIED!ProtocolType,
188         AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags)
189     {
190         // no async implementation in libasync yet, use the std.socket.getAddresInfo implementation;
191         return std.socket.getAddressInfo(host, service, addressFamily,
192             socketType, protocolType, addressInfoFlags);
193     }
194 
195     override void startServing(ProtocolFactory protocolFactory, Socket socket,
196             SslContext sslContext, ServerImpl server)
197     {
198         auto asyncTCPListener = new AsyncTCPListener(this.eventLoop,
199             socket.handle);
200         NetworkAddress localAddress;
201         Address address = socket.localAddress;
202 
203         (cast(byte*) localAddress.sockAddr)[0 .. address.nameLen] =
204             (cast(byte*) address.name)[0 .. address.nameLen];
205         asyncTCPListener.local(localAddress);
206 
207         this.activeListeners ~= Listener(server, asyncTCPListener);
208 
209         asyncTCPListener.run((AsyncTCPConnection connection) {
210             auto socket1 = new Socket(cast(socket_t) connection.socket,
211                 socket.addressFamily);
212 
213             auto transport = new LibasyncTransport(this, socket1, connection);
214 
215             transport.setProtocol(protocolFactory());
216             return &transport.handleTCPEvent;
217         });
218         server.attach;
219     }
220 
221     override void stopServing(Socket socket)
222     {
223         auto found = this.activeListeners.find!(
224             l => l.listener.socket == socket.handle);
225 
226         assert(!found.empty);
227 
228         auto server = found[0].server;
229         auto listener = found[0].listener;
230 
231         found[0] = found[$ - 1];
232         found[$ - 1] = Listener(null, null);
233         --this.activeListeners.length;
234 
235         listener.kill;
236 
237         // normally detach should be called on closing event of the listening
238         // socket, but libasync does not have such an event. so just call it
239         // 10 milliseconds later.
240         scheduleCallback(10.msecs, callback(this, &server.detach));
241     }
242 
243 
244     version (Posix)
245     {
246         override void addSignalHandler(int sig, void delegate() handler)
247         {
248             assert(0, "addSignalHandler not implemented yet");
249         }
250 
251         override void removeSignalHandler(int sig)
252         {
253             assert(0, "removeSignalHandler not implemented yet");
254         }
255     }
256 
257     //void setExceptionHandler(void function(EventLoop, ExceptionContext) handler)
258     //{
259 
260     //}
261 
262     //void defaultExceptionHandler(ExceptionContext context)
263     //{
264 
265     //}
266 
267     //void callExceptionHandler(ExceptionContext context)
268     //{
269 
270     //}
271 }
272 
273 class LibasyncEventLoopPolicy : EventLoopPolicy
274 {
275     override EventLoop newEventLoop()
276     {
277         return new LibasyncEventLoop;
278     }
279 
280     //version (Posix)
281     //{
282     //    override ChildWatcher getChildWatcher()
283     //    {
284     //        assert(0, "Not implemented");
285     //    }
286 
287     //    override void setChildWatcher(ChildWatcher watcher)
288     //    {
289     //        assert(0, "Not implemented");
290     //    }
291     //}
292 }
293 
294 private final class LibasyncTransport : AbstractBaseTransport, Transport
295 {
296     private EventLoop eventLoop;
297     private Socket _socket;
298     private AsyncTCPConnection connection;
299     private Waiter waiter = null;
300     private Protocol protocol;
301     private uint connectionLost = 0;
302     private bool closing = false;
303     private bool readingPaused = false;
304     private bool writingPaused = false;
305     private void[] writeBuffer;
306     private BufferLimits writeBufferLimits;
307 
308     this(EventLoop eventLoop, Socket socket, AsyncTCPConnection connection)
309     in
310     {
311         assert(eventLoop !is null);
312         assert(socket !is null);
313         assert(connection !is null);
314     }
315     body
316     {
317         this.eventLoop = eventLoop;
318         this._socket = socket;
319         this.connection = connection;
320         setWriteBufferLimits;
321     }
322 
323     @property
324     Socket socket()
325     {
326         return this._socket;
327     }
328 
329     void setProtocol(Protocol protocol, Waiter waiter = null)
330     in
331     {
332         assert(protocol !is null);
333         assert(this.protocol is null);
334         assert(this.waiter is null);
335     }
336     body
337     {
338         this.protocol = protocol;
339         this.waiter = waiter;
340     }
341 
342     private void onConnect()
343     in
344     {
345         assert(this.connection.isConnected);
346         assert(this._socket.handle == this.connection.socket);
347         assert(this.protocol !is null);
348     }
349     body
350     {
351         this.eventLoop.callSoon(&this.protocol.connectionMade, this);
352 
353         if (this.waiter !is null)
354             this.eventLoop.callSoon(&this.waiter.setResultUnlessCancelled);
355     }
356 
357     private void onRead()
358     in
359     {
360         assert(this.protocol !is null);
361     }
362     body
363     {
364         if (this.readingPaused)
365             return;
366 
367         static ubyte[] readBuffer = new ubyte[64 * 1024];
368         Appender!(ubyte[]) receivedData;
369 
370         while (true)
371         {
372             auto length = this.connection.recv(readBuffer);
373 
374             receivedData ~= readBuffer[0 .. length];
375 
376             if (length < readBuffer.length)
377                 break;
378         }
379 
380         if (!receivedData.data.empty)
381         {
382             this.eventLoop.callSoon(&this.protocol.dataReceived,
383                 receivedData.data);
384         }
385     }
386 
387     private void onWrite()
388     in
389     {
390         assert(this.protocol !is null);
391     }
392     body
393     {
394         if (!this.writeBuffer.empty)
395         {
396             auto sent = this.connection.send(cast(ubyte[]) this.writeBuffer);
397             this.writeBuffer = this.writeBuffer[sent .. $];
398         }
399 
400         if (this.writingPaused &&
401             this.writeBuffer.length <= this.writeBufferLimits.low)
402         {
403             this.protocol.resumeWriting;
404             this.writingPaused = false;
405         }
406     }
407 
408     private void onClose(SocketOSException socketOSException)
409     in
410     {
411         assert(this.protocol !is null);
412     }
413     body
414     {
415         this.connection = null;
416         this.eventLoop.callSoon(&this.protocol.connectionLost,
417             socketOSException);
418     }
419 
420     void handleTCPEvent(TCPEvent event)
421     {
422         final switch (event)
423         {
424         case TCPEvent.CONNECT:
425             onConnect();
426             break;
427         case TCPEvent.READ:
428             onRead();
429             break;
430         case TCPEvent.WRITE:
431             onWrite();
432             break;
433         case TCPEvent.CLOSE:
434             onClose(null);
435             break;
436         case TCPEvent.ERROR:
437             onClose(new SocketOSException(connection.error()));
438         }
439     }
440 
441     /**
442      * Transport interface
443      */
444     override Socket getExtraInfoSocket()
445     {
446         return socket;
447     }
448 
449     void close()
450     {
451         if (this.closing)
452             return;
453 
454         this.closing = true;
455         this.eventLoop.callSoon(&this.connection.kill, false);
456     }
457 
458     void pauseReading()
459     {
460         if (this.closing || this.connectionLost)
461             throw new Exception("Cannot pauseReading() when closing");
462         if (this.readingPaused)
463             throw new Exception("Reading is already paused");
464 
465         this.readingPaused = true;
466     }
467 
468     void resumeReading()
469     {
470         if (!this.readingPaused)
471             throw new Exception("Reading is not paused");
472         this.readingPaused = false;
473     }
474 
475     void abort()
476     {
477         if (this.connectionLost)
478             return;
479 
480         this.eventLoop.callSoon(&this.connection.kill, true);
481         ++this.connectionLost;
482     }
483 
484     bool canWriteEof()
485     {
486         return true;
487     }
488 
489     size_t getWriteBufferSize()
490     {
491         return writeBuffer.length;
492     }
493 
494     BufferLimits getWriteBufferLimits()
495     {
496         return writeBufferLimits;
497     }
498 
499     void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(),
500         Nullable!size_t low = Nullable!size_t())
501     {
502         if (high.isNull)
503         {
504             if (low.isNull)
505                 high = 64 * 1024;
506             else
507                 high = 4 * low;
508         }
509 
510         if (low.isNull)
511             low = high / 4;
512 
513         if (high < low)
514             low = high;
515 
516         this.writeBufferLimits.high = high;
517         this.writeBufferLimits.low = low;
518     }
519 
520     void write(const(void)[] data)
521     in
522     {
523         assert(this.protocol !is null);
524         assert(!this.writingPaused);
525     }
526     body
527     {
528         enforce(this.connection !is null, "Disconnected transport");
529 
530         if (this.writeBuffer.empty)
531         {
532             auto sent = this.connection.send(cast(const ubyte[]) data);
533             if (sent < data.length)
534                 this.writeBuffer = data[sent .. $].dup;
535         }
536         else
537         {
538             this.writeBuffer ~= data;
539         }
540 
541         if (this.writeBuffer.length >= this.writeBufferLimits.high)
542         {
543             this.protocol.pauseWriting;
544             this.writingPaused = true;
545         }
546     }
547 
548     void writeEof()
549     {
550         close;
551     }
552 }
553 
554 private final class LibasyncDatagramTransport : AbstractBaseTransport,  DatagramTransport
555 {
556     private EventLoop eventLoop;
557     private Socket _socket;
558     private AsyncUDPSocket udpSocket;
559     private Waiter waiter = null;
560     private DatagramProtocol datagramProtocol = null;
561 
562     this(EventLoop eventLoop, Socket socket, AsyncUDPSocket udpSocket)
563     in
564     {
565         assert(eventLoop !is null);
566         assert(socket !is null);
567         assert(udpSocket !is null);
568         assert(socket.handle == udpSocket.socket);
569     }
570     body
571     {
572         this.eventLoop = eventLoop;
573         this._socket = socket;
574         this.udpSocket = udpSocket;
575     }
576 
577     @property
578     Socket socket()
579     {
580         return this._socket;
581     }
582 
583     void setProtocol(DatagramProtocol datagramProtocol, Waiter waiter = null)
584     in
585     {
586         assert(datagramProtocol !is null);
587         assert(this.datagramProtocol is null);
588         assert(this.waiter is null);
589     }
590     body
591     {
592         this.datagramProtocol = datagramProtocol;
593         this.waiter = waiter;
594     }
595 
596     private void onRead()
597     in
598     {
599         assert(this.datagramProtocol !is null);
600     }
601     body
602     {
603         ubyte[] readBuffer = new ubyte[1501];
604         NetworkAddress networkAddress;
605 
606         auto length = this.udpSocket.recvFrom(readBuffer, networkAddress);
607 
608         enforce(length < readBuffer.length,
609             "Unexpected UDP package size > 1500 bytes");
610 
611         Address tmp;
612         Address address = new UnknownAddressReference(
613             cast(typeof(tmp.name)) networkAddress.sockAddr,
614             cast(uint) networkAddress.sockAddrLen);
615 
616         this.eventLoop.callSoon(&this.datagramProtocol.datagramReceived,
617             readBuffer[0 .. length], address);
618     }
619 
620     private void onWrite()
621     in
622     {
623         assert(this.datagramProtocol !is null);
624     }
625     body
626     {
627     }
628 
629     void handleUDPEvent(UDPEvent event)
630     {
631         final switch (event)
632         {
633         case UDPEvent.READ:
634             onRead();
635             break;
636         case UDPEvent.WRITE:
637             onWrite();
638             break;
639         case UDPEvent.ERROR:
640             assert(0, udpSocket.error());
641         }
642     }
643 
644     /**
645      * DatagramTransport interface
646      */
647 
648     override Socket getExtraInfoSocket()
649     {
650         return socket;
651     }
652 
653     void close()
654     {
655         this.eventLoop.callSoon(&this.udpSocket.kill);
656     }
657 
658     void sendTo(const(void)[] data, Address address = null)
659     in
660     {
661         assert(this.datagramProtocol !is null);
662     }
663     body
664     {
665         enforce(address !is null, "default remote not supported yet");
666 
667         NetworkAddress networkAddress;
668 
669         (cast(byte*) networkAddress.sockAddr)[0 .. address.nameLen] =
670             (cast(byte*) address.name)[0 .. address.nameLen];
671 
672         this.udpSocket.sendTo(cast(const ubyte[]) data, networkAddress);
673     }
674 
675     void abort()
676     {
677         this.eventLoop.callSoon(&this.udpSocket.kill);
678     }
679 }