/**
 * Support for tasks, coroutines and the scheduler.
 */
module asynchronous.tasks;

import core.thread;
import std.algorithm;
import std.array;
import std.exception;
import std.traits;
import std.typecons;
import asynchronous.events;
import asynchronous.futures;
import asynchronous.types;

/**
 * Book-keeping of tasks per event loop: the list of registered tasks and the
 * task that is currently being stepped.
 */
package class TaskRepository
{
    /// Task currently being resumed, keyed by event loop (null when none).
    private static TaskHandle[EventLoop] currentTasks;

    /// All registered (not yet terminated) tasks, keyed by event loop.
    private static TaskHandle[][EventLoop] tasks;

    /**
     * Returns: a copy of the list of tasks registered for $(D_PARAM
     *          eventLoop); empty if none were registered.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return tasks.get(eventLoop, null).dup;
    }

    /**
     * Returns: the task recorded as current for $(D_PARAM eventLoop), or
     *          $(D_KEYWORD null) if there is none.
     */
    static TaskHandle currentTask(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return currentTasks.get(eventLoop, null);
    }

    /**
     * Records $(D_PARAM taskHandle) as the current task of $(D_PARAM
     * eventLoop); the default $(D_KEYWORD null) clears the entry.
     */
    static void resetCurrentTask(EventLoop eventLoop,
        TaskHandle taskHandle = null)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        currentTasks[eventLoop] = taskHandle;
    }

    /// Appends $(D_PARAM taskHandle) to the tasks of $(D_PARAM eventLoop).
    package static void registerTask(EventLoop eventLoop, TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        tasks[eventLoop] ~= taskHandle;
    }

    /**
     * Removes $(D_PARAM taskHandle) (compared by identity) from the tasks of
     * $(D_PARAM eventLoop). The event loop must have had a task registered
     * before, otherwise the associative array lookup fails.
     */
    package static void unregisterTask(EventLoop eventLoop,
        TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        tasks[eventLoop] = tasks[eventLoop].remove!(a => a is taskHandle);
    }
}

/**
 * Type-erased view of a $(D_PSYMBOL Task), independent of the coroutine and
 * result types.
 */
interface TaskHandle : FutureHandle
{
    /// Exception to be thrown into the coroutine when it resumes, or null.
    protected @property Throwable injectException();

    /// Entry point executed by the task's fiber.
    protected void run();

    /// Arrange for the task to be resumed.
    protected void scheduleStep();
    /// Arrange for the task to be resumed with $(D_PARAM throwable) injected.
    protected void scheduleStep(Throwable throwable);

    /**
     * Returns: a set of all tasks for an event loop.
     *
     * By default all tasks for the current event loop are returned.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.allTasks(eventLoop);
    }

    /**
     * Returns: the currently running task in an event loop or
     *          $(D_KEYWORD null).
     *
     * By default the current task for the current event loop is returned.
     *
     * $(D_KEYWORD null) is returned when called not in the context of a Task.
     */
    static TaskHandle currentTask(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.currentTask(eventLoop);
    }

    /**
     * Returns: the task bound to the fiber that is executing right now, or
     *          $(D_KEYWORD null) when the caller is not running inside a
     *          $(D_PSYMBOL TaskFiber).
     */
    static TaskHandle getThis()
    {
        auto taskFiber = TaskFiber.getThis;

        return taskFiber is null ? null : taskFiber.taskHandle;
    }

    /**
     * Forces a context switch to occur away from the calling task.
     *
     * When the task is resumed, a pending injected exception (see
     * $(D_PSYMBOL injectException)) is thrown into the caller.
     *
     * Throws: $(D_PSYMBOL Exception) if called outside of a task.
     */
    @Coroutine
    package static void yield()
    {
        // Validate the context before suspending: yielding first would
        // suspend (or fail in) a fiber that does not belong to any task, and
        // the diagnostic below would never be reached.
        auto thisTask = getThis;
        enforce(thisTask !is null,
                "'TaskHandle.yield' called outside of a task.");

        Fiber.yield;

        // NOTE(review): the injected exception is never cleared here, so it
        // is thrown again on every subsequent yield of the same task.
        if (thisTask.injectException !is null)
            throw thisTask.injectException;
    }
}

