1 /**
2  * Support for tasks, coroutines and the scheduler.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.tasks;
9 
10 import core.thread;
11 import std.algorithm;
12 import std.array;
13 import std.exception;
14 import std.traits;
15 import std.typecons;
16 import asynchronous.events;
17 import asynchronous.futures;
18 import asynchronous.types;
19 
/**
 * Registry that records, for each event loop, the tasks registered on it and
 * the task currently marked as running.
 *
 * All state lives in $(D_KEYWORD static) members, which are thread-local in D
 * unless declared otherwise.
 */
package class TaskRepository
{
    /// Task last marked as running on each event loop; $(D_KEYWORD null)
    /// after the marker has been reset.
    private static TaskHandle[EventLoop] currentTasks;

    /// Tasks registered on each event loop and not yet unregistered.
    private static TaskHandle[][EventLoop] tasks;

    /**
     * Returns: a copy of the list of tasks registered on $(D_PARAM eventLoop),
     *          empty if there are none. Changing the returned array does not
     *          affect the repository.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return tasks.get(eventLoop, null).dup;
    }

    /**
     * Returns: the task marked as running on $(D_PARAM eventLoop), or
     *          $(D_KEYWORD null) if none is marked.
     */
    static TaskHandle currentTask(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return currentTasks.get(eventLoop, null);
    }

    /**
     * Marks $(D_PARAM taskHandle) as the task running on $(D_PARAM eventLoop).
     * Calling it without a task clears the marker.
     */
    static void resetCurrentTask(EventLoop eventLoop,
        TaskHandle taskHandle = null)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        currentTasks[eventLoop] = taskHandle;
    }

    /// Adds $(D_PARAM taskHandle) to the tasks known for $(D_PARAM eventLoop).
    package static void registerTask(EventLoop eventLoop, TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        tasks[eventLoop] ~= taskHandle;
    }

    /**
     * Removes $(D_PARAM taskHandle) from the tasks known for
     * $(D_PARAM eventLoop).
     *
     * Does nothing when the event loop has no registered tasks.
     */
    package static void unregisterTask(EventLoop eventLoop,
        TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        // Use `in` instead of indexing: `tasks[eventLoop]` throws a RangeError
        // for an event loop that has no entry.
        auto registered = eventLoop in tasks;
        if (registered is null)
            return;

        auto remaining = (*registered).remove!(a => a is taskHandle);

        // Drop the entry together with its last task, so that the repository
        // does not keep referencing an event loop without tasks.
        if (remaining.length == 0)
            tasks.remove(eventLoop);
        else
            *registered = remaining;
    }
}
80 
/**
 * Type-erased view of a task: a $(D_PSYMBOL FutureHandle) that can also be
 * scheduled to run its next step.
 */
interface TaskHandle : FutureHandle
{
    /// Exception that $(D_PSYMBOL yield) throws into the task when it is
    /// resumed, or $(D_KEYWORD null) if there is none.
    protected @property Throwable injectException();

    /// Entry point executed by the task's fiber.
    protected void run();

    /// Schedules the next step of the task without an exception.
    package void scheduleStep()
    {
        scheduleStepImpl(cast(Throwable) null);
    }

    /// Schedules the next step of the task, passing $(D_PARAM throwable) on to
    /// $(D_PSYMBOL scheduleStepImpl).
    package void scheduleStep(Throwable throwable)
    {
        scheduleStepImpl(throwable);
    }

    // Convenience overload for calling in addDoneCallback.
    // It ignores the provided argument.
    package void scheduleStep(FutureHandle future)
    {
        scheduleStepImpl(cast(Throwable) null);
    }

    /// Implementation hook behind all $(D_PSYMBOL scheduleStep) overloads.
    protected void scheduleStepImpl(Throwable throwable);

    /**
     * Returns: a set of all tasks for an event loop.
     *
     * By default all tasks for the current event loop are returned.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.allTasks(eventLoop);
    }

    /**
     * Returns: the currently running task in an event loop or
     *          $(D_KEYWORD null).
     *
     * By default the current task for the current event loop is returned.
     *
     * $(D_KEYWORD null) is returned when called not in the context of a Task.
     */
    static TaskHandle currentTask(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.currentTask(eventLoop);
    }

    /**
     * Returns: the task whose fiber is executing the caller, or
     *          $(D_KEYWORD null) when the caller is not running inside a
     *          task fiber.
     */
    static TaskHandle getThis()
    {
        auto taskFiber = TaskFiber.getThis;

        return taskFiber is null ? null : taskFiber.taskHandle;
    }

