1 /**
2  * High-level stream API.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.streams;
9 
10 import std.algorithm : findSplit;
11 import std.array : Appender, empty;
12 import std.exception : enforce, enforceEx;
13 import std.socket : AddressFamily, AddressInfoFlags, ProtocolType, Socket,
14     SocketOSException;
15 import std.typecons : tuple;
16 import asynchronous.events;
17 import asynchronous.futures;
18 import asynchronous.tasks;
19 import asynchronous.transports;
20 import asynchronous.types;
21 
/// Default buffer limit in bytes (64 KiB) used when a caller passes no `limit`.
enum DEFAULT_LIMIT = 0x1_0000; // == 1 << 16 == 65_536
23 
/**
 * Exception carrying the data that was read when fewer bytes than requested
 * were available.
 */
class IncompleteReadError : Exception
{
    /**
     * The bytes that had been read when the error was raised.
     */
    const(void)[] partial;

    /**
     * Total number of bytes that were expected.
     */
    const size_t expected;

    /**
     * Params:
     *  partial  = bytes read so far.
     *  expected = total number of bytes that were requested.
     *  file     = source file reported by the exception.
     *  line     = source line reported by the exception.
     *  next     = chained $(D_PSYMBOL Throwable), if any.
     */
    this(const(void)[] partial, size_t expected, string file = __FILE__,
        size_t line = __LINE__, Throwable next = null) @safe pure
    {
        import std.format : format;

        immutable received = partial.length;
        auto message = format("%s bytes read on a total of %s expected bytes",
            received, expected);

        super(message, file, line, next);

        this.partial = partial;
        this.expected = expected;
    }
}
51 
/**
 * Convenience wrapper around $(D_PSYMBOL eventLoop.createConnection()) that
 * returns a named (reader, writer) tuple.
 *
 * The $(D_PSYMBOL reader) member is a $(D_PSYMBOL StreamReader) created with
 * the given $(D_PSYMBOL limit); the $(D_PSYMBOL writer) member is a
 * $(D_PSYMBOL StreamWriter) wrapping the transport and protocol of the new
 * connection.
 *
 * The protocol factory handed to $(D_PSYMBOL createConnection()) is supplied
 * by this function (it creates a $(D_PSYMBOL StreamReaderProtocol) bound to
 * the reader); all remaining arguments are forwarded unchanged.
 *
 * Params:
 *  eventLoop = event loop to use; when $(D_KEYWORD null) the result of
 *              $(D_PSYMBOL getEventLoop) is used instead.
 *  limit     = buffer limit passed to the $(D_PSYMBOL StreamReader).
 *
 * (To customize the $(D_PSYMBOL StreamReader) and/or
 * $(D_PSYMBOL StreamReaderProtocol) classes, copy this code -- it is only a
 * convenience.)
 */
@Coroutine
auto openConnection(EventLoop eventLoop, in char[] host = null,
    in char[] service = null, size_t limit = DEFAULT_LIMIT,
    SslContext sslContext = null,
    AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
    ProtocolType protocolType = UNSPECIFIED!ProtocolType,
    AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags,
    Socket socket = null, in char[] localHost = null,
    in char[] localService = null, in char[] serverHostname = null)
{
    // Fall back to the current event loop when none was given.
    EventLoop loop = eventLoop;
    if (loop is null)
        loop = getEventLoop;

    auto reader = new StreamReader(loop, limit);

    auto connection = loop.createConnection(
        () => new StreamReaderProtocol(loop, reader),
        host, service, sslContext, addressFamily, protocolType,
        addressInfoFlags, socket, localHost, localService, serverHostname);

    auto writer = new StreamWriter(loop, connection.transport,
        connection.protocol, reader);

    return tuple!("reader", "writer")(reader, writer);
}
94 
/// Signature of the delegate that is handed the reader/writer pair of a
/// client connection.
alias ClientConnectedCallback = void delegate(StreamReader reader,
    StreamWriter writer);