/**
 * Fiber that remembers which task it is currently executing, so that the
 * task can be looked up from inside the running coroutine.
 */
private class TaskFiber : Fiber
{
    private TaskHandle task_ = null;

    this()
    {
        // Created idle with a no-op body; a real entry point is installed
        // by reset(task).
        super({ });
    }

    /// Returns: the task bound to this fiber, or null.
    @property TaskHandle taskHandle()
    {
        return task_;
    }

    /**
     * Binds the fiber to $(D_PARAM task) and prepares it to execute
     * $(D_D task.run), or unbinds it when $(D_PARAM task) is null.
     *
     * The fiber must be in state HOLD when binding and in state TERM when
     * unbinding.
     */
    void reset(TaskHandle task = null)
    in
    {
        assert(state == (task is null ? Fiber.State.TERM : Fiber.State.HOLD));
    }
    body
    {
        task_ = task;
        if (task is null)
            super.reset;
        else
            super.reset(&(task.run));
    }

    /// Returns: the running fiber if it is a TaskFiber, otherwise null.
    static TaskFiber getThis()
    {
        return cast(TaskFiber) Fiber.getThis;
    }
}


/// Pool from which tasks acquire their fibers and to which they release them.
private static ResourcePool!TaskFiber taskFibers;

static this()
{
    taskFibers = new ResourcePool!TaskFiber;
}

/**
 * Schedule the execution of a fiber: wrap it in a future. A task is a
 * subclass of Future.
 *
 * A task is responsible for executing a fiber object in an event loop. If the
 * wrapped fiber yields from a future, the task suspends the execution of the
 * wrapped fiber and waits for the completion of the future. When the future
 * is done, the execution of the wrapped fiber restarts with the result or the
 * exception of the fiber.
 *
 * Event loops use cooperative scheduling: an event loop only runs one task at
 * a time. Other tasks may run in parallel if other event loops are running in
 * different threads. While a task waits for the completion of a future, the
 * event loop executes a new task.
 *
 * The cancellation of a task is different from the cancellation of a future.
 * Calling $(D_PSYMBOL cancel()) will throw a $(D_PSYMBOL CancelledException)
 * to the wrapped fiber. $(D_PSYMBOL cancelled()) only returns $(D_KEYWORD
 * true) if the wrapped fiber did not catch the $(D_PSYMBOL
 * CancelledException), or raised a $(D_PSYMBOL CancelledException).
 *
 * If a pending task is destroyed, the execution of its wrapped fiber did not
 * complete. It is probably a bug and a warning is logged: see Pending task
 * destroyed.
 *
 * Don't directly create $(D_PSYMBOL Task) instances: use the $(D_PSYMBOL
 * ensureFuture()) function or the $(D_PSYMBOL EventLoop.createTask()) method.
 */