    /**
     * Forces a context switch to occur away from the calling task.
     *
     * When the task is resumed and has a pending
     * $(D_PSYMBOL injectException), that exception is thrown to the caller.
     *
     * Throws: $(D_PSYMBOL Exception) if called outside of a task.
     */
    @Coroutine
    package static void yield()
    {
        // Validate the calling context before suspending. Fiber.yield needs an
        // active fiber, so checking afterwards would never report this error
        // for a call made outside of any fiber.
        auto thisTask = getThis;
        enforce(thisTask !is null,
            "'TaskHandle.yield' called outside of a task.");

        Fiber.yield;

        // NOTE(review): the pending exception is only read here, never
        // cleared; confirm whether repeated yields are meant to rethrow it.
        auto pending = thisTask.injectException;
        if (pending !is null)
            throw pending;
    }
}
161 
/**
 * Fiber that executes the $(D_PSYMBOL run) routine of a task and remembers
 * which task it is currently bound to.
 */
private class TaskFiber : Fiber
{
    /// Task currently bound to this fiber, $(D_KEYWORD null) when unbound.
    private TaskHandle task_ = null;

    /// Creates an unbound fiber with an empty routine and a stack of
    /// 16 * 4096 bytes.
    this()
    {
        super({ }, 16 * 4096UL);
    }

    /// Returns: the task bound to this fiber, or $(D_KEYWORD null).
    @property TaskHandle taskHandle()
    {
        return task_;
    }

    /**
     * Binds the fiber to $(D_PARAM task), or unbinds it when called without
     * an argument.
     *
     * Binding expects a fiber in state HOLD, unbinding a fiber in state TERM.
     */
    void reset(TaskHandle task = null)
    in
    {
        immutable expectedState = task is null
            ? Fiber.State.TERM
            : Fiber.State.HOLD;

        assert(state == expectedState);
    }
    body
    {
        task_ = task;

        if (task !is null)
            super.reset(&(task.run));
        else
            super.reset;
    }

    /// Returns: the running fiber as a $(D_PSYMBOL TaskFiber), or
    ///          $(D_KEYWORD null) when the running fiber is not one.
    static TaskFiber getThis()
    {
        return cast(TaskFiber) Fiber.getThis;
    }
}
195 
196 
/// Pool from which tasks acquire their fiber and to which they release it.
private static ResourcePool!TaskFiber taskFibers;

/// Creates the fiber pool; as a non-shared module constructor it runs once
/// per thread.
static this()
{
    taskFibers = new ResourcePool!TaskFiber();
}
203 
/**
 * Schedule the execution of a coroutine: wrap it in a future. A task is a
 * subclass of $(D_PSYMBOL Future).
 *
 * A task runs its coroutine inside a fiber acquired from a pool. On
 * construction it registers itself for its event loop and schedules its first
 * step. Each step resumes the fiber until the coroutine yields or terminates.
 * When the fiber terminates, the task unregisters itself and releases the
 * fiber.
 *
 * The cancellation of a task is different from the cancellation of a future.
 * Calling $(D_PSYMBOL cancel()) records a $(D_PSYMBOL CancelledException)
 * that is thrown into the coroutine when it resumes from
 * $(D_PSYMBOL TaskHandle.yield). $(D_PSYMBOL cancelled()) only returns
 * $(D_KEYWORD true) if a $(D_PSYMBOL CancelledException) escapes the
 * coroutine.
 *
 * Do not create $(D_PSYMBOL Task) instances directly: use
 * $(D_PSYMBOL ensureFuture()) or $(D_PSYMBOL EventLoop.createTask()).
 */