/**
 * Start a socket server that invokes a callback for each connected client.
 *
 * $(D_PSYMBOL clientConnectedCallback) receives two arguments: a
 * $(D_PSYMBOL StreamReader) and a $(D_PSYMBOL StreamWriter) for the client
 * connection. It is treated as a coroutine and scheduled as a task by the
 * $(D_PSYMBOL StreamReaderProtocol) created for that connection.
 *
 * All other arguments are forwarded to $(D_PSYMBOL eventLoop.createServer());
 * the protocol factory is supplied by this function.
 *
 * Params:
 *  eventLoop = event loop to use; when $(D_KEYWORD null) the result of
 *              $(D_PSYMBOL getEventLoop) is used instead.
 *  limit     = buffer limit passed to each $(D_PSYMBOL StreamReader).
 *
 * Returns: whatever $(D_PSYMBOL eventLoop.createServer()) returns, i.e. a
 * $(D_PSYMBOL Server) object.
 */
@Coroutine
Server startServer(EventLoop eventLoop,
    ClientConnectedCallback clientConnectedCallback, in char[] host = null,
    in char[] service = null, size_t limit = DEFAULT_LIMIT,
    AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
    AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE,
    Socket socket = null, int backlog = 100, SslContext sslContext = null,
    bool reuseAddress = true)
{
    // Fall back to the current event loop when none was given.
    EventLoop loop = eventLoop;
    if (loop is null)
        loop = getEventLoop;

    // Invoked once per accepted connection: every client gets its own reader.
    Protocol makeClientProtocol()
    {
        auto clientReader = new StreamReader(loop, limit);

        return new StreamReaderProtocol(loop, clientReader,
            clientConnectedCallback);
    }

    return loop.createServer(&makeClientProtocol, host, service,
        addressFamily, addressInfoFlags, socket, backlog, sslContext,
        reuseAddress);
}
139 
/**
 * Convenience wrapper around $(D_PSYMBOL eventLoop.createUnixConnection())
 * that returns a named (reader, writer) tuple.
 *
 * The $(D_PSYMBOL reader) member is a $(D_PSYMBOL StreamReader) created with
 * the given $(D_PSYMBOL limit); the $(D_PSYMBOL writer) member is a
 * $(D_PSYMBOL StreamWriter) wrapping the transport and protocol of the new
 * connection. When $(D_PSYMBOL eventLoop) is $(D_KEYWORD null), the result of
 * $(D_PSYMBOL getEventLoop) is used.
 *
 * See $(D_PSYMBOL openConnection()) for the TCP counterpart.
 */
version (Posix)
@Coroutine
auto openUnixConnection(EventLoop eventLoop, in char[] path = null,
    size_t limit = DEFAULT_LIMIT, SslContext sslContext = null,
    Socket socket = null, in char[] serverHostname = null)
{
    // Fall back to the current event loop when none was given.
    EventLoop loop = eventLoop;
    if (loop is null)
        loop = getEventLoop;

    auto reader = new StreamReader(loop, limit);

    auto connection = loop.createUnixConnection(
        () => new StreamReaderProtocol(loop, reader),
        path, sslContext, socket, serverHostname);

    auto writer = new StreamWriter(loop, connection.transport,
        connection.protocol, reader);

    return tuple!("reader", "writer")(reader, writer);
}
165 
/**
 * Start a UNIX Domain Socket server that invokes a callback for each
 * connected client.
 *
 * For every connection a new $(D_PSYMBOL StreamReader) (with the given
 * $(D_PSYMBOL limit)) and a $(D_PSYMBOL StreamReaderProtocol) carrying
 * $(D_PSYMBOL clientConnectedCallback) are created. When
 * $(D_PSYMBOL eventLoop) is $(D_KEYWORD null), the result of
 * $(D_PSYMBOL getEventLoop) is used.
 *
 * Returns: whatever $(D_PSYMBOL eventLoop.createUnixServer()) returns.
 *
 * See $(D_PSYMBOL startServer()) for the TCP counterpart.
 */