class Task(Coroutine, Args...) : Future!(ReturnType!Coroutine), TaskHandle
    if (isDelegate!Coroutine)
{
    alias ResultType = ReturnType!Coroutine;

    /// Fiber executing the coroutine; null once the task has terminated.
    package TaskFiber fiber;

    private Coroutine coroutine;
    private Args args;

    /// Exception thrown into the coroutine the next time it resumes.
    private Throwable injectException_;

    protected override @property Throwable injectException()
    {
        return this.injectException_;
    }

    debug (tasks)
        package string id()
        {
            import std.format : format;

            return "%s".format(cast(void*) cast(TaskHandle) this);
        }

    /**
     * Creates the task, acquires a fiber for it, registers it with the
     * repository and schedules its first step on $(D_PARAM eventLoop).
     */
    this(EventLoop eventLoop, Coroutine coroutine, Args args)
    {
        super(eventLoop);

        this.coroutine = coroutine;
        static if (args.length > 0)
        {
            this.args = args;
        }

        this.fiber = taskFibers.acquire(this);
        TaskRepository.registerTask(this.eventLoop, this);
        this.eventLoop.callSoon(&step, null);

        debug (tasks)
            std.stdio.writeln("Create task ", id);
    }

    /// Fiber entry point: runs the coroutine and stores its result.
    override protected void run()
    {
        debug (tasks)
            std.stdio.writeln("Start task ", id);

        static if (is(ResultType : void))
        {
            coroutine(args);
            setResult;
        }
        else
        {
            setResult(coroutine(args));
        }

        debug (tasks)
            std.stdio.writeln("End task ", id);
    }

    /**
     * Request that this task cancel itself.
     *
     * This arranges for a $(D_PSYMBOL CancelledException) to be thrown into
     * the wrapped coroutine the next time it is resumed. The coroutine then
     * has a chance to clean up or even deny the request using
     * try/catch/finally.
     *
     * Unlike $(D_PSYMBOL Future.cancel()), this does not guarantee that the
     * task will be cancelled: the exception might be caught and acted upon,
     * delaying cancellation of the task or preventing cancellation completely.
     * The task may also return a value or raise a different exception.
     *
     * Immediately after this method is called, $(D_PSYMBOL cancelled()) will
     * not return $(D_KEYWORD true) (unless the task was already cancelled). A
     * task will be marked as cancelled when the wrapped coroutine terminates
     * with a $(D_PSYMBOL CancelledException) (even if cancel() was not
     * called).
     *
     * Returns: $(D_KEYWORD false) if the task is already done, $(D_KEYWORD
     *          true) otherwise.
     */
    override bool cancel()
    {
        if (done)
            return false;

        this.injectException_ = new CancelledException;
        return true;
    }

    /**
     * Resumes the fiber once. A non-null $(D_PARAM throwable) becomes the
     * exception injected into the coroutine. When the fiber terminates, the
     * task is unregistered and its fiber is returned to the pool.
     */
    private void step(Throwable throwable = null)
    {
        // TODO why rescheduled when it is already terminated?
        if (this.fiber is null)
            return;

        final switch (this.fiber.state)
        {
        case Fiber.State.HOLD:
            debug (tasks)
                std.stdio.writeln("Resume task ", id);

            TaskRepository.resetCurrentTask(this.eventLoop, this);

            if (throwable !is null)
                this.injectException_ = throwable;

            try
            {
                this.fiber.call;
            }
            catch (CancelledException cancelledException)
            {
                // The coroutine let the cancellation escape: mark the
                // underlying future as cancelled.
                assert(this.fiber.state == Fiber.State.TERM);
                super.cancel;
            }
            catch (Throwable error)
            {
                assert(this.fiber.state == Fiber.State.TERM);
                setException(error);
            }

            TaskRepository.resetCurrentTask(this.eventLoop);

            if (this.fiber.state == Fiber.State.TERM)
                goto case Fiber.State.TERM;

            debug (tasks)
                std.stdio.writeln("Suspend task ", id);
            break;
        case Fiber.State.EXEC:
            assert(0, "Internal error");
        case Fiber.State.TERM:
            assert(done);
            TaskRepository.unregisterTask(this.eventLoop, this);
            taskFibers.release(this.fiber);
            this.fiber = null;
            break;
        }
    }

    protected override void scheduleStep()
    {
        scheduleStep(null);
    }

    /**
     * Steps the task immediately when called from outside any task fiber;
     * otherwise defers the step to the event loop.
     */
    protected override void scheduleStep(Throwable throwable)
    {
        debug (tasks)
        {
            if (throwable is null)
                std.stdio.writeln("Schedule task ", id);
            else
                std.stdio.writeln("Schedule task ", id, " to throw ",
                    throwable);
        }

        if (TaskFiber.getThis is null)
            step(throwable);
        else
            this.eventLoop.callSoon(&step, throwable);
    }

    override string toString() const
    {
        import std.format : format;

        return "%s(done: %s, cancelled: %s)".format(typeid(this), done,
            cancelled);
    }
}