class Task(Coroutine, Args...) : Future!(ReturnType!Coroutine), TaskHandle
if (isDelegate!Coroutine)
{
    /// Type produced by the wrapped coroutine.
    alias ResultType = ReturnType!Coroutine;

    /// Fiber running the coroutine; $(D_KEYWORD null) after the task has
    /// terminated and released it.
    package TaskFiber fiber;

    private Coroutine coroutine;
    private Args args;

    /// Exception to be thrown into the coroutine at its next resumption.
    private Throwable _injectException;

    protected override @property Throwable injectException()
    {
        return this._injectException;
    }

    debug (tasks)
        package string id()
        {
            import std.format : format;

            return "%s".format(cast(void*) cast(TaskHandle) this);
        }

    /**
     * Creates the task, registers it for $(D_PARAM eventLoop) and schedules
     * its first step.
     *
     * Params:
     *  eventLoop = event loop passed on to the $(D_PSYMBOL Future) base class.
     *  coroutine = delegate to execute.
     *  args      = arguments passed to $(D_PARAM coroutine).
     */
    this(EventLoop eventLoop, Coroutine coroutine, Args args)
    {
        // The tracing code needs writeln; import it locally so that building
        // with -debug=tasks does not rely on an import made elsewhere.
        debug (tasks)
            import std.stdio : writeln;

        super(eventLoop);

        this.coroutine = coroutine;
        static if (args.length > 0)
        {
            this.args = args;
        }

        this.fiber = taskFibers.acquire(this);
        TaskRepository.registerTask(this.eventLoop, this);
        this.eventLoop.callSoon(&step, null);

        debug (tasks)
            writeln("Create task ", id);
    }

    /// Fiber routine: runs the coroutine and stores its result in the future.
    override protected void run()
    {
        debug (tasks)
            import std.stdio : writeln;

        debug (tasks)
            writeln("Start task ", id);

        static if (is(ResultType == void))
        {
            coroutine(args);
            setResult;
        }
        else
        {
            setResult(coroutine(args));
        }

        debug (tasks)
            writeln("End task ", id);
    }

    /**
     * Request that this task cancel itself.
     *
     * This records a $(D_PSYMBOL CancelledException) to be thrown into the
     * wrapped coroutine. The coroutine then has a chance to clean up or even
     * deny the request using try/catch/finally.
     *
     * Unlike $(D_PSYMBOL Future.cancel()), this does not guarantee that the
     * task will be cancelled: the exception might be caught and acted upon,
     * delaying cancellation of the task or preventing cancellation completely.
     * The task may also return a value or raise a different exception.
     *
     * Immediately after this method is called, $(D_PSYMBOL cancelled()) will
     * not return $(D_KEYWORD true) (unless the task was already cancelled). A
     * task will be marked as cancelled when the wrapped coroutine terminates
     * with a $(D_PSYMBOL CancelledException) (even if cancel() was not
     * called).
     *
     * Returns: $(D_KEYWORD false) if the task is already done, otherwise
     *          $(D_KEYWORD true).
     */
    override bool cancel()
    {
        if (done)
            return false;

        this._injectException = new CancelledException;
        return true;
    }

    /**
     * Executes one step of the task: resumes the fiber until it yields or
     * terminates, then cleans up if it has terminated.
     *
     * Params:
     *  throwable = if not $(D_KEYWORD null), recorded as the exception to
     *      inject into the coroutine before it is resumed.
     */
    private void step(Throwable throwable = null)
    {
        debug (tasks)
            import std.stdio : writeln;

        // TODO why rescheduled when it is already terminated?
        if (this.fiber is null)
            return;

        final switch (this.fiber.state)
        {
        case Fiber.State.HOLD:
            debug (tasks)
                writeln("Resume task ", id);

            TaskRepository.resetCurrentTask(this.eventLoop, this);

            if (throwable !is null)
                this._injectException = throwable;

            try
            {
                this.fiber.call;
            }
            catch (CancelledException cancelledException)
            {
                // The coroutine let the cancellation escape: mark the
                // underlying future as cancelled.
                assert(this.fiber.state == Fiber.State.TERM);
                super.cancel;
            }
            catch (Throwable failure)
            {
                // Named differently from the `throwable` parameter so that
                // the catch variable does not shadow it.
                assert(this.fiber.state == Fiber.State.TERM);
                setException(failure);
            }

            TaskRepository.resetCurrentTask(this.eventLoop);

            if (this.fiber.state == Fiber.State.TERM)
                goto case Fiber.State.TERM;

            debug (tasks)
                writeln("Suspend task ", id);
            break;
        case Fiber.State.EXEC:
            assert(0, "Internal error");
        case Fiber.State.TERM:
            assert(done);
            TaskRepository.unregisterTask(this.eventLoop, this);
            taskFibers.release(this.fiber);
            this.fiber = null;
            break;
        }
    }

    /**
     * Runs the next step immediately when called from outside a task fiber;
     * otherwise defers it to the event loop via $(D_PSYMBOL callSoon).
     */
    protected override void scheduleStepImpl(Throwable throwable)
    {
        debug (tasks)
        {
            import std.stdio : writeln;

            if (throwable is null)
                writeln("Schedule task ", id);
            else
                writeln("Schedule task ", id, " to throw ", throwable);
        }

        if (TaskFiber.getThis is null)
            step(throwable);
        else
            this.eventLoop.callSoon(&step, throwable);
    }

    override string toString() const
    {
        import std.format : format;

        return "%s(done: %s, cancelled: %s)".format(typeid(this), done,
            cancelled);
    }
}
398 
// A task delivers the coroutine's return value, or stores the exception the
// coroutine threw.
unittest
{
    import std.functional : toDelegate;

    // Returns "foo" for non-positive input, throws "bar" otherwise.
    auto fooOrBar = (int i) {
        if (i > 0)
            throw new Exception("bar");
        return "foo";
    };
    auto loop = getEventLoop;

    // Successful completion.
    auto task = loop.createTask(fooOrBar.toDelegate, 0);
    loop.runUntilComplete(task);
    assert(task.done);
    assert(task.result == "foo");

    // Completion with an exception.
    task = loop.createTask(fooOrBar.toDelegate, 1);
    assertThrown!Exception(loop.runUntilComplete(task));
    assert(task.done);
    assert(task.exception !is null);
    assert(task.exception.msg == "bar");
}
420 
/**
 * Return a generator that yields the given futures in the order they
 * complete.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if $(D_KEYWORD null).
 *
 *  futures = futures to observe; none of them may be $(D_KEYWORD null).
 *
 *  timeout = maximum time to wait for all futures. If it is zero or negative
 *      there is no limit.
 *
 * When the timeout expires, the futures that are still pending are cancelled
 * and the generator throws $(D_PSYMBOL TimeoutException) once the futures
 * completed so far have been consumed.
 *
 * Example:
 *
 *  foreach (f; getEventLoop.asCompleted(fs))
 *       // use f.result
 */