version (Posix)
@Coroutine
Server startUnixServer(EventLoop eventLoop,
    ClientConnectedCallback clientConnectedCallback, in char[] path = null,
    size_t limit = DEFAULT_LIMIT, Socket socket = null, int backlog = 100,
    SslContext sslContext = null)
{
    // Fall back to the current event loop when none was given.
    EventLoop loop = eventLoop;
    if (loop is null)
        loop = getEventLoop;

    // Invoked once per accepted connection: every client gets its own reader.
    Protocol makeClientProtocol()
    {
        auto clientReader = new StreamReader(loop, limit);

        return new StreamReaderProtocol(loop, clientReader,
            clientConnectedCallback);
    }

    return loop.createUnixServer(&makeClientProtocol, path, socket, backlog,
        sslContext);
}
192 
/**
 * Reusable flow control logic for $(D_PSYMBOL StreamWriter.drain()).
 *
 * This class implements the protocol methods $(D_PSYMBOL pauseWriting()),
 * $(D_PSYMBOL resumeWriting()) and $(D_PSYMBOL connectionLost()). A subclass
 * that overrides any of them must call the super method.
 *
 * $(D_PSYMBOL StreamWriter.drain()) must wait for the
 * $(D_PSYMBOL drainHelper()) coroutine.
 *
 * $(D_PSYMBOL connectionMade()), $(D_PSYMBOL dataReceived()) and
 * $(D_PSYMBOL eofReceived()) only throw $(D_PSYMBOL NotImplementedException)
 * here.
 */
private abstract class FlowControlProtocol : Protocol
{
    private EventLoop eventLoop;
    /// Set between pauseWriting() and resumeWriting().
    private bool paused = false;
    /// Waiter created by drainHelper() while writing is paused.
    private Waiter drainWaiter = null;
    /// Set once connectionLost() has been called.
    private bool connectionLost_ = false;

    /**
     * Params:
     *  eventLoop = event loop to use; when $(D_KEYWORD null) the result of
     *              $(D_PSYMBOL getEventLoop) is used instead.
     */
    this(EventLoop eventLoop)
    {
        this.eventLoop = eventLoop;
        if (this.eventLoop is null)
            this.eventLoop = getEventLoop;
    }

    /// Mark writing as paused. Must not be called while already paused.
    override void pauseWriting()
    in
    {
        assert(!paused);
    }
    body
    {
        paused = true;
    }

    /// Clear the paused state and complete a pending drain waiter, if any.
    override void resumeWriting()
    in
    {
        assert(paused);
    }
    body
    {
        paused = false;

        auto pending = drainWaiter;
        if (pending is null)
            return;

        // Detach the waiter before completing it.
        drainWaiter = null;
        if (!pending.done)
            pending.setResult;
    }

    override void connectionMade(BaseTransport transport)
    {
        throw new NotImplementedException;
    }

    /**
     * Record the loss of the connection and, if a drain waiter is pending
     * while writing is paused, complete it: with a plain result when
     * $(D_PSYMBOL exception) is $(D_KEYWORD null), with the exception
     * otherwise.
     */
    override void connectionLost(Exception exception)
    {
        connectionLost_ = true;

        // Nothing to wake up unless paused with an unfinished waiter.
        immutable hasPendingDrain = paused && drainWaiter !is null
            && !drainWaiter.done;
        if (!hasPendingDrain)
            return;

        auto pending = drainWaiter;
        drainWaiter = null;

        if (exception !is null)
            pending.setException(exception);
        else
            pending.setResult;
    }

    /**
     * Return immediately when writing is not paused; otherwise create a
     * waiter and wait for it on the event loop.
     *
     * Throws: $(D_PSYMBOL SocketOSException) if the connection is already
     * lost.
     */
    @Coroutine
    protected void drainHelper()
    {
        enforceEx!SocketOSException(!connectionLost_, "Connection lost");

        if (!paused)
            return;

        // A left-over waiter is only acceptable if it was cancelled.
        assert(drainWaiter is null || drainWaiter.cancelled);

        drainWaiter = new Waiter(eventLoop);
        eventLoop.waitFor(drainWaiter);
    }

    override void dataReceived(const(void)[] data)
    {
        throw new NotImplementedException;
    }