/**
 * Return a generator whose values, when waited for, are Future instances in
 * the order in which and as soon as they complete.
 *
 * If $(D_PARAM timeout) is positive and elapses before all futures are done,
 * the remaining futures are cancelled and the generator raises $(D_PSYMBOL
 * TimeoutException). A zero or negative timeout means no limit.
 *
 * Example:
 *
 *      foreach (f; getEventLoop.asCompleted(fs))
 *           // use f.result
 *
 * Note: The futures $(D_PSYMBOL f) are not necessarily members of $(D_PSYMBOL
 *       fs).
 */
auto asCompleted(EventLoop eventLoop, FutureHandle[] futures,
    Duration timeout = Duration.zero)
in
{
    // The predicate result must be asserted; evaluating it alone checks
    // nothing.
    assert(futures.all!"a !is null");
}
body
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    import asynchronous.queues : Queue;
    auto done = new Queue!FutureHandle;
    CallbackHandle timeoutCallback = null;
    // Futures still pending, each paired with the callback registered on it.
    Tuple!(FutureHandle, void delegate())[] todo;

    foreach (future; futures)
    {
        void delegate() doneCallback = (f => {
            if (todo.empty)
                return; // timeoutCallback was here first
            todo = todo.remove!(a => a[0] is f);
            done.putNowait(f);
            if (todo.empty && timeoutCallback !is null)
                timeoutCallback.cancel;
        })(future); // workaround bug #2043

        todo ~= tuple(future, doneCallback);
        future.addDoneCallback(doneCallback);
    }

    if (!todo.empty && timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            foreach (future_callback; todo)
            {
                future_callback[0].removeDoneCallback(future_callback[1]);
                future_callback[0].cancel;
            }
            // A null entry tells the generator that the timeout expired.
            if (!todo.empty)
                done.putNowait(null);
            todo = null;
        });
    }

    return new GeneratorTask!FutureHandle(eventLoop, {
        while (!todo.empty || !done.empty)
        {
            auto f = done.get;
            if (f is null)
                throw new TimeoutException;
            yieldValue(f);
        }
    });
}

unittest
{
    auto eventLoop = getEventLoop;

    auto task1 = eventLoop.createTask({
        eventLoop.sleep(3.msecs);
    });
    auto task2 = eventLoop.createTask({
        eventLoop.sleep(2.msecs);
    });
    auto testTask = eventLoop.createTask({
        auto tasks = asCompleted(eventLoop, [task1, task2]).array;

        assert(tasks[0] is cast(FutureHandle) task2);
        assert(tasks[1] is cast(FutureHandle) task1);
    });
    eventLoop.runUntilComplete(testTask);

    task1 = eventLoop.createTask({
        eventLoop.sleep(10.msecs);
    });
    task2 = eventLoop.createTask({
        eventLoop.sleep(2.msecs);
    });
    testTask = eventLoop.createTask({
        auto tasks = asCompleted(eventLoop, [task1, task2], 5.msecs).array;

        assert(tasks.length == 1);
        assert(tasks[0] is cast(FutureHandle) task2);
    });
    eventLoop.runUntilComplete(testTask);
}

/**
 * Schedule the execution of a coroutine: wrap it in a future. Return a
 * $(D_PSYMBOL Task) object.
 *
 * If the argument is a $(D_PSYMBOL Future), it is returned directly.
*/
auto ensureFuture(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
    Args args)
{
    static if (!is(typeof(coroutine) : FutureHandle))
    {
        // A plain coroutine: wrap it in a new task bound to the event loop.
        return new Task!(Coroutine, Args)(eventLoop, coroutine, args);
    }
    else
    {
        // Already a future: hand it back untouched; extra arguments would
        // have nowhere to go.
        static assert(args.length == 0);

        return coroutine;
    }
}

