1 /**
2  * Support for tasks, coroutines and the scheduler.
3  */
4 module asynchronous.tasks;
5 
6 import core.thread;
7 import std.algorithm;
8 import std.array;
9 import std.exception;
10 import std.traits;
11 import std.typecons;
12 import asynchronous.events;
13 import asynchronous.futures;
14 import asynchronous.types;
15 
16 package class TaskRepository
17 {
18     private static TaskHandle[EventLoop] currentTasks;
19 
20     private static TaskHandle[][EventLoop] tasks;
21 
22     static TaskHandle[] allTasks(EventLoop eventLoop)
23     in
24     {
25         assert(eventLoop !is null);
26     }
27     body
28     {
29         return tasks.get(eventLoop, null).dup;
30     }
31 
32     static TaskHandle currentTask(EventLoop eventLoop)
33     in
34     {
35         assert(eventLoop !is null);
36     }
37     body
38     {
39         return currentTasks.get(eventLoop, null);
40     }
41 
42     static void resetCurrentTask(EventLoop eventLoop,
43         TaskHandle taskHandle = null)
44     in
45     {
46         assert(eventLoop !is null);
47     }
48     body
49     {
50         currentTasks[eventLoop] = taskHandle;
51     }
52 
53     package static void registerTask(EventLoop eventLoop, TaskHandle taskHandle)
54     in
55     {
56         assert(eventLoop !is null);
57         assert(taskHandle !is null);
58     }
59     body
60     {
61         tasks[eventLoop] ~= taskHandle;
62     }
63 
64     package static void unregisterTask(EventLoop eventLoop,
65         TaskHandle taskHandle)
66     in
67     {
68         assert(eventLoop !is null);
69         assert(taskHandle !is null);
70     }
71     body
72     {
73         tasks[eventLoop] = tasks[eventLoop].remove!(a => a is taskHandle);
74     }
75 }
76 
77 interface TaskHandle : FutureHandle
78 {
79     protected @property Throwable injectException();
80 
81     protected void run();
82 
83     protected void scheduleStep();
84     protected void scheduleStep(Throwable throwable);
85 
86     /**
87      * Returns: a set of all tasks for an event loop.
88      *
89      * By default all tasks for the current event loop are returned.
90      */
91     static TaskHandle[] allTasks(EventLoop eventLoop = null)
92     {
93         if (eventLoop is null)
94             eventLoop = getEventLoop;
95 
96         return TaskRepository.allTasks(eventLoop);
97     }
98 
99     /**
100      * Returns: the currently running task in an event loop or
101      *          $(D_KEYWORD null).
102      *
103      * By default the current task for the current event loop is returned.
104      *
105      * $(D_KEYWORD null) is returned when called not in the context of a Task.
106      */
107     static TaskHandle currentTask(EventLoop eventLoop = null)
108     {
109         if (eventLoop is null)
110             eventLoop = getEventLoop;
111 
112         return TaskRepository.currentTask(eventLoop);
113     }
114 
115     static TaskHandle getThis()
116     {
117         auto taskFiber = TaskFiber.getThis;
118 
119         return taskFiber is null ? null : taskFiber.taskHandle;
120     }
121 
122     /**
123      * Forces a context switch to occur away from the calling task.
124      *
125      * Params:
126      *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
127      */
128     @Coroutine
129     package static void yield()
130     {
131         Fiber.yield;
132 
133         auto thisTask = getThis;
134         enforce(thisTask !is null,
135             "'TaskHandle.yield' called outside of a task.");
136 
137         if (thisTask.injectException !is null)
138             throw thisTask.injectException;
139     }    
140 }
141 
142 private class TaskFiber : Fiber
143 {
144     private TaskHandle task_ = null;
145 
146     this()
147     {
148         super({ });
149     }
150 
151     @property TaskHandle taskHandle()
152     {
153         return task_;
154     }
155 
156     void reset(TaskHandle task = null)
157     in
158     {
159         assert(state == (task is null ? Fiber.State.TERM : Fiber.State.HOLD));
160     }
161     body
162     {
163         task_ = task;
164         if (task is null)
165             super.reset;
166         else
167             super.reset(&(task.run));
168     }
169 
170     static TaskFiber getThis()
171     {
172         return cast(TaskFiber) Fiber.getThis;
173     }
174 }
175 
176 
177 private static ResourcePool!TaskFiber taskFibers;
178 
179 static this()
180 {
181     taskFibers = new ResourcePool!TaskFiber;
182 }
183 
184 /**
185  * Schedule the execution of a fiber: wrap it in a future. A task is a
186  * subclass of Future.
187  *
188  * A task is responsible for executing a fiber object in an event loop. If the
189  * wrapped fiber yields from a future, the task suspends the execution of the
190  * wrapped fiber and waits for the completion of the future. When the future
191  * is done, the execution of the wrapped fiber restarts with the result or the
192  * exception of the fiber.
193  *
194  * Event loops use cooperative scheduling: an event loop only runs one task at
195  * a time. Other tasks may run in parallel if other event loops are running in
196  * different threads. While a task waits for the completion of a future, the
197  * event loop executes a new task.
198  *
199  * The cancellation of a task is different from the cancelation of a future.
200  * Calling $(D_PSYMBOL cancel()) will throw a $(D_PSYMBOL CancelledException)
201  * to the wrapped fiber. $(D_PSYMBOL cancelled()) only returns $(D_KEYWORD
202  * true) if the wrapped fiber did not catch the $(D_PSYMBOL
203  * CancelledException), or raised a $(D_PSYMBOL CancelledException).
204  *
205  * If a pending task is destroyed, the execution of its wrapped fiber did not
206  * complete. It is probably a bug and a warning is logged: see Pending task
207  * destroyed.
208  *
209  * Don’t directly create $(D_PSYMBOL Task) instances: use the $(D_PSYMBOL
210  * task()) function or the $(D_PSYMBOL EventLoop.create_task()) method.
211  */
212 class Task(Coroutine, Args...) : Future!(ReturnType!Coroutine), TaskHandle
213 if (isDelegate!Coroutine)
214 {
215     alias ResultType = ReturnType!Coroutine;
216 
217     package TaskFiber fiber;
218 
219     private Coroutine coroutine;
220     private Args args;
221 
222     private Throwable injectException_;
223 
224     protected override @property Throwable injectException()
225     {
226         return this.injectException_;
227     }
228 
229     debug (tasks)
230         package string id()
231         {
232             import std.format : format;
233 
234             return "%s".format(cast(void*) cast(TaskHandle) this);
235         }
236 
237     this(EventLoop eventLoop, Coroutine coroutine, Args args)
238     {
239         super(eventLoop);
240 
241         this.coroutine = coroutine;
242         static if (args.length > 0)
243         {
244             this.args = args;
245         }
246 
247         this.fiber = taskFibers.acquire(this);
248         TaskRepository.registerTask(this.eventLoop, this);
249         this.eventLoop.callSoon(&step, null);
250 
251         debug (tasks)
252             std.stdio.writeln("Create task ", id);
253     }
254 
255     override protected void run()
256     {
257         debug (tasks)
258             std.stdio.writeln("Start task ", id);
259 
260         static if (is(ResultType : void))
261         {
262             coroutine(args);
263             setResult;
264         }
265         else
266         {
267             setResult(coroutine(args));
268         }
269 
270         debug (tasks)
271             std.stdio.writeln("End task ", id);
272     }
273 
274     /**
275      * Request that this task cancel itself.
276      *
277      * This arranges for a $(D_PSYMBOL CancelledException) to be thrown into
278      * the wrapped coroutine on the next cycle through the event loop. The
279      * coroutine then has a chance to clean up or even deny the request using
280      * try/catch/finally.
281      *
282      * Unlike $(D_PSYMBOL Future.cancel()), this does not guarantee that the
283      * task will be cancelled: the exception might be caught and acted upon,
284      * delaying cancellation of the task or preventing cancellation completely.
285      * The task may also return a value or raise a different exception.
286      *
287      * Immediately after this method is called, $(D_PSYMBOL cancelled()) will
288      * not return $(D_KEYWORD true) (unless the task was already cancelled). A
289      * task will be marked as cancelled when the wrapped coroutine terminates
290      * with a $(D_PSYMBOL CancelledException) (even if cancel() was not
291      * called).
292      */
293     override bool cancel()
294     {
295         if (done)
296             return false;
297 
298         this.injectException_ = new CancelledException;
299         return true;
300     }
301 
302     private void step(Throwable throwable = null)
303     {
304         // TODO why rescheduled when it is already terminated?
305         if (this.fiber is null)
306             return;
307 
308         final switch (this.fiber.state)
309         {
310         case Fiber.State.HOLD:
311             debug (tasks)
312                 std.stdio.writeln("Resume task ", id);
313 
314             TaskRepository.resetCurrentTask(this.eventLoop, this);
315 
316             if (throwable !is null)
317                 this.injectException_ = throwable;
318 
319             try
320             {
321                 this.fiber.call;
322             }
323             catch (CancelledException cancelledException)
324             {
325                 assert(this.fiber.state == Fiber.State.TERM);
326                 super.cancel;
327             }
328             catch (Throwable throwable)
329             {
330                 assert(this.fiber.state == Fiber.State.TERM);
331                 setException(throwable);
332             }
333 
334             TaskRepository.resetCurrentTask(this.eventLoop);
335 
336             if (this.fiber.state == Fiber.State.TERM)
337                 goto case Fiber.State.TERM;
338 
339             debug (tasks)
340                 std.stdio.writeln("Suspend task ", id);
341             break;
342         case Fiber.State.EXEC:
343             assert(0, "Internal error");
344         case Fiber.State.TERM:
345             assert(done);
346             TaskRepository.unregisterTask(this.eventLoop, this);
347             taskFibers.release(this.fiber);
348             this.fiber = null;
349             break;
350         }
351     }
352 
353     protected override void scheduleStep()
354     {
355         scheduleStep(null);
356     }
357 
358     protected override void scheduleStep(Throwable throwable)
359     {
360         debug (tasks)
361         {
362             if (throwable is null)
363                 std.stdio.writeln("Schedule task ", id);
364             else
365                 std.stdio.writeln("Schedule task ", id, " to throw ",
366                     throwable);
367         }
368 
369         if (TaskFiber.getThis is null)
370             step(throwable);
371         else
372             this.eventLoop.callSoon(&step, throwable);
373     }
374 
375     override string toString() const
376     {
377         import std.format : format;
378 
379         return "%s(done: %s, cancelled: %s)".format(typeid(this), done,
380             cancelled);
381     }
382 }
383 
384 /**
385  * Return an generator whose values, when waited for, are Future instances in
386  * the order in which and as soon as they complete.
387  *
388  * Raises $(D_PSYMBOL TimeoutException) if the timeout occurs before all futures
389  * are done.
390  *
391  * Example:
392  *
393  *  foreach (f; getEventLoop.asCompleted(fs))
394  *       // use f.result
395  *
396  * Note: The futures $(D_PSYMBOL f) are not necessarily members of $(D_PSYMBOL
397  *       fs).
398  */
399 auto asCompleted(EventLoop eventLoop, FutureHandle[] futures,
400     Duration timeout = Duration.zero)
401 in
402 {
403     futures.all!"a !is null";
404 }
405 body
406 {
407     if (eventLoop is null)
408         eventLoop = getEventLoop;
409 
410     import asynchronous.queues : Queue;
411     auto done = new Queue!FutureHandle;
412     CallbackHandle timeoutCallback = null;
413     Tuple!(FutureHandle, void delegate())[] todo;
414 
415     foreach (future; futures)
416     {
417         void delegate() doneCallback = (f => {
418             if (todo.empty)
419                 return; // timeoutCallback was here first
420             todo = todo.remove!(a => a[0] is f);
421             done.putNowait(f);
422             if (todo.empty && timeoutCallback !is null)
423                 timeoutCallback.cancel;
424         })(future); // workaround bug #2043
425 
426         todo ~= tuple(future, doneCallback);
427         future.addDoneCallback(doneCallback);
428     }
429 
430     if (!todo.empty && timeout > Duration.zero)
431     {
432         timeoutCallback = eventLoop.callLater(timeout, {
433             foreach (future_callback; todo)
434             {
435                 future_callback[0].removeDoneCallback(future_callback[1]);
436                 future_callback[0].cancel;
437             }
438             if (!todo.empty)
439                 done.putNowait(null);
440             todo = null;
441         });
442     }
443 
444     return new GeneratorTask!FutureHandle(eventLoop, {
445         while (!todo.empty || !done.empty)
446         {
447             auto f = done.get;
448             if (f is null)
449                 throw new TimeoutException;
450             yieldValue(f);
451         }
452     });
453 }
454 
455 unittest
456 {
457     auto eventLoop = getEventLoop;
458 
459     auto task1 = eventLoop.createTask({
460         eventLoop.sleep(3.msecs);
461     });
462     auto task2 = eventLoop.createTask({
463         eventLoop.sleep(2.msecs);
464     });
465     auto testTask = eventLoop.createTask({
466         auto tasks = asCompleted(eventLoop, [task1, task2]).array;
467 
468         assert(tasks[0] is cast(FutureHandle) task2);
469         assert(tasks[1] is cast(FutureHandle) task1);
470     });
471     eventLoop.runUntilComplete(testTask);
472 
473     task1 = eventLoop.createTask({
474         eventLoop.sleep(10.msecs);
475     });
476     task2 = eventLoop.createTask({
477         eventLoop.sleep(2.msecs);
478     });
479     testTask = eventLoop.createTask({
480         auto tasks = asCompleted(eventLoop, [task1, task2], 5.msecs).array;
481 
482         assert(tasks.length == 1);
483         assert(tasks[0] is cast(FutureHandle) task2);
484     });
485     eventLoop.runUntilComplete(testTask);
486 }
487 
488 /**
489  * Schedule the execution of a coroutine: wrap it in a future. Return a
490  * $(D_PSYMBOL Task) object.
491  *
492  * If the argument is a $(D_PSYMBOL Future), it is returned directly.
493  */
494 auto ensureFuture(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
495     Args args)
496 {
497     static if (is(typeof(coroutine) : FutureHandle))
498     {
499         static assert(args.length == 0);
500 
501         return coroutine;
502     }
503     else
504     {
505         return new Task!(Coroutine, Args)(eventLoop, coroutine, args);
506     }
507 }
508 
509 unittest
510 {
511     import std.functional : toDelegate;
512 
513     auto task1 = ensureFuture(null, toDelegate({
514         return 3 + 39;
515     }));
516 
517     assert(!task1.done);
518     assert(!task1.cancelled);
519 
520     task1.cancel;
521     assert(!task1.cancelled);
522 
523     const result = getEventLoop.runUntilComplete(task1);
524 
525     assert(task1.done);
526     assert(!task1.cancelled);
527     assert(result == 42);
528 
529     assert(task1 == ensureFuture(null, task1));
530 }
531 
532 /**
533  * Create a coroutine that completes after a given time. If $(D_PSYMBOL result)
534  * is provided, it is produced to the caller when the coroutine completes.
535  *
536  * The resolution of the sleep depends on the granularity of the event loop.
537  */
538 @Coroutine
539 auto sleep(Result...)(EventLoop eventLoop, Duration delay, Result result)
540 if (result.length <= 1)
541 {
542     if (eventLoop is null)
543         eventLoop = getEventLoop;
544 
545     static if (result.length == 0)
546     {
547         auto future = new Future!void(eventLoop);
548         auto handle = eventLoop.callLater(delay,
549             &future.setResultUnlessCancelled);
550     }
551     else
552     {
553         auto future = new Future!Result(eventLoop);
554         auto handle = eventLoop.callLater(delay,
555             &future.setResultUnlessCancelled, result);
556     }
557     scope (exit) handle.cancel;
558 
559     return eventLoop.waitFor(future);
560 }
561 
562 unittest
563 {
564     auto eventLoop = getEventLoop;
565     auto task1 = eventLoop.ensureFuture({
566         return eventLoop.sleep(5.msecs, "foo");
567     });
568 
569     eventLoop.runUntilComplete(task1);
570     assert(task1.result == "foo");
571 }
572 
573 /**
574  * Wait for a future, shielding it from cancellation.
575  *
576  * The statement
577  *
578  *   $(D_CODE res = shield(something());)
579  *
580  * is exactly equivalent to the statement
581  *
582  *   $(D_CODE res = something();)
583  *
584  * $(I except) that if the coroutine containing it is cancelled, the task
585  * running in $(D_PSYMBOL something()) is not cancelled. From the point of view
586  * of $(D_PSYMBOL something()), the cancellation did not happen. But its caller
587  * is still cancelled, so the call still raises $(D_PSYMBOL CancelledException).
588  * Note: If $(D_PSYMBOL something()) is cancelled by other means this will still
589  * cancel $(D_PSYMBOL shield()).
590  *
591  * If you want to completely ignore cancellation (not recommended) you can
592  * combine $(D_PSYMBOL shield()) with a try/except clause, as follows:
593  *
594  * $(D_CODE
595  *   try
596  *       res = shield(something());
597  *   catch (CancelledException)
598  *       res = null;
599  * )
600  */
601 auto shield(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
602     Args args)
603 {
604     auto inner = ensureFuture(eventLoop, coroutine, args);
605     if (inner.done())
606         return inner;
607 
608     auto outer = new Future!(inner.ResultType)(eventLoop);
609 
610     void doneCallback()
611     {
612         if (outer.cancelled())
613         {
614             if (!inner.cancelled())
615                 inner.exception(); // Mark inner's result as retrieved.
616             return;
617         }
618 
619         if (inner.cancelled())
620         {
621             outer.cancel();
622         }
623         else
624         {
625             auto exception = inner.exception();
626             if (exception !is null)
627             {
628                 outer.setException(exception);
629             }
630             else
631             {
632                 static if (!is(T == void))
633                 {
634                     outer.setResult(inner.result);
635                 }
636                 else
637                 {
638                     outer.setResult;
639                 }
640             }
641         }
642     }
643 
644     inner.addDoneCallback(&doneCallback);
645     return outer;
646 }
647 
648 unittest
649 {
650     int add(int a, int b)
651     {
652         return a + b;
653     }
654 
655     auto eventLoop = getEventLoop;
656 
657     auto task1 = eventLoop.ensureFuture(&add, 3, 4);
658     auto future1 = eventLoop.shield(task1);
659 
660     eventLoop.runUntilComplete(future1);
661     assert(future1.result == 7);
662 
663     task1 = eventLoop.ensureFuture(&add, 3, 4);
664     future1 = eventLoop.shield(task1);
665     future1.cancel;
666 
667     eventLoop.runUntilComplete(task1);
668     assert(future1.cancelled);
669     assert(task1.result == 7);
670 }
671 
672 
673 enum ReturnWhen
674 {
675     FIRST_COMPLETED,
676     FIRST_EXCEPTION,
677     ALL_COMPLETED,
678 }
679 
680 /**
681  * Wait for the Future instances given by $(PARAM futures) to complete.
682  *
683  * Params:
684  *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
685  *
686  *  futures = futures to wait for.
687  *
688  *  timeout = can be used to control the maximum number of seconds to wait
689  *      before returning. If $(PARAM timeout) is 0 or negative there is no limit
690  *      to the wait time.
691  *
692  *  returnWhen = indicates when this function should return. It must be one of
693  *      the following constants:
694  *          FIRST_COMPLETED The function will return when any future finishes or
695  *              is cancelled.
696  *          FIRST_EXCEPTION The function will return when any future finishes by
697  *              raising an exception. If no future raises an exception then it
698  *              is equivalent to ALL_COMPLETED.
699  *          ALL_COMPLETED The function will return when all futures finish or
700  *              are cancelled.
701  *
702  * Returns: a named 2-tuple of arrays. The first array, named $(D_PSYMBOL done),
703  *      contains the futures that completed (finished or were cancelled) before
704  *      the wait completed. The second array, named $(D_PSYMBOL notDone),
705  *      contains uncompleted futures.
706  */
707 @Coroutine
708 auto wait(Future)(EventLoop eventLoop, Future[] futures,
709     Duration timeout = Duration.zero,
710     ReturnWhen returnWhen = ReturnWhen.ALL_COMPLETED)
711 if (is(Future : FutureHandle))
712 {
713     if (eventLoop is null)
714         eventLoop = getEventLoop;
715 
716     auto thisTask = TaskHandle.getThis;
717 
718     assert(thisTask !is null);
719     if (futures.empty)
720     {
721         thisTask.scheduleStep;
722         TaskHandle.yield;
723 
724         return Tuple!(Future[], "done", Future[], "notDone")(null, null);
725     }
726 
727     foreach (future; futures)
728     {
729         future.addDoneCallback(&thisTask.scheduleStep);
730     }
731 
732     scope (exit)
733     {
734         foreach (future; futures)
735         {
736             future.removeDoneCallback(&thisTask.scheduleStep);
737         }
738     }
739 
740     bool completed = false;
741     CallbackHandle timeoutCallback;
742 
743     if (timeout > Duration.zero)
744     {
745         timeoutCallback = eventLoop.callLater(timeout, {
746             completed = true;
747             thisTask.scheduleStep;
748         });
749     }
750 
751     while (!completed)
752     {
753         TaskHandle.yield;
754 
755         if (completed)
756             break;
757 
758         final switch (returnWhen)
759         {
760         case ReturnWhen.FIRST_COMPLETED:
761             completed = futures.any!"a.done";
762             break;
763         case ReturnWhen.FIRST_EXCEPTION:
764             completed = futures.any!"a.exception !is null" ||
765                 futures.all!"a.done";
766             break;
767         case ReturnWhen.ALL_COMPLETED:
768             completed = futures.all!"a.done";
769             break;
770         }
771     }
772 
773     if (timeoutCallback !is null)
774         timeoutCallback.cancel;
775 
776     Tuple!(Future[], "done", Future[], "notDone") result;
777 
778     result.done = futures.filter!"a.done".array;
779     result.notDone = futures.filter!"!a.done".array;
780 
781     return result;
782 }
783 
784 unittest
785 {
786     auto eventLoop = getEventLoop;
787 
788     int add(int a, int b)
789     {
790         return a + b;
791     }
792 
793     auto tasks = [
794         eventLoop.createTask({
795             eventLoop.sleep(10.msecs);
796             return add(10, 11);
797         }),
798         eventLoop.createTask({
799             eventLoop.sleep(20.msecs);
800             return add(11, 12);
801         }),
802         eventLoop.createTask({
803             eventLoop.sleep(60.msecs);
804             return add(12, 13);
805         }),
806     ];
807 
808     auto waitTask = eventLoop.createTask({
809         return eventLoop.wait(tasks, 50.msecs);
810     });
811 
812     const result = eventLoop.runUntilComplete(waitTask);
813 
814     assert(tasks[0].done);
815     assert(tasks[1].done);
816     assert(!tasks[2].done);
817 
818     assert(tasks[0].result == 21);
819     assert(tasks[1].result == 23);
820 
821     assert(result.done == tasks[0 .. 2]);
822     assert(result.notDone == tasks[2 .. $]);
823 }
824 
825 
826 /**
827  * Wait for the single Future to complete with timeout. If timeout is 0 or
828  * negative, block until the future completes.
829  *
830  * Params:
831  *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
832  *
833  *  future = future to wait for.
834  *
835  *  timeout = can be used to control the maximum number of seconds to wait
836  *      before returning. If $(PARAM timeout) is 0 or negative, block until the
837  *      future completes.
838  *
839  * Returns: result of the future. When a timeout occurs, it cancels the task
840  *      and raises $(D_PSYMBOL TimeoutException). To avoid the task
841  *      cancellation, wrap it in $(D_SYMBOL shield()).
842  */
843 @Coroutine
844 auto waitFor(Future)(EventLoop eventLoop, Future future,
845     Duration timeout = Duration.zero)
846 if (is(Future : FutureHandle))
847 {
848     if (eventLoop is null)
849         eventLoop = getEventLoop;
850 
851     auto thisTask = TaskHandle.getThis;
852 
853     assert(thisTask !is null);
854 
855     future.addDoneCallback(&thisTask.scheduleStep);
856 
857     scope (exit)
858     {
859         future.removeDoneCallback(&thisTask.scheduleStep);
860     }
861 
862     bool cancelFuture = false;
863     CallbackHandle timeoutCallback;
864 
865     if (timeout > Duration.zero)
866     {
867         timeoutCallback = eventLoop.callLater(timeout, {
868             cancelFuture = true;
869             thisTask.scheduleStep;
870         });
871     }
872 
873     while (true)
874     {
875         TaskHandle.yield;
876 
877         if (cancelFuture)
878         {
879             future.cancel;
880             throw new TimeoutException;
881         }
882 
883         if (future.done)
884             break;
885     }
886 
887     if (timeoutCallback !is null)
888         timeoutCallback.cancel;
889 
890     static if (!is(Future.ResultType == void))
891     {
892         return future.result;
893     }
894 }
895 
896 unittest
897 {
898     auto eventLoop = getEventLoop;
899 
900     int add(int a, int b)
901     {
902         return a + b;
903     }
904 
905     auto task1 = eventLoop.createTask({
906         eventLoop.sleep(10.msecs);
907         return add(10, 11);
908     });
909 
910     auto task2 = eventLoop.createTask({
911         eventLoop.sleep(40.msecs);
912         return add(10, 11);
913     });
914 
915     auto waitTask = eventLoop.createTask({
916         eventLoop.waitFor(task1, 20.msecs);
917         assert(task1.done);
918         assert(task1.result == 21);
919         assert(!task2.done);
920         try
921         {
922             eventLoop.waitFor(task2, 10.msecs);
923             assert(0, "Should not get here");
924         }
925         catch (TimeoutException timeoutException)
926         {
927             eventLoop.sleep(20.msecs);
928             assert(task2.done);
929             assert(task2.cancelled);
930         }
931     });
932 
933     eventLoop.runUntilComplete(waitTask);
934 }
935 
936 /**
937  * A $(D_PSYMBOL GeneratorTask) is a $(D_PSYMBOL Task) that periodically returns
938  * values of type $(D_PSYMBOL T) to the caller via $(D_PSYMBOL yieldValue). This
939  * is represented as an $(D_PSYMBOL InputRange).
940  */
941 class GeneratorTask(T) : Task!(void delegate())
942 {
943     private T* frontValue = null;
944     private TaskHandle waitingTask = null;
945 
946     this(EventLoop eventLoop, void delegate() coroutine)
947     {
948         super(eventLoop, coroutine);
949         addDoneCallback({
950             if (waitingTask)
951             {
952                 waitingTask.scheduleStep;
953                 waitingTask = null;
954             }
955         });
956     }
957 
958     package void setFrontValue(ref T value)
959     {
960         frontValue = &value;
961         if (waitingTask)
962         {
963             waitingTask.scheduleStep;
964             waitingTask = null;
965         }
966     }
967 
968     @Coroutine
969     @property bool empty()
970     {
971         if (done)
972             return true;
973 
974         if (frontValue !is null)
975             return false;
976 
977         auto thisTask = TaskHandle.getThis;
978 
979         assert(thisTask !is null);
980         assert(thisTask !is this, "empty() called in the task routine");
981 
982         if (eventLoop is null)
983             eventLoop = getEventLoop;
984 
985         assert(waitingTask is null, "another Task is already waiting");
986         waitingTask = thisTask;
987         TaskHandle.yield;
988         assert(done || frontValue !is null);
989 
990         return empty;
991     }
992 
993     @property ref T front()
994     {
995         assert(frontValue !is null);
996         return *frontValue;
997     }
998 
999     void popFront()
1000     {
1001         assert(frontValue !is null);
1002         frontValue = null;
1003         scheduleStep;
1004     }
1005 }
1006 
1007 /**
1008  * Yield a value to the caller of the currently executing generator task. The
1009  * type of the yielded value and of type of the generator must be the same;
1010  */
1011 @Coroutine
1012 void yieldValue(T)(auto ref T value)
1013 {
1014     auto generatorTask = cast(GeneratorTask!T) TaskHandle.getThis;
1015     enforce(generatorTask !is null,
1016         "'TaskHandle.yieldValue' called outside of a generator task.");
1017 
1018     generatorTask.setFrontValue(value);
1019 
1020     TaskHandle.yield;
1021 }
1022 
1023 unittest
1024 {
1025     import std.functional : toDelegate;
1026 
1027     auto eventLoop = getEventLoop;
1028     auto task1 = new GeneratorTask!int(eventLoop, {}.toDelegate);
1029     auto testTask = eventLoop.createTask({
1030         assert(task1.empty);
1031     });
1032 
1033     eventLoop.runUntilComplete(testTask);
1034 
1035     auto task2 = new GeneratorTask!int(eventLoop, {
1036         yieldValue(8);
1037         yieldValue(10);
1038     }.toDelegate);
1039 
1040     auto testTask2 = eventLoop.createTask({
1041         int[] array1 = task2.array;
1042 
1043         assert(array1 == [8, 10]);
1044         assert(task2.empty);
1045     });
1046 
1047     eventLoop.runUntilComplete(testTask2);
1048 }