1 /**
2  * High-level stream API.
3  */
4 module asynchronous.streams;
5 
6 import std.algorithm : findSplit;
7 import std.array : Appender, empty;
8 import std.exception : enforce, enforceEx;
9 import std.socket : AddressFamily, AddressInfoFlags, ProtocolType, Socket,
10     SocketOSException;
11 import std.typecons : tuple;
12 import asynchronous.events;
13 import asynchronous.futures;
14 import asynchronous.tasks;
15 import asynchronous.transports;
16 import asynchronous.types;
17 
18 enum DEFAULT_LIMIT = 1 << 16; // 2 ** 16
19 
20 /**
21  * Incomplete read error.
22  */
23 class IncompleteReadError : Exception
24 {
25     /**
26      * Read bytes string before the end of stream was reached.
27      */
28     const(void)[] partial;
29 
30     /**
31      * Total number of expected bytes.
32      */
33     const size_t expected;
34 
35     this(const(void)[] partial, size_t expected, string file = __FILE__,
36         size_t line = __LINE__, Throwable next = null) @safe pure
37     {
38         import std.format : format;
39 
40         this.partial = partial;
41         this.expected = expected;
42 
43         super("%s bytes read on a total of %s expected bytes"
44                 .format(partial.length, expected), file, line, next);
45     }
46 }
47 
48 /**
49  * A wrapper for $(D_PSYMBOL createConnection()) returning a (reader, writer)
50  * pair.
51  *
52  * The reader returned is a $(D_PSYMBOL StreamReader) instance; the writer is a
53  * $(D_PSYMBOL StreamWriter) instance.
54  *
55  * The arguments are all the usual arguments to $(D_PSYMBOL createConnection())
56  * except protocolFactory; most common are host and port, with various optional
57  * arguments following.
58  *
59  * Additional arguments are $(D_PSYMBOL eventLoop) (to set the event loop
60  * instance to use) and $(D_PSYMBOL limit) (to set the buffer limit passed to
61  * the $(D_PSYMBOL StreamReader)).
62  *
63  * (If you want to customize the $(D_PSYMBOL StreamReader) and/or
64  * $(D_PSYMBOL StreamReaderProtocol) classes, just copy the code -- there's
65  * really nothing special here except some convenience.)
66  */
67 @Coroutine
68 auto openConnection(EventLoop eventLoop, in char[] host = null,
69     in char[] service = null, size_t limit = DEFAULT_LIMIT,
70     SslContext sslContext = null,
71     AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
72     ProtocolType protocolType = UNSPECIFIED!ProtocolType,
73     AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags,
74     Socket socket = null, in char[] localHost = null,
75     in char[] localService = null, in char[] serverHostname = null)
76 {
77     if (eventLoop is null)
78         eventLoop = getEventLoop;
79 
80     auto streamReader = new StreamReader(eventLoop, limit);
81     auto connection = eventLoop.createConnection(
82         () => new StreamReaderProtocol(eventLoop, streamReader), host, service,
83         sslContext, addressFamily, protocolType, addressInfoFlags, socket,
84         localHost, localService, serverHostname);
85     auto streamWriter = new StreamWriter(eventLoop, connection.transport,
86         connection.protocol, streamReader);
87 
88     return tuple!("reader", "writer")(streamReader, streamWriter);
89 }
90 
91 alias ClientConnectedCallback = void delegate(StreamReader, StreamWriter);
92 /**
93  * Start a socket server, call back for each client connected.
94  *
95  * The first parameter, $(D_PSYMBOL clientConnectedCallback), takes two
96  * parameters: $(D_PSYMBOL clientReader), $(D_PSYMBOL clientWriter). $(D_PSYMBOL
97  * clientReader) is a $(D_PSYMBOL StreamReader) object, while $(D_PSYMBOL
98  * clientWriter) is a $(D_PSYMBOL StreamWriter) object.  This parameter is a
99  * coroutine, that will be automatically converted into a $(D_PSYMBOL Task).
100  *
101  * The rest of the arguments are all the usual arguments to $(D_PSYMBOL
102  * eventLoop.createServer()) except $(D_PSYMBOL protocolFactory). The return
103  * value is the same as $(D_PSYMBOL eventLoop.create_server()).
104  *
105  * Additional optional keyword arguments are loop (to set the event loop
106  * instance to use) and limit (to set the buffer limit passed to the
107  * $(D_PSYMBOL StreamReader)).
108  *
109  * The return value is the same as $(D_PSYMBOL loop.createServer()), i.e. a
110  * $(D_PSYMBOL Server) object which can be used to stop the service.
111  */
112 @Coroutine
113 Server startServer(EventLoop eventLoop,
114     ClientConnectedCallback clientConnectedCallback, in char[] host = null,
115     in char[] service = null, size_t limit = DEFAULT_LIMIT,
116     AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
117     AddressInfoFlags addressInfoFlags = AddressInfoFlags.PASSIVE,
118     Socket socket = null, int backlog = 100, SslContext sslContext = null,
119     bool reuseAddress = true)
120 {
121     if (eventLoop is null)
122         eventLoop = getEventLoop;
123 
124     Protocol protocolFactory()
125     {
126         auto reader = new StreamReader(eventLoop, limit);
127         return new StreamReaderProtocol(eventLoop, reader,
128             clientConnectedCallback);
129     }
130 
131     return eventLoop.createServer(&protocolFactory, host, service,
132         addressFamily, addressInfoFlags, socket, backlog, sslContext,
133         reuseAddress);
134 }
135 
136 /**
137  * A wrapper for $(D_PSYMBOL createUnixConnection()) returning a (reader,
138  * writer) pair.
139  *
140  * See $(D_PSYMBOL openConnection()) for information about return value and
141  * other details.
142  */
143 version (Posix)
144 @Coroutine
145 auto openUnixConnection(EventLoop eventLoop, in char[] path = null,
146     size_t limit = DEFAULT_LIMIT, SslContext sslContext = null,
147     Socket socket = null, in char[] serverHostname = null)
148 {
149     if (eventLoop is null)
150         eventLoop = getEventLoop;
151 
152     auto streamReader = new StreamReader(eventLoop, limit);
153     auto connection = eventLoop.createUnixConnection(
154         () => new StreamReaderProtocol(eventLoop, streamReader), path,
155         sslContext, socket, serverHostname);
156     auto streamWriter = new StreamWriter(eventLoop, connection.transport,
157         connection.protocol, streamReader);
158 
159     return tuple!("reader", "writer")(streamReader, streamWriter);
160 }
161 
162 /**
163  * Start a UNIX Domain Socket server, with a callback for each client connected.
164  *
165  * See $(D_PSYMBOL startServer()) for information about return value and other
166  * details.
167  */
168 version (Posix)
169 @Coroutine
170 Server startUnixServer(EventLoop eventLoop,
171     ClientConnectedCallback clientConnectedCallback, in char[] path = null,
172     size_t limit = DEFAULT_LIMIT, Socket socket = null, int backlog = 100,
173     SslContext sslContext = null)
174 {
175     if (eventLoop is null)
176         eventLoop = getEventLoop;
177 
178     Protocol protocolFactory()
179     {
180         auto reader = new StreamReader(eventLoop, limit);
181         return new StreamReaderProtocol(eventLoop, reader,
182             clientConnectedCallback);
183     }
184 
185     return eventLoop.createUnixServer(&protocolFactory, path, socket, backlog,
186         sslContext);
187 }
188 
189 /**
190  * Reusable flow control logic for $(D_PSYMBOL StreamWriter.drain()).
191  *
192  * This implements the protocol methods $(D_PSYMBOL pauseWriting()), $(D_PSYMBOL
193  * resumeReading()) and $(D_PSYMBOL connectionLost()).  If the subclass
194  * overrides these it must call the super methods.
195  *
196  * $(D_PSYMBOL StreamWriter.drain()) must wait for $(D_PSYMBOL drainHelper())
197  * coroutine.
198  */
199 private abstract class FlowControlProtocol : Protocol
200 {
201     private EventLoop eventLoop;
202     private bool paused = false;
203     private Waiter drainWaiter = null;
204     private bool connectionLost_ = false;
205 
206     this(EventLoop eventLoop)
207     {
208         if (eventLoop is null)
209             this.eventLoop = getEventLoop;
210         else
211             this.eventLoop = eventLoop;
212 
213     }
214 
215     override void pauseWriting()
216     in
217     {
218         assert(!paused);
219     }
220     body
221     {
222         paused = true;
223     }
224 
225     override void resumeWriting()
226     in
227     {
228         assert(paused);
229     }
230     body
231     {
232         paused = false;
233 
234         if (drainWaiter !is null)
235         {
236             auto waiter = drainWaiter;
237 
238             drainWaiter = null;
239             if (!waiter.done)
240                 waiter.setResult;
241         }
242     }
243 
244     override void connectionMade(BaseTransport transport)
245     {
246         throw new NotImplementedException;
247     }
248 
249     override void connectionLost(Exception exception)
250     {
251         connectionLost_ = true;
252 
253         if (!paused || drainWaiter is null || drainWaiter.done)
254             return;
255 
256         auto waiter = drainWaiter;
257 
258         drainWaiter = null;
259         if (exception is null)
260             waiter.setResult;
261         else
262             waiter.setException(exception);
263     }
264 
265     @Coroutine
266     protected void drainHelper()
267     {
268         enforceEx!SocketOSException(!connectionLost_, "Connection lost");
269 
270         if (!paused)
271             return;
272 
273         assert(drainWaiter is null || drainWaiter.cancelled);
274 
275         drainWaiter = new Waiter(eventLoop);
276         eventLoop.waitFor(drainWaiter);
277     }
278 
279     override void dataReceived(const(void)[] data)
280     {
281         throw new NotImplementedException;
282     }
283 
284     override bool eofReceived()
285     {
286         throw new NotImplementedException;
287     }
288 }
289 
290 
291 /**
292  * Helper class to adapt between $(D_PSYMBOL Protocol) and $(D_PSYMBOL
293  * StreamReader).
294  *
295  * (This is a helper class instead of making StreamReader itself a $(D_PSYMBOL
296  * Protocol) subclass, because the StreamReader has other potential uses, and to
297  * prevent the user of the StreamReader to accidentally call inappropriate
298  * methods of the protocol.)
299  */
300 class StreamReaderProtocol : FlowControlProtocol
301 {
302     private StreamReader streamReader;
303     private StreamWriter streamWriter;
304     private ClientConnectedCallback clientConnectedCallback;
305 
306     this(EventLoop eventLoop, StreamReader streamReader,
307         ClientConnectedCallback clientConnectedCallback = null)
308     {
309         super(eventLoop);
310         this.streamReader = streamReader;
311         this.streamWriter = null;
312         this.clientConnectedCallback = clientConnectedCallback;
313     }
314 
315     override void connectionMade(BaseTransport transport)
316     {
317         auto transport1 = cast(Transport) transport;
318 
319         enforce(transport1 !is null);
320         streamReader.setTransport(transport1);
321 
322         if (clientConnectedCallback !is null)
323         {
324             streamWriter = new StreamWriter(eventLoop, transport1, this, 
325                 streamReader);
326             eventLoop.createTask(clientConnectedCallback, streamReader,
327                 streamWriter);
328         }
329     }
330 
331     override void connectionLost(Exception exception)
332     {
333         if (exception is null)
334             streamReader.feedEof;
335         else
336             streamReader.setException(exception);
337         super.connectionLost(exception);
338     }
339 
340     override void dataReceived(const(void)[] data)
341     {
342         streamReader.feedData(data);
343     }
344 
345     override bool eofReceived()
346     {
347         streamReader.feedEof;
348         return true;
349     }
350 }
351 
352 final class StreamReader
353 {
354     private EventLoop eventLoop;
355     private size_t limit;
356     private const(void)[] buffer;
357     private bool eof = false;
358     private Waiter flowWaiter;
359     private Throwable exception_;
360     private ReadTransport transport;
361     private bool paused = false;
362 
363     this(EventLoop eventLoop, size_t limit = DEFAULT_LIMIT)
364     {
365         if (eventLoop is null)
366             this.eventLoop = getEventLoop;
367         else
368             this.eventLoop = eventLoop;
369         // The line length limit is  a security feature;
370         // it also doubles as half the buffer limit.
371         this.limit = limit;
372     }
373 
374     /**
375      * Get the exception.
376      */
377     @property Throwable exception()
378     {
379         return exception_;
380     }
381 
382     /**
383      * Acknowledge the EOF.
384      */
385     void feedEof()
386     {
387         eof = true;
388         wakeupWaiter;
389     }
390 
391     /**
392      * Feed $(D_PSYMBOL data) bytes in the internal buffer. Any operations
393      * waiting for the data will be resumed.
394      */
395     void feedData(const(void)[] data)
396     in
397     {
398         assert(!eof); // feedData after feedEof
399     }
400     body
401     {
402         if (data.empty)
403             return;
404 
405         buffer ~= data;
406         wakeupWaiter;
407 
408         if (transport is null || paused || buffer.length <= 2 * limit)
409             return;
410 
411         try
412         {
413             transport.pauseReading;
414             paused = true;
415         }
416         catch (NotImplementedException)
417         {
418             // The transport can't be paused.
419             // We'll just have to buffer all data.
420             // Forget the transport so we don't keep trying.
421             transport = null;
422         }
423     }
424 
425     /**
426      * Set the exception.
427      */
428     void setException(Throwable exception)
429     {
430         exception_ = exception;
431 
432         if (flowWaiter is null)
433             return;
434 
435         auto waiter = flowWaiter;
436         flowWaiter = null;
437         if (!waiter.cancelled)
438             waiter.setException(exception);
439     }
440 
441     /**
442      * Set the exception.
443      */
444     void setTransport(ReadTransport transport)
445     in
446     {
447         assert(this.transport is null);
448     }
449     body
450     {
451         this.transport = transport;
452     }
453 
454     /**
455      * Wakeup $(D_PSYMBOL read()) or $(D_PSYMBOL readLine()) function waiting
456      * for data or EOF.
457      */
458     private void wakeupWaiter()
459     {
460         if (flowWaiter is null)
461             return;
462 
463         auto waiter = flowWaiter;
464         flowWaiter = null;
465         if (!waiter.cancelled)
466             waiter.setResult;
467     }
468 
469     private void maybeResumeTransport()
470     {
471         if (paused && buffer.length <= limit)
472         {
473             paused = false;
474             transport.resumeReading;
475         }
476     }
477 
478     /**
479      * Wait until $(D_PSYMBOL feedData()) or $(D_PSYMBOL feedEof()) is called.
480      */
481     @Coroutine
482     private void waitForData(string functionName)()
483     {
484         // StreamReader uses a future to link the protocol feed_data() method
485         // to a read coroutine. Running two read coroutines at the same time
486         // would have an unexpected behaviour. It would not possible to know
487         // which coroutine would get the next data.
488         enforce(flowWaiter is null, functionName
489             ~ "() called while another coroutine is already waiting for incoming data");
490 
491         flowWaiter = new Waiter(eventLoop);
492         eventLoop.waitFor(flowWaiter);
493         flowWaiter = null;
494     }
495 
496     /**
497      * Read up to $(D_PSYMBOL n) bytes. If $(D_PSYMBOL n) is not provided, or
498      * set to -1, read until EOF and return all read bytes.
499      *
500      * If the EOF was received and the internal buffer is empty, return an empty
501      * array.
502      */
503     @Coroutine
504     const(void)[] read(ptrdiff_t n = -1)
505     {
506         if (exception !is null)
507             throw exception;
508 
509         if (n == 0)
510             return null;
511 
512         if (n < 0)
513         {
514             // This used to just loop creating a new waiter hoping to
515             // collect everything in this.buffer, but that would
516             // deadlock if the subprocess sends more than this.limit
517             // bytes. So just call read(limit) until EOF.
518             Appender!(const(char)[]) buff;
519 
520             while (true)
521             {
522                 const(char)[] chunk = cast(const(char)[]) read(limit);
523                 if (chunk.empty)
524                     break;
525                 buff ~= chunk;
526             }
527 
528             return buff.data;
529         }
530 
531         if (buffer.empty && !eof)
532             waitForData!"read";
533 
534         const(void)[] data;
535 
536         if (buffer.length <= n)
537         {
538             data = buffer;
539             buffer = null;
540         }
541         else
542         {
543             data = buffer[0 .. n];
544             buffer = buffer[n .. $];
545         }
546 
547         maybeResumeTransport;
548 
549         return data;
550     }
551 
552     /**
553      * Read one line, where “line” is a sequence of bytes ending with '\n'.
554      *
555      * If EOF is received, and '\n' was not found, the method will return the
556      * partial read bytes.
557      *
558      * If the EOF was received and the internal buffer is empty, return an empty
559      * array.
560      */
561     @Coroutine
562     const(char)[] readLine()
563     {
564         if (exception !is null)
565             throw exception;
566 
567         Appender!(const(char)[]) line;
568         bool notEnough = true;
569 
570         while (notEnough)
571         {
572             while (!buffer.empty && notEnough)
573             {
574                 const(char)[] buffer1 = cast(const(char)[]) buffer;
575                 auto r = buffer1.findSplit(['\n']);
576                 line ~= r[0];
577                 buffer = r[2];
578 
579                 if (!r[1].empty)
580                     notEnough = false;
581 
582                 if (line.data.length > limit)
583                 {
584                     maybeResumeTransport;
585                     throw new Exception("Line is too long");
586                 }
587             }
588 
589             if (eof)
590                 break;
591 
592             if (notEnough)
593                 waitForData!"readLine";
594         }
595 
596         maybeResumeTransport;
597         return line.data;
598     }
599 
600     /**
601      * Read exactly $(D_PSYMBOL n) bytes. Raise an $(D_PSYMBOL
602      * IncompleteReadError) if the end of the stream is reached before
603      * $(D_PSYMBOL n) can be read, the $(D_PSYMBOL IncompleteReadError.partial)
604      * attribute of the exception contains the partial read bytes.
605      */
606     @Coroutine
607     const(void)[] readExactly(size_t n)
608     {
609         if (exception !is null)
610             throw exception;
611 
612         // There used to be "optimized" code here.  It created its own
613         // Future and waited until self._buffer had at least the n
614         // bytes, then called read(n).  Unfortunately, this could pause
615         // the transport if the argument was larger than the pause
616         // limit (which is twice self._limit).  So now we just read()
617         // into a local buffer.
618 
619         Appender!(const(char)[]) data;
620 
621         while (n > 0)
622         {
623             const(char)[] block = cast(const(char)[]) read(n);
624 
625             if (block.empty)
626                 throw new IncompleteReadError(data.data, data.data.length + n);
627 
628             data ~= block;
629             n -= block.length;
630         }
631 
632         return data.data;
633     }
634 
635     /**
636      * Return $(D_KEYWORD true) if the buffer is empty and $(D_PSYMBOL feedEof())
637      * was called.
638      */
639     bool atEof() const
640     {
641         return eof && buffer.empty;
642     }
643 }
644 
645 /**
646  * Wraps a Transport.
647  *
648  * This exposes $(D_PSYMBOL write()), $(D_PSYMBOL writeEof()), $(D_PSYMBOL
649  * canWriteEof()), $(D_PSYMBOL getExtraInfo()) and $(D_PSYMBOL close()). It adds
650  * $(D_PSYMBOL drain()) coroutine for waiting for flow control.  It also adds a
651  * $(D_PSYMBOL transport) property which references the Transport directly.
652  *
653  * This class is not thread safe.
654  */
655 final class StreamWriter
656 {
657     private EventLoop eventLoop;
658     private WriteTransport transport_;
659     private StreamReaderProtocol protocol;
660     private StreamReader streamReader;
661 
662     this(EventLoop eventLoop, Transport transport, Protocol protocol,
663         StreamReader streamReader)
664     {
665         this.eventLoop = eventLoop;
666         this.transport_ = transport;
667         this.protocol = cast(StreamReaderProtocol) protocol;
668         enforce(this.protocol !is null, "StreamReaderProtocol is needed");
669         this.streamReader = streamReader;
670     }
671 
672     override string toString() const
673     {
674         import std.format : format;
675 
676         return "%s(transport = %s, reader = %s)".format(typeid(this),
677             cast(WriteTransport) transport_, cast(StreamReader) streamReader);
678     }
679 
680     /**
681      * Transport.
682      */
683     @property WriteTransport transport()
684     {
685         return transport_;
686     }
687 
688     /**
689      * Return $(D_KEYWORD true) if the transport supports $(D_PSYMBOL
690      * writeEof()), $(D_KEYWORD false) if not. See $(D_PSYMBOL
691      * WriteTransport.canWriteEof()).
692      */
693     bool canWriteEof()
694     {
695         return transport_.canWriteEof;
696     }
697 
698     /**
699      * Close the transport: see $(D_PSYMBOL BaseTransport.close()).
700      */
701     void close()
702     {
703         return transport_.close;
704     }
705 
706     /**
707      * Let the write buffer of the underlying transport a chance to be flushed.
708      *
709      * The intended use is to write:
710      *
711      * w.write(data)
712      * w.drain()
713      *
714      * When the size of the transport buffer reaches the high-water limit (the
715      * protocol is paused), block until the size of the buffer is drained down
716      * to the low-water limit and the protocol is resumed. When there is nothing
717      * to wait for, the continues immediately.
718      *
719      * Calling $(D_PSYMBOL drain()) gives the opportunity for the loop to
720      * schedule the write operation and flush the buffer. It should especially
721      * be used when a possibly large amount of data is written to the transport,
722      * and the coroutine does not process the event loop between calls to
723      * $(D_PSYMBOL write()).
724      */
725     @Coroutine
726     void drain()
727     {
728         if (streamReader !is null)
729         {
730             auto exception = streamReader.exception;
731             if (exception !is null)
732                 throw exception;
733         }
734         protocol.drainHelper;
735     }
736 
737     /**
738      * Return optional transport information: see $(D_PSYMBOL
739      * BaseTransport.getExtraInfo()).
740      */
741     auto getExtraInfo(string name)()
742     {
743         return transport_.getExtraInfo!name;
744     }
745 
746     /**
747      * Write some data bytes to the transport: see $(D_PSYMBOL
748      * WriteTransport.write()).
749      */
750     void write(const(void)[] data)
751     {
752         transport_.write(data);
753     }
754 
755     /**
756      * Close the write end of the transport after flushing buffered data: see
757      * $(D_PSYMBOL WriteTransport.write_eof()).
758      */
759     void writeEof()
760     {
761         transport_.writeEof;
762     }
763 }