unittest
{
    import std.functional : toDelegate;

    auto answerTask = ensureFuture(null, toDelegate({
        return 3 + 39;
    }));

    assert(!answerTask.done);
    assert(!answerTask.cancelled);

    // The coroutine never yields, so the cancellation request is never
    // delivered and the task completes normally.
    answerTask.cancel;
    assert(!answerTask.cancelled);

    const result = getEventLoop.runUntilComplete(answerTask);

    assert(answerTask.done);
    assert(!answerTask.cancelled);
    assert(result == 42);

    assert(answerTask == ensureFuture(null, answerTask));
}

/**
 * Create a coroutine that completes after a given time. If $(D_PSYMBOL result)
 * is provided, it is produced to the caller when the coroutine completes.
 *
 * The resolution of the sleep depends on the granularity of the event loop.
 */
@Coroutine
auto sleep(Result...)(EventLoop eventLoop, Duration delay, Result result)
    if (result.length <= 1)
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    // With no result the future carries void, otherwise the single result
    // type.
    static if (result.length == 0)
        alias Value = void;
    else
        alias Value = Result[0];

    auto future = new Future!Value(eventLoop);
    // `result` expands to zero or one trailing argument.
    auto handle = eventLoop.callLater(delay,
        &future.setResultUnlessCancelled, result);

    // Drop the timer however the wait ends.
    scope (exit) handle.cancel;

    return eventLoop.waitFor(future);
}

unittest
{
    auto eventLoop = getEventLoop;
    auto sleeper = eventLoop.ensureFuture({
        return eventLoop.sleep(5.msecs, "foo");
    });

    eventLoop.runUntilComplete(sleeper);
    assert(sleeper.result == "foo");
}

/**
 * Wait for a future, shielding it from cancellation.
*
 * The statement
 *
 * $(D_CODE res = shield(something());)
 *
 * is exactly equivalent to the statement
 *
 * $(D_CODE res = something();)
 *
 * $(I except) that if the coroutine containing it is cancelled, the task
 * running in $(D_PSYMBOL something()) is not cancelled. From the point of view
 * of $(D_PSYMBOL something()), the cancellation did not happen. But its caller
 * is still cancelled, so the call still raises $(D_PSYMBOL CancelledException).
 * Note: If $(D_PSYMBOL something()) is cancelled by other means this will still
 * cancel $(D_PSYMBOL shield()).
 *
 * If you want to completely ignore cancellation (not recommended) you can
 * combine $(D_PSYMBOL shield()) with a try/catch clause, as follows:
 *
 * $(D_CODE
 *   try
 *       res = shield(something());
 *   catch (CancelledException)
 *       res = null;
 * )
 *
 * Returns: the inner future itself if it is already done, otherwise a new
 *          outer future that mirrors the inner future's outcome.
 */
auto shield(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine,
    Args args)
{
    auto inner = ensureFuture(eventLoop, coroutine, args);
    if (inner.done())
        return inner;

    auto outer = new Future!(inner.ResultType)(eventLoop);

    // Propagates the outcome of `inner` to `outer` once `inner` is done.
    void doneCallback()
    {
        if (outer.cancelled())
        {
            if (!inner.cancelled())
                inner.exception(); // Mark inner's result as retrieved.
            return;
        }

        if (inner.cancelled())
        {
            outer.cancel();
        }
        else
        {
            auto exception = inner.exception();
            if (exception !is null)
            {
                outer.setException(exception);
            }
            else
            {
                // Test the inner future's result type; the former check
                // named an undeclared symbol and was therefore always true,
                // making the void branch unreachable.
                static if (!is(typeof(inner).ResultType == void))
                {
                    outer.setResult(inner.result);
                }
                else
                {
                    outer.setResult;
                }
            }
        }
    }

    inner.addDoneCallback(&doneCallback);
    return outer;
}