auto asCompleted(EventLoop eventLoop, FutureHandle[] futures,
    Duration timeout = Duration.zero)
in
{
    // The result has to be asserted: a bare expression is evaluated and
    // discarded, which checks nothing.
    assert(futures.all!"a !is null");
}
body
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    import asynchronous.queues : Queue;
    auto done = new Queue!FutureHandle;
    CallbackHandle timeoutCallback = null;
    // Pending futures, each paired with the done callback registered on it so
    // that the callback can be removed again on timeout.
    Tuple!(FutureHandle, void delegate(FutureHandle))[] todo;

    foreach (future; futures)
    {
        auto doneCallback = (FutureHandle f) {
            return (FutureHandle _) {
                if (todo.empty)
                    return; // timeoutCallback was here first
                todo = todo.remove!(a => a[0] is f);
                done.putNowait(f);
                if (todo.empty && timeoutCallback !is null)
                    timeoutCallback.cancel;
            };
        }(future); // workaround bug #2043;

        todo ~= tuple(future, doneCallback);
        future.addDoneCallback(doneCallback);
    }

    if (!todo.empty && timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            foreach (future_callback; todo)
            {
                future_callback[0].removeDoneCallback(future_callback[1]);
                future_callback[0].cancel;
            }
            // A null entry tells the generator that the timeout expired.
            if (!todo.empty)
                done.putNowait(null);
            todo = null;
        });
    }

    return new GeneratorTask!FutureHandle(eventLoop, {
        while (!todo.empty || !done.empty)
        {
            auto f = done.get;
            if (f is null)
                throw new TimeoutException;
            yieldValue(f);
        }
    });
}
493 
// asCompleted yields futures in completion order and stops at the timeout.
unittest
{
    auto loop = getEventLoop;

    // Without a timeout: the shorter sleep is reported first.
    auto slowTask = loop.createTask({
        loop.sleep(3.msecs);
    });
    auto fastTask = loop.createTask({
        loop.sleep(2.msecs);
    });
    auto checkTask = loop.createTask({
        auto completed = asCompleted(loop, [slowTask, fastTask]).array;

        assert(completed[0] is cast(FutureHandle) fastTask);
        assert(completed[1] is cast(FutureHandle) slowTask);
    });
    loop.runUntilComplete(checkTask);

    // With a 5 ms timeout: only the 2 ms task is collected.
    slowTask = loop.createTask({
        loop.sleep(10.msecs);
    });
    fastTask = loop.createTask({
        loop.sleep(2.msecs);
    });
    checkTask = loop.createTask({
        auto completed = asCompleted(loop, [slowTask, fastTask],
            5.msecs).array;

        assert(completed.length == 1);
        assert(completed[0] is cast(FutureHandle) fastTask);
    });
    loop.runUntilComplete(checkTask);
}
525 
/**
 * Make sure the argument is a future.
 *
 * A coroutine is wrapped, together with its arguments, in a new
 * $(D_PSYMBOL Task) created for $(D_PARAM eventLoop). An argument that
 * already is a future is returned unchanged; in that case no extra arguments
 * are accepted and $(D_PARAM eventLoop) is not used.
 */