    override bool eofReceived()
    {
        throw new NotImplementedException;
    }
}
293 
294 
/**
 * Adapter between $(D_PSYMBOL Protocol) and $(D_PSYMBOL StreamReader): the
 * protocol callbacks are translated into calls on the reader.
 *
 * (This is a separate helper class rather than making StreamReader itself a
 * $(D_PSYMBOL Protocol) subclass, because the StreamReader has other
 * potential uses, and to keep users of the StreamReader from accidentally
 * calling inappropriate protocol methods.)
 */
class StreamReaderProtocol : FlowControlProtocol
{
    private StreamReader streamReader;
    private StreamWriter streamWriter;
    private ClientConnectedCallback clientConnectedCallback;

    /**
     * Params:
     *  eventLoop               = event loop handed to the base class.
     *  streamReader            = reader that receives data, EOF and errors.
     *  clientConnectedCallback = optional callback scheduled as a task once
     *                            the connection is made.
     */
    this(EventLoop eventLoop, StreamReader streamReader,
        ClientConnectedCallback clientConnectedCallback = null)
    {
        super(eventLoop);
        this.streamReader = streamReader;
        this.streamWriter = null;
        this.clientConnectedCallback = clientConnectedCallback;
    }

    /**
     * Attach the transport to the reader and, when a client callback was
     * given, create the matching $(D_PSYMBOL StreamWriter) and schedule the
     * callback with the (reader, writer) pair.
     */
    override void connectionMade(BaseTransport transport)
    {
        // The reader and the writer need a full Transport.
        auto streamTransport = cast(Transport) transport;
        enforce(streamTransport !is null);

        streamReader.setTransport(streamTransport);

        if (clientConnectedCallback is null)
            return;

        streamWriter = new StreamWriter(eventLoop, streamTransport, this,
            streamReader);
        eventLoop.createTask(clientConnectedCallback, streamReader,
            streamWriter);
    }

    /**
     * Forward the end of the connection to the reader (EOF when
     * $(D_PSYMBOL exception) is $(D_KEYWORD null), the exception otherwise),
     * then run the flow control handling of the base class.
     */
    override void connectionLost(Exception exception)
    {
        if (exception !is null)
            streamReader.setException(exception);
        else
            streamReader.feedEof;

        super.connectionLost(exception);
    }

    /// Hand received bytes to the reader.
    override void dataReceived(const(void)[] data)
    {
        streamReader.feedData(data);
    }

    /// Signal EOF to the reader; always returns $(D_KEYWORD true).
    override bool eofReceived()
    {
        streamReader.feedEof;
        return true;
    }
}
355 
/**
 * Buffered reader fed by a protocol through $(D_PSYMBOL feedData()) and
 * $(D_PSYMBOL feedEof()), and consumed by a coroutine through
 * $(D_PSYMBOL read()), $(D_PSYMBOL readLine()) and $(D_PSYMBOL readExactly()).
 *
 * Only one coroutine may wait for incoming data at a time.
 */
final class StreamReader
{
    private EventLoop eventLoop;
    private size_t limit;
    private const(void)[] buffer;
    private bool eof = false;
    /// Waiter of the coroutine currently blocked in waitForData(), if any.
    private Waiter flowWaiter;
    private Throwable exception_;
    private ReadTransport transport;
    /// True while the transport has been paused by feedData().
    private bool paused = false;