unittest
{
    int add(int a, int b)
    {
        return a + b;
    }

    auto eventLoop = getEventLoop;

    auto task1 = eventLoop.ensureFuture(&add, 3, 4);
    auto future1 = eventLoop.shield(task1);

    eventLoop.runUntilComplete(future1);
    assert(future1.result == 7);

    task1 = eventLoop.ensureFuture(&add, 3, 4);
    future1 = eventLoop.shield(task1);
    future1.cancel;

    eventLoop.runUntilComplete(task1);
    assert(future1.cancelled);
    assert(task1.result == 7);
}


/// Completion condition for $(D_PSYMBOL wait).
enum ReturnWhen
{
    /// Return as soon as any future is done.
    FIRST_COMPLETED,
    /// Return as soon as any future raised an exception, or all are done.
    FIRST_EXCEPTION,
    /// Return when all futures are done.
    ALL_COMPLETED,
}

/**
 * Wait for the Future instances given by $(PARAM futures) to complete.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
 *
 *  futures = futures to wait for.
 *
 *  timeout = can be used to control the maximum time to wait before
 *      returning. If $(PARAM timeout) is 0 or negative there is no limit
 *      to the wait time.
 *
 *  returnWhen = indicates when this function should return. It must be one of
 *      the following constants:
 *          FIRST_COMPLETED The function will return when any future finishes or
 *              is cancelled.
 *          FIRST_EXCEPTION The function will return when any future finishes by
 *              raising an exception. If no future raises an exception then it
 *              is equivalent to ALL_COMPLETED.
 *          ALL_COMPLETED The function will return when all futures finish or
 *              are cancelled.
 *
 * Returns: a named 2-tuple of arrays.
* The first array, named $(D_PSYMBOL done),
 *      contains the futures that completed (finished or were cancelled) before
 *      the wait completed. The second array, named $(D_PSYMBOL notDone),
 *      contains uncompleted futures.
 */
@Coroutine
auto wait(Future)(EventLoop eventLoop, Future[] futures,
    Duration timeout = Duration.zero,
    ReturnWhen returnWhen = ReturnWhen.ALL_COMPLETED)
    if (is(Future : FutureHandle))
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    auto thisTask = TaskHandle.getThis;

    assert(thisTask !is null);
    if (futures.empty)
    {
        // Nothing to wait for: still give other tasks a chance to run.
        thisTask.scheduleStep;
        TaskHandle.yield;

        return Tuple!(Future[], "done", Future[], "notDone")(null, null);
    }

    // Wake this task whenever one of the futures completes.
    foreach (future; futures)
    {
        future.addDoneCallback(&thisTask.scheduleStep);
    }

    scope (exit)
    {
        foreach (future; futures)
        {
            future.removeDoneCallback(&thisTask.scheduleStep);
        }
    }

    bool completed = false;
    CallbackHandle timeoutCallback = null;

    // Cancel the timer on every exit path. If `yield` throws (e.g. the task
    // is cancelled), a timer left behind would later reschedule a task that
    // has already terminated.
    scope (exit)
    {
        if (timeoutCallback !is null)
            timeoutCallback.cancel;
    }

    if (timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            completed = true;
            thisTask.scheduleStep;
        });
    }

    while (!completed)
    {
        TaskHandle.yield;

        // Set by the timeout callback while this task was suspended.
        if (completed)
            break;

        final switch (returnWhen)
        {
        case ReturnWhen.FIRST_COMPLETED:
            completed = futures.any!"a.done";
            break;
        case ReturnWhen.FIRST_EXCEPTION:
            // Only query the exception of futures that finished without
            // being cancelled. NOTE(review): presumably `exception` throws
            // for pending or cancelled futures (shield() guards it the same
            // way) - confirm against asynchronous.futures.
            completed = futures.any!(a => a.done && !a.cancelled &&
                                          a.exception !is null) ||
                futures.all!"a.done";
            break;
        case ReturnWhen.ALL_COMPLETED:
            completed = futures.all!"a.done";
            break;
        }
    }

    Tuple!(Future[], "done", Future[], "notDone") result;

    result.done = futures.filter!"a.done".array;
    result.notDone = futures.filter!"!a.done".array;

    return result;
}