auto ensureFuture(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
    Args args)
{
    static if (!is(typeof(coroutine) : FutureHandle))
    {
        return new Task!(Coroutine, Args)(eventLoop, coroutine, args);
    }
    else
    {
        static assert(args.length == 0);

        return coroutine;
    }
}
546 
// A task cancelled before it runs still completes when its coroutine never
// yields; ensureFuture returns an existing future unchanged.
unittest
{
    import std.functional : toDelegate;

    auto answerTask = ensureFuture(null, toDelegate({
        return 3 + 39;
    }));

    assert(!answerTask.done);
    assert(!answerTask.cancelled);

    // Requesting cancellation does not mark the task as cancelled.
    answerTask.cancel;
    assert(!answerTask.cancelled);

    const answer = getEventLoop.runUntilComplete(answerTask);

    assert(answerTask.done);
    assert(!answerTask.cancelled);
    assert(answer == 42);

    assert(answerTask == ensureFuture(null, answerTask));
}
569 
/**
 * Suspend the calling task for a given time. If $(D_PSYMBOL result) is
 * provided, it is returned to the caller when the delay has elapsed.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if $(D_KEYWORD null).
 *  delay     = time to wait.
 *  result    = optional single value to return after the delay.
 *
 * The resolution of the sleep depends on the granularity of the event loop.
 */
@Coroutine
auto sleep(Result...)(EventLoop eventLoop, Duration delay, Result result)
if (result.length <= 1)
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    static if (result.length == 0)
        auto future = new Future!void(eventLoop);
    else
        auto future = new Future!Result(eventLoop);

    // `result` expands to nothing when no value was given, so one call
    // covers both cases.
    auto handle = eventLoop.callLater(delay,
        &future.setResultUnlessCancelled, result);

    // Always drop the timer, including when waiting ends with an exception.
    scope (exit) handle.cancel;

    return eventLoop.waitFor(future);
}
599 
// sleep hands back the value it was given once the delay has elapsed.
unittest
{
    auto loop = getEventLoop;
    auto sleeper = loop.ensureFuture({
        return loop.sleep(5.msecs, "foo");
    });

    loop.runUntilComplete(sleeper);
    assert(sleeper.result == "foo");
}
610 
/**
 * Wait for a future, shielding it from cancellation.
 *
 * The statement
 *
 *   $(D_CODE res = shield(something());)
 *
 * is equivalent to the statement
 *
 *   $(D_CODE res = something();)
 *
 * $(I except) that cancelling the returned future does not cancel the task
 * running $(D_PSYMBOL something()). From the point of view of
 * $(D_PSYMBOL something()), the cancellation did not happen.
 * Note: If $(D_PSYMBOL something()) is cancelled by other means this will still
 * cancel the future returned by $(D_PSYMBOL shield()).
 *
 * If the wrapped future is already done, it is returned as is. Otherwise a
 * new future is returned that receives the result, the exception or the
 * cancellation of the wrapped one.
 *
 * If you want to completely ignore cancellation (not recommended) you can
 * combine $(D_PSYMBOL shield()) with a try/catch clause, as follows:
 *
 * $(D_CODE
 *   try
 *       res = shield(something());
 *   catch (CancelledException)
 *       res = null;
 * )
 */