    /**
     * Params:
     *  eventLoop = event loop to use; when $(D_KEYWORD null) the result of
     *              $(D_PSYMBOL getEventLoop) is used instead.
     *  limit     = line length limit; the transport is paused once more than
     *              twice this many bytes are buffered.
     */
    this(EventLoop eventLoop, size_t limit = DEFAULT_LIMIT)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
        // The line length limit is a security feature;
        // it also doubles as half the buffer limit.
        this.limit = limit;
    }

    /**
     * Get the exception.
     */
    @property Throwable exception()
    {
        return exception_;
    }

    /**
     * Acknowledge the EOF and resume an operation waiting for data.
     */
    void feedEof()
    {
        eof = true;
        wakeupWaiter;
    }

    /**
     * Feed $(D_PSYMBOL data) bytes in the internal buffer. Any operations
     * waiting for the data will be resumed.
     *
     * Once more than twice the limit is buffered, the transport (if set) is
     * asked to pause reading.
     */
    void feedData(const(void)[] data)
    in
    {
        assert(!eof); // feedData after feedEof
    }
    body
    {
        if (data.empty)
            return;

        buffer ~= data;
        wakeupWaiter;

        if (transport is null || paused || buffer.length <= 2 * limit)
            return;

        try
        {
            transport.pauseReading;
            paused = true;
        }
        catch (NotImplementedException)
        {
            // The transport can't be paused.
            // We'll just have to buffer all data.
            // Forget the transport so we don't keep trying.
            transport = null;
        }
    }

    /**
     * Set the exception and pass it to a waiting operation, if any.
     */
    void setException(Throwable exception)
    {
        exception_ = exception;

        if (flowWaiter is null)
            return;

        auto waiter = flowWaiter;
        flowWaiter = null;
        if (!waiter.cancelled)
            waiter.setException(exception);
    }

    /**
     * Set the transport. May only be called while no transport is set.
     */
    void setTransport(ReadTransport transport)
    in
    {
        assert(this.transport is null);
    }
    body
    {
        this.transport = transport;
    }

    /**
     * Wakeup $(D_PSYMBOL read()) or $(D_PSYMBOL readLine()) function waiting
     * for data or EOF.
     */
    private void wakeupWaiter()
    {
        if (flowWaiter is null)
            return;

        auto waiter = flowWaiter;
        flowWaiter = null;
        if (!waiter.cancelled)
            waiter.setResult;
    }

    /**
     * Resume a paused transport once the buffer has shrunk to the limit or
     * below.
     */
    private void maybeResumeTransport()
    {
        if (paused && buffer.length <= limit)
        {
            paused = false;
            transport.resumeReading;
        }
    }

    /**
     * Wait until $(D_PSYMBOL feedData()) or $(D_PSYMBOL feedEof()) is called.
     *
     * Throws: $(D_PSYMBOL Exception) if another coroutine is already waiting.
     */
    @Coroutine
    private void waitForData(string functionName)()
    {
        // StreamReader uses a future to link the protocol feed_data() method
        // to a read coroutine. Running two read coroutines at the same time
        // would have an unexpected behavior. It would not be possible to know
        // which coroutine would get the next data.
        enforce(flowWaiter is null, functionName
            ~ "() called while another coroutine is already waiting for incoming data");

        flowWaiter = new Waiter(eventLoop);

        // Release the waiter slot on every exit path. If the wait is aborted
        // by an exception the slot would otherwise stay occupied and every
        // later read would fail the check above.
        scope (exit)
            flowWaiter = null;

        eventLoop.waitFor(flowWaiter);
    }

    /**
     * Read up to $(D_PSYMBOL n) bytes. If $(D_PSYMBOL n) is not provided, or
     * is negative, read until EOF and return all read bytes.
     *
     * If the EOF was received and the internal buffer is empty, return an empty
     * array. $(D_PSYMBOL n) == 0 returns $(D_KEYWORD null) immediately.
     *
     * Throws: the exception stored by $(D_PSYMBOL setException()), if any.
     */
    @Coroutine
    const(void)[] read(ptrdiff_t n = -1)
    {
        if (exception !is null)
            throw exception;

        if (n == 0)
            return null;

        if (n < 0)
        {
            // This used to just loop creating a new waiter hoping to
            // collect everything in this.buffer, but that would
            // deadlock if the subprocess sends more than this.limit
            // bytes. So just read limit-sized chunks until EOF.
            //
            // The chunk size must be strictly positive as a ptrdiff_t:
            // a zero chunk would end the loop before any data is read, and
            // a limit above ptrdiff_t.max would turn negative and recurse
            // into this branch forever.
            ptrdiff_t chunkSize;
            if (limit == 0)
                chunkSize = DEFAULT_LIMIT;
            else if (limit > cast(size_t) ptrdiff_t.max)
                chunkSize = ptrdiff_t.max;
            else
                chunkSize = cast(ptrdiff_t) limit;

            Appender!(const(char)[]) buff;

            while (true)
            {
                const(char)[] chunk = cast(const(char)[]) read(chunkSize);
                if (chunk.empty)
                    break;
                buff ~= chunk;
            }

            return buff.data;
        }

        if (buffer.empty && !eof)
            waitForData!"read";

        // n is strictly positive here, so the conversion is lossless.
        immutable wanted = cast(size_t) n;
        const(void)[] data;

        if (buffer.length <= wanted)
        {
            data = buffer;
            buffer = null;
        }
        else
        {
            data = buffer[0 .. wanted];
            buffer = buffer[wanted .. $];
        }

        maybeResumeTransport;

        return data;
    }

    /**
     * Read one line, where “line” is a sequence of bytes terminated by '\n'.
     * The terminating '\n' is consumed but not included in the returned
     * array, so an empty line and EOF with an empty buffer both yield an
     * empty array; use $(D_PSYMBOL atEof()) to tell them apart.
     *
     * If EOF is received, and '\n' was not found, the method will return the
     * partial read bytes.
     *
     * Throws: the exception stored by $(D_PSYMBOL setException()), if any;
     * $(D_PSYMBOL Exception) ("Line is too long") when the collected line
     * exceeds the limit.
     */
    @Coroutine
    const(char)[] readLine()
    {
        if (exception !is null)
            throw exception;

        Appender!(const(char)[]) line;
        bool notEnough = true;

        while (notEnough)
        {
            while (!buffer.empty && notEnough)
            {
                const(char)[] buffer1 = cast(const(char)[]) buffer;
                // r[0]: bytes before '\n', r[1]: the '\n' (empty if absent),
                // r[2]: the remainder, which stays buffered.
                auto r = buffer1.findSplit("\n");
                line ~= r[0];
                buffer = r[2];

                if (!r[1].empty)
                    notEnough = false;

                if (line.data.length > limit)
                {
                    maybeResumeTransport;
                    throw new Exception("Line is too long");
                }
            }

            if (eof)
                break;

            if (notEnough)
                waitForData!"readLine";
        }

        maybeResumeTransport;
        return line.data;
    }

    /**
     * Read exactly $(D_PSYMBOL n) bytes. Raise an $(D_PSYMBOL
     * IncompleteReadError) if the end of the stream is reached before
     * $(D_PSYMBOL n) bytes can be read, the $(D_PSYMBOL
     * IncompleteReadError.partial) attribute of the exception contains the
     * partial read bytes.
     *
     * Throws: the exception stored by $(D_PSYMBOL setException()), if any.
     */
    @Coroutine
    const(void)[] readExactly(size_t n)
    {
        if (exception !is null)
            throw exception;

        // There used to be "optimized" code here.  It created its own
        // Future and waited until self._buffer had at least the n
        // bytes, then called read(n).  Unfortunately, this could pause
        // the transport if the argument was larger than the pause
        // limit (which is twice self._limit).  So now we just read()
        // into a local buffer.

        Appender!(const(char)[]) data;

        while (n > 0)
        {
            // read() takes a signed count; a value above ptrdiff_t.max would
            // become negative and mean "read until EOF", so clamp it.
            immutable ptrdiff_t request = n > cast(size_t) ptrdiff_t.max
                ? ptrdiff_t.max : cast(ptrdiff_t) n;
            const(char)[] block = cast(const(char)[]) read(request);

            if (block.empty)
                throw new IncompleteReadError(data.data, data.data.length + n);

            data ~= block;
            n -= block.length;
        }

        return data.data;
    }

    /**
     * Return $(D_KEYWORD true) if the buffer is empty and $(D_PSYMBOL feedEof())
     * was called.
     */
    bool atEof() const
    {
        return eof && buffer.empty;
    }
}
649 
/**
 * Thin wrapper around a transport.
 *
 * It forwards $(D_PSYMBOL write()), $(D_PSYMBOL writeEof()), $(D_PSYMBOL
 * canWriteEof()), $(D_PSYMBOL getExtraInfo()) and $(D_PSYMBOL close()) to the
 * transport, adds the $(D_PSYMBOL drain()) coroutine for waiting on flow
 * control, and exposes the wrapped transport through the $(D_PSYMBOL
 * transport) property.
 *
 * This class is not thread safe.
 */