unittest
{
    auto eventLoop = getEventLoop;

    int add(int a, int b)
    {
        return a + b;
    }

    auto tasks = [
        eventLoop.createTask({
            eventLoop.sleep(10.msecs);
            return add(10, 11);
        }),
        eventLoop.createTask({
            eventLoop.sleep(20.msecs);
            return add(11, 12);
        }),
        eventLoop.createTask({
            eventLoop.sleep(60.msecs);
            return add(12, 13);
        }),
    ];

    auto waitTask = eventLoop.createTask({
        return eventLoop.wait(tasks, 50.msecs);
    });

    const result = eventLoop.runUntilComplete(waitTask);

    assert(tasks[0].done);
    assert(tasks[1].done);
    assert(!tasks[2].done);

    assert(tasks[0].result == 21);
    assert(tasks[1].result == 23);

    assert(result.done == tasks[0 .. 2]);
    assert(result.notDone == tasks[2 .. $]);
}


/**
 * Wait for the single Future to complete with timeout. If timeout is 0 or
 * negative, block until the future completes.
 *
 * Params:
 *  eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified.
 *
 *  future = future to wait for.
 *
 *  timeout = can be used to control the maximum time to wait before
 *      returning. If $(PARAM timeout) is 0 or negative, block until the
 *      future completes.
 *
 * Returns: result of the future. When a timeout occurs, it cancels the task
 *      and raises $(D_PSYMBOL TimeoutException). To avoid the task
 *      cancellation, wrap it in $(D_PSYMBOL shield()).
*/
@Coroutine
auto waitFor(Future)(EventLoop eventLoop, Future future,
    Duration timeout = Duration.zero)
    if (is(Future : FutureHandle))
{
    if (eventLoop is null)
        eventLoop = getEventLoop;

    auto thisTask = TaskHandle.getThis;

    assert(thisTask !is null);

    // Wake this task when the future completes.
    future.addDoneCallback(&thisTask.scheduleStep);

    scope (exit)
    {
        future.removeDoneCallback(&thisTask.scheduleStep);
    }

    bool cancelFuture = false;
    CallbackHandle timeoutCallback = null;

    // Cancel the timer on every exit path. If `yield` throws (e.g. the task
    // is cancelled), a timer left behind would later reschedule a task that
    // has already terminated.
    scope (exit)
    {
        if (timeoutCallback !is null)
            timeoutCallback.cancel;
    }

    if (timeout > Duration.zero)
    {
        timeoutCallback = eventLoop.callLater(timeout, {
            cancelFuture = true;
            thisTask.scheduleStep;
        });
    }

    while (true)
    {
        TaskHandle.yield;

        // The timeout fired while this task was suspended.
        if (cancelFuture)
        {
            future.cancel;
            throw new TimeoutException;
        }

        if (future.done)
            break;
    }

    // NOTE(review): for void futures nothing is retrieved here, so an
    // exception stored in the future is not rethrown by this function.
    static if (!is(Future.ResultType == void))
    {
        return future.result;
    }
}

unittest
{
    auto eventLoop = getEventLoop;

    int add(int a, int b)
    {
        return a + b;
    }

    auto task1 = eventLoop.createTask({
        eventLoop.sleep(10.msecs);
        return add(10, 11);
    });

    auto task2 = eventLoop.createTask({
        eventLoop.sleep(40.msecs);
        return add(10, 11);
    });

    auto waitTask = eventLoop.createTask({
        eventLoop.waitFor(task1, 20.msecs);
        assert(task1.done);
        assert(task1.result == 21);
        assert(!task2.done);
        try
        {
            eventLoop.waitFor(task2, 10.msecs);
            assert(0, "Should not get here");
        }
        catch (TimeoutException timeoutException)
        {
            eventLoop.sleep(20.msecs);
            assert(task2.done);
            assert(task2.cancelled);
        }
    });

    eventLoop.runUntilComplete(waitTask);
}