auto shield(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
    Args args)
{
    auto inner = ensureFuture(eventLoop, coroutine, args);
    if (inner.done())
        return inner;

    auto outer = new Future!(inner.ResultType)(eventLoop);

    // Transfers the outcome of `inner` to `outer` once `inner` is done.
    void doneCallback(FutureHandle _)
    {
        if (outer.cancelled())
        {
            // Nobody is interested in the outcome any more.
            if (!inner.cancelled())
                inner.exception(); // Mark inner's result as retrieved.
            return;
        }

        if (inner.cancelled())
        {
            outer.cancel();
            return;
        }

        auto failure = inner.exception();
        if (failure !is null)
        {
            outer.setException(failure);
            return;
        }

        static if (is(inner.ResultType == void))
            outer.setResult;
        else
            outer.setResult(inner.result);
    }

    inner.addDoneCallback(&doneCallback);
    return outer;
}
685 
// shield forwards the result and keeps the inner task alive when the outer
// future is cancelled.
unittest
{
    import std.functional : toDelegate;

    int add(int a, int b)
    {
        return a + b;
    }

    auto loop = getEventLoop;

    // The result of the inner task reaches the shielding future.
    auto innerTask = loop.ensureFuture(&add, 3, 4);
    auto shielded = loop.shield(innerTask);

    loop.runUntilComplete(shielded);
    assert(shielded.result == 7);

    // Cancelling the shielding future leaves the inner task untouched.
    innerTask = loop.ensureFuture(&add, 3, 4);
    shielded = loop.shield(innerTask);
    shielded.cancel;

    loop.runUntilComplete(innerTask);
    assert(shielded.cancelled);
    assert(innerTask.result == 7);

    // The same holds for a coroutine without a result.
    auto voidTask = loop.ensureFuture({ }.toDelegate);
    auto shieldedVoid = loop.shield(voidTask);
    shieldedVoid.cancel;

    loop.runUntilComplete(voidTask);
    assert(shieldedVoid.cancelled);
    assert(voidTask.done);
}
719 
720 
/// Completion condition accepted by $(D_PSYMBOL wait).
enum ReturnWhen
{
    /// Stop waiting as soon as any of the futures is done.
    FIRST_COMPLETED,
    /// Stop waiting as soon as any of the futures has an exception, or when
    /// all of them are done.
    FIRST_EXCEPTION,
    /// Stop waiting when all of the futures are done.
    ALL_COMPLETED,
}
727 
/**
 * Wait for the Future instances given by $(PARAM futures) to complete.
 *
 * Must be called from within a task.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
 *
 *  futures = futures to wait for.
 *
 *  timeout = can be used to control the maximum time to wait before
 *      returning. If $(PARAM timeout) is 0 or negative there is no limit to
 *      the wait time.
 *
 *  returnWhen = indicates when this function should return. It must be one of
 *      the following constants:
 *          FIRST_COMPLETED The function will return when any future finishes or
 *              is cancelled.
 *          FIRST_EXCEPTION The function will return when any future finishes by
 *              raising an exception. If no future raises an exception then it
 *              is equivalent to ALL_COMPLETED.
 *          ALL_COMPLETED The function will return when all futures finish or
 *              are cancelled.
 *
 * Returns: a named 2-tuple of arrays. The first array, named $(D_PSYMBOL done),
 *      contains the futures that completed (finished or were cancelled) before
 *      the wait completed. The second array, named $(D_PSYMBOL notDone),
 *      contains uncompleted futures.
 */
@Coroutine
auto wait(Future)(EventLoop eventLoop, Future[] futures,
    Duration timeout = Duration.zero,
    ReturnWhen returnWhen = ReturnWhen.ALL_COMPLETED)