final class StreamWriter
{
    private EventLoop eventLoop;
    private WriteTransport transport_;
    private StreamReaderProtocol protocol;
    private StreamReader streamReader;

    /**
     * Params:
     *  eventLoop    = event loop associated with this writer.
     *  transport    = transport all write operations are forwarded to.
     *  protocol     = must be a $(D_PSYMBOL StreamReaderProtocol); it
     *                 provides the drain helper.
     *  streamReader = reader whose stored exception is rethrown by
     *                 $(D_PSYMBOL drain()); may be $(D_KEYWORD null).
     *
     * Throws: $(D_PSYMBOL Exception) if $(D_PSYMBOL protocol) is not a
     * $(D_PSYMBOL StreamReaderProtocol).
     */
    this(EventLoop eventLoop, Transport transport, Protocol protocol,
        StreamReader streamReader)
    {
        auto readerProtocol = cast(StreamReaderProtocol) protocol;
        enforce(readerProtocol !is null, "StreamReaderProtocol is needed");

        this.eventLoop = eventLoop;
        this.transport_ = transport;
        this.protocol = readerProtocol;
        this.streamReader = streamReader;
    }

    /// Textual description naming the wrapped transport and reader.
    override string toString() const
    {
        import std.format : format;

        // The const qualifier is cast away so the members can be formatted.
        auto writeSide = cast(WriteTransport) transport_;
        auto readSide = cast(StreamReader) streamReader;

        return format("%s(transport = %s, reader = %s)", typeid(this),
            writeSide, readSide);
    }