/**
 * A $(D_PSYMBOL GeneratorTask) is a $(D_PSYMBOL Task) that periodically returns
 * values of type $(D_PSYMBOL T) to the caller via $(D_PSYMBOL yieldValue).
* This is represented as an $(D_PSYMBOL InputRange).
 */
class GeneratorTask(T) : Task!(void delegate())
{
    /// Address of the value most recently yielded and not yet consumed.
    private T* frontValue = null;
    /// Consumer task suspended in `empty`, if any.
    private TaskHandle waitingTask = null;

    this(EventLoop eventLoop, void delegate() coroutine)
    {
        super(eventLoop, coroutine);
        // Release a suspended consumer when the generator finishes.
        addDoneCallback({ wakeWaitingTask; });
    }

    /// Reschedules the suspended consumer, if there is one, and forgets it.
    private void wakeWaitingTask()
    {
        if (waitingTask is null)
            return;

        waitingTask.scheduleStep;
        waitingTask = null;
    }

    /// Publishes $(D_PARAM value) as the range front and wakes the consumer.
    package void setFrontValue(ref T value)
    {
        frontValue = &value;
        wakeWaitingTask;
    }

    /**
     * Suspends the calling task until the generator has either produced a
     * value or finished.
     *
     * Returns: $(D_KEYWORD true) when the generator is done, $(D_KEYWORD
     *          false) when a value is available.
     */
    @Coroutine
    @property bool empty()
    {
        if (done)
            return true;

        if (frontValue !is null)
            return false;

        auto caller = TaskHandle.getThis;

        assert(caller !is null);
        assert(caller !is this, "empty() called in the task routine");

        if (eventLoop is null)
            eventLoop = getEventLoop;

        assert(waitingTask is null, "another Task is already waiting");
        waitingTask = caller;
        TaskHandle.yield;
        assert(done || frontValue !is null);

        // Re-evaluate now that the state has changed.
        return empty;
    }

    /// Returns: the value currently offered by the generator.
    @property ref T front()
    {
        assert(frontValue !is null);
        return *frontValue;
    }

    /// Consumes the current value and lets the generator continue.
    void popFront()
    {
        assert(frontValue !is null);
        frontValue = null;
        scheduleStep;
    }
}

/**
 * Yield a value to the caller of the currently executing generator task.
* The type of the yielded value and the type of the generator must be the
 * same.
 *
 * Throws: $(D_PSYMBOL Exception) if the running task is not a
 *         $(D_PSYMBOL GeneratorTask) of type $(D_PSYMBOL T).
 */
@Coroutine
void yieldValue(T)(auto ref T value)
{
    auto runningTask = TaskHandle.getThis;
    auto generator = cast(GeneratorTask!T) runningTask;

    enforce(generator !is null,
            "'TaskHandle.yieldValue' called outside of a generator task.");

    // Publish the value, then suspend until the consumer asks for more.
    generator.setFrontValue(value);
    TaskHandle.yield;
}

unittest
{
    import std.functional : toDelegate;

    auto eventLoop = getEventLoop;

    // A generator that yields nothing is empty.
    auto silentGenerator = new GeneratorTask!int(eventLoop, {}.toDelegate);
    auto emptyCheck = eventLoop.createTask({
        assert(silentGenerator.empty);
    });

    eventLoop.runUntilComplete(emptyCheck);

    // A generator that yields two values produces them in order.
    auto pairGenerator = new GeneratorTask!int(eventLoop, {
        yieldValue(8);
        yieldValue(10);
    }.toDelegate);

    auto pairCheck = eventLoop.createTask({
        int[] collected = pairGenerator.array;

        assert(collected == [8, 10]);
        assert(pairGenerator.empty);
    });

    eventLoop.runUntilComplete(pairCheck);
}