if (is(Future : FutureHandle))
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    auto thisTask = TaskHandle.getThis;

    assert(thisTask !is null);
    if (futures.empty)
    {
        // Nothing to wait for: give other tasks a chance to run, then return.
        thisTask.scheduleStep;
        TaskHandle.yield;

        return Tuple!(Future[], "done", Future[], "notDone")(null, null);
    }

    // Have every completing future wake this task up.
    foreach (future; futures)
    {
        future.addDoneCallback(&thisTask.scheduleStep);
    }

    scope (exit)
    {
        foreach (future; futures)
        {
            future.removeDoneCallback(&thisTask.scheduleStep);
        }
    }

    bool completed = false;
    CallbackHandle timeoutCallback;

    if (timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            completed = true;
            thisTask.scheduleStep;
        });
    }

    // Cancel the timer on every exit path. If it stayed armed after
    // TaskHandle.yield threw an injected exception, it would later fire and
    // reschedule this task for no reason.
    scope (exit)
    {
        if (timeoutCallback !is null)
            timeoutCallback.cancel;
    }

    while (!completed)
    {
        TaskHandle.yield;

        if (completed)
            break; // the timeout expired

        final switch (returnWhen)
        {
        case ReturnWhen.FIRST_COMPLETED:
            completed = futures.any!"a.done";
            break;
        case ReturnWhen.FIRST_EXCEPTION:
            // Only ask futures that are done and not cancelled for their
            // exception.
            // NOTE(review): presumably `exception` throws for pending or
            // cancelled futures, as in Python's asyncio - confirm against
            // asynchronous.futures.
            completed = futures.any!(a => a.done && !a.cancelled &&
                    a.exception !is null) ||
                futures.all!"a.done";
            break;
        case ReturnWhen.ALL_COMPLETED:
            completed = futures.all!"a.done";
            break;
        }
    }

    Tuple!(Future[], "done", Future[], "notDone") result;

    result.done = futures.filter!"a.done".array;
    result.notDone = futures.filter!"!a.done".array;

    return result;
}
831 
// wait with a timeout separates the futures that finished in time from the
// ones that did not.
unittest
{
    auto loop = getEventLoop;

    int add(int a, int b)
    {
        return a + b;
    }

    // Three tasks finishing after 10, 20 and 60 ms.
    auto tasks = [
        loop.createTask({
            loop.sleep(10.msecs);
            return add(10, 11);
        }),
        loop.createTask({
            loop.sleep(20.msecs);
            return add(11, 12);
        }),
        loop.createTask({
            loop.sleep(60.msecs);
            return add(12, 13);
        }),
    ];

    // Wait for at most 50 ms: only the first two can finish.
    auto waitTask = loop.createTask({
        return loop.wait(tasks, 50.msecs);
    });

    const outcome = loop.runUntilComplete(waitTask);

    assert(tasks[0].done);
    assert(tasks[1].done);
    assert(!tasks[2].done);

    assert(tasks[0].result == 21);
    assert(tasks[1].result == 23);

    assert(outcome.done == tasks[0 .. 2]);
    assert(outcome.notDone == tasks[2 .. $]);
}
872 
873 
/**
 * Wait for the single Future to complete with timeout. If timeout is 0 or
 * negative, block until the future completes.
 *
 * Must be called from within a task.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
 *
 *  future = future to wait for.
 *
 *  timeout = can be used to control the maximum time to wait before
 *      returning. If $(PARAM timeout) is 0 or negative, block until the
 *      future completes.
 *
 * Returns: result of the future. When a timeout occurs, it cancels the task
 *      and raises $(D_PSYMBOL TimeoutException). To avoid the task
 *      cancellation, wrap it in $(D_PSYMBOL shield()).
 */
@Coroutine
auto waitFor(Future)(EventLoop eventLoop, Future future,
    Duration timeout = Duration.zero)
if (is(Future : FutureHandle))
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    auto thisTask = TaskHandle.getThis;

    assert(thisTask !is null);

    // Have the future wake this task up when it completes.
    future.addDoneCallback(&thisTask.scheduleStep);
    scope (exit)
    {
        future.removeDoneCallback(&thisTask.scheduleStep);
    }

    bool cancelFuture = false;
    CallbackHandle timeoutCallback;

    if (timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            cancelFuture = true;
            thisTask.scheduleStep;
        });
    }

    // Cancel the timer on every exit path. If it stayed armed after
    // TaskHandle.yield threw an injected exception, it would later fire and
    // reschedule this task for no reason.
    scope (exit)
    {
        if (timeoutCallback !is null)
            timeoutCallback.cancel;
    }

    while (true)
    {
        TaskHandle.yield;

        if (cancelFuture)
        {
            // The timeout expired before the future completed.
            future.cancel;
            throw new TimeoutException;
        }

        if (future.done)
            break;
    }

    return future.result;
}
939 
// waitFor returns once the future is done and cancels it on timeout.
unittest
{
    auto loop = getEventLoop;

    int add(int a, int b)
    {
        return a + b;
    }

    auto quickTask = loop.createTask({
        loop.sleep(10.msecs);
        return add(10, 11);
    });

    auto lateTask = loop.createTask({
        loop.sleep(40.msecs);
        return add(10, 11);
    });

    auto waitTask = loop.createTask({
        // The 10 ms task finishes within the 20 ms limit.
        loop.waitFor(quickTask, 20.msecs);
        assert(quickTask.done);
        assert(quickTask.result == 21);
        assert(!lateTask.done);

        // The 40 ms task does not finish within another 10 ms.
        try
        {
            loop.waitFor(lateTask, 10.msecs);
            assert(0, "Should not get here");
        }
        catch (TimeoutException timeoutException)
        {
            loop.sleep(20.msecs);
            assert(lateTask.done);
            assert(lateTask.cancelled);
        }
    });

    loop.runUntilComplete(waitTask);
}
979 
/**
 * A $(D_PSYMBOL GeneratorTask) is a $(D_PSYMBOL Task) that hands values of
 * type $(D_PSYMBOL T) to its consumer via $(D_PSYMBOL yieldValue). The
 * consumer reads them through the input range primitives
 * $(D_PSYMBOL empty), $(D_PSYMBOL front) and $(D_PSYMBOL popFront).
 */