    /**
     * The wrapped transport.
     */
    @property WriteTransport transport()
    {
        return transport_;
    }

    /**
     * Return $(D_KEYWORD true) if the transport supports $(D_PSYMBOL
     * writeEof()), $(D_KEYWORD false) if not. See $(D_PSYMBOL
     * WriteTransport.canWriteEof()).
     */
    bool canWriteEof()
    {
        return transport_.canWriteEof;
    }

    /**
     * Close the transport: see $(D_PSYMBOL BaseTransport.close()).
     */
    void close()
    {
        transport_.close;
    }

    /**
     * Give the write buffer of the underlying transport a chance to be flushed.
     *
     * Intended use:
     *
     * w.write(data)
     * w.drain()
     *
     * While the protocol is paused (the transport buffer reached its
     * high-water limit), this blocks until the protocol is resumed. When
     * there is nothing to wait for, it continues immediately.
     *
     * Calling $(D_PSYMBOL drain()) gives the loop the opportunity to schedule
     * the write operation and flush the buffer. It should especially be used
     * when a possibly large amount of data is written to the transport, and
     * the coroutine does not process the event loop between calls to
     * $(D_PSYMBOL write()).
     *
     * Throws: the exception stored in the associated reader, if any.
     */
    @Coroutine
    void drain()
    {
        // An error recorded on the read side is reported to the writer too.
        Throwable readError = null;
        if (streamReader !is null)
            readError = streamReader.exception;

        if (readError !is null)
            throw readError;

        protocol.drainHelper;
    }

    /**
     * Return optional transport information: see $(D_PSYMBOL
     * BaseTransport.getExtraInfo()).
     */
    auto getExtraInfo(string name)()
    {
        return transport_.getExtraInfo!name;
    }

    /**
     * Write some data bytes to the transport: see $(D_PSYMBOL
     * WriteTransport.write()).
     */
    void write(const(void)[] data)
    {
        transport_.write(data);
    }

    /**
     * Close the write end of the transport after flushing buffered data: see
     * $(D_PSYMBOL WriteTransport.writeEof()).
     */
    void writeEof()
    {
        transport_.writeEof;
    }
}