class GeneratorTask(T) : Task!(void delegate())
{
    /// Points to the value currently offered to the consumer;
    /// $(D_KEYWORD null) while no value is available.
    private T* frontValue = null;

    /// Task suspended in $(D_PSYMBOL empty) until a value arrives or the
    /// generator is done.
    private TaskHandle waitingTask = null;

    this(EventLoop eventLoop, void delegate() coroutine)
    {
        super(eventLoop, coroutine);

        // A consumer that is still waiting has to be woken up when the
        // generator finishes.
        addDoneCallback((FutureHandle _) {
            wakeWaitingTask;
        });
    }

    /// Schedules the waiting consumer, if any, and forgets it.
    private void wakeWaitingTask()
    {
        if (waitingTask is null)
            return;

        waitingTask.scheduleStep;
        waitingTask = null;
    }

    /// Offers $(D_PARAM value) to the consumer and wakes it up if it is
    /// waiting. Only the address of $(D_PARAM value) is stored.
    package void setFrontValue(ref T value)
    {
        frontValue = &value;
        wakeWaitingTask;
    }

    /**
     * Returns: $(D_KEYWORD true) if the generator is done,
     *          $(D_KEYWORD false) if a value is available.
     *
     * When neither is the case, the calling task is suspended until one of
     * them becomes true. It must be called from a task other than the
     * generator itself, and only one task may wait at a time.
     */
    @Coroutine
    @property bool empty()
    {
        if (done)
            return true;

        if (frontValue !is null)
            return false;

        auto consumer = TaskHandle.getThis;

        assert(consumer !is null);
        assert(consumer !is this, "empty() called in the task routine");
        assert(waitingTask is null, "another Task is already waiting");

        waitingTask = consumer;
        TaskHandle.yield;
        assert(done || frontValue !is null);

        // Evaluate the state again now that something has changed.
        return empty;
    }

    /// Returns: the value currently offered by the generator.
    @property ref T front()
    {
        assert(frontValue !is null);
        return *frontValue;
    }

    /// Discards the current value and schedules the generator to continue.
    void popFront()
    {
        assert(frontValue !is null);
        frontValue = null;
        scheduleStep;
    }
}
1046 
/**
 * Hand a value to the consumer of the currently executing generator task and
 * suspend until the generator is resumed. The type of the value and the type
 * of the generator must be the same.
 *
 * Throws: $(D_PSYMBOL Exception) if the caller is not running inside a
 *         $(D_PSYMBOL GeneratorTask) of type $(D_PSYMBOL T).
 */
@Coroutine
void yieldValue(T)(auto ref T value)
{
    auto generator = cast(GeneratorTask!T) TaskHandle.getThis;

    enforce(generator !is null,
        "'TaskHandle.yieldValue' called outside of a generator task.");

    // Publish the value, then suspend the generator.
    generator.setFrontValue(value);
    TaskHandle.yield;
}
1062 
// A generator without values is empty; otherwise its values arrive in order.
unittest
{
    import std.functional : toDelegate;

    auto loop = getEventLoop;

    // Generator that finishes without yielding anything.
    auto emptyGenerator = new GeneratorTask!int(loop, {}.toDelegate);
    auto emptyCheck = loop.createTask({
        assert(emptyGenerator.empty);
    });

    loop.runUntilComplete(emptyCheck);

    // Generator that yields two values.
    auto numbers = new GeneratorTask!int(loop, {
        yieldValue(8);
        yieldValue(10);
    }.toDelegate);

    auto numbersCheck = loop.createTask({
        int[] collected = numbers.array;

        assert(collected == [8, 10]);
        assert(numbers.empty);
    });

    loop.runUntilComplete(numbersCheck);
}