/**
 * Support for tasks, coroutines and the scheduler.
 *
 * Copyright: © 2015-2016 Dragos Carp
 * License: Boost Software License - Version 1.0
 * Authors: Dragos Carp
 */
module asynchronous.tasks;

import core.thread;
import std.algorithm;
import std.array;
import std.exception;
import std.traits;
import std.typecons;
import asynchronous.events;
import asynchronous.futures;
import asynchronous.types;

/**
 * Bookkeeping of tasks per event loop: the list of registered tasks and the
 * task that is currently being resumed.
 */
package class TaskRepository
{
    // Task currently being resumed, keyed by event loop (null when none).
    private static TaskHandle[EventLoop] currentTasks;

    // Registered tasks, keyed by event loop.
    private static TaskHandle[][EventLoop] tasks;

    /**
     * Returns: a copy of the list of tasks registered for $(D_PSYMBOL
     *          eventLoop); empty if no task was ever registered for it.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return tasks.get(eventLoop, null).dup;
    }

    /**
     * Returns: the task recorded as current for $(D_PSYMBOL eventLoop) or
     *          $(D_KEYWORD null) if there is none.
     */
    static TaskHandle currentTask(EventLoop eventLoop)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        return currentTasks.get(eventLoop, null);
    }

    /**
     * Record $(D_PSYMBOL taskHandle) as the current task of $(D_PSYMBOL
     * eventLoop). Called without a task, it clears the current task.
     */
    static void resetCurrentTask(EventLoop eventLoop,
        TaskHandle taskHandle = null)
    in
    {
        assert(eventLoop !is null);
    }
    body
    {
        currentTasks[eventLoop] = taskHandle;
    }

    /// Append $(D_PSYMBOL taskHandle) to the tasks of $(D_PSYMBOL eventLoop).
    package static void registerTask(EventLoop eventLoop, TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        tasks[eventLoop] ~= taskHandle;
    }

    /// Remove $(D_PSYMBOL taskHandle) from the tasks of $(D_PSYMBOL eventLoop).
    package static void unregisterTask(EventLoop eventLoop,
        TaskHandle taskHandle)
    in
    {
        assert(eventLoop !is null);
        assert(taskHandle !is null);
    }
    body
    {
        tasks[eventLoop] = tasks[eventLoop].remove!(a => a is taskHandle);
    }
}

/**
 * Type independent interface of a $(D_PSYMBOL Task).
 */
interface TaskHandle : FutureHandle
{
    /**
     * Exception that $(D_PSYMBOL yield) throws into the task when it is
     * resumed, or $(D_KEYWORD null) if nothing has to be thrown.
     */
    protected @property Throwable injectException();

    /// Routine executed by the task fiber.
    protected void run();

    /// Schedule the next step of the task.
    package void scheduleStep()
    {
        scheduleStepImpl(cast(Throwable) null);
    }

    /// Schedule the next step of the task and inject $(D_PSYMBOL throwable).
    package void scheduleStep(Throwable throwable)
    {
        scheduleStepImpl(throwable);
    }

    // Convenience overload for calling in addDoneCallback.
    // It ignores the provided argument.
    package void scheduleStep(FutureHandle future)
    {
        scheduleStepImpl(cast(Throwable) null);
    }

    /// Implementation hook shared by all $(D_PSYMBOL scheduleStep) overloads.
    protected void scheduleStepImpl(Throwable throwable);

    /**
     * Returns: a set of all tasks for an event loop.
     *
     * By default all tasks for the current event loop are returned.
     */
    static TaskHandle[] allTasks(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.allTasks(eventLoop);
    }

    /**
     * Returns: the currently running task in an event loop or
     * $(D_KEYWORD null).
     *
     * By default the current task for the current event loop is returned.
     *
     * $(D_KEYWORD null) is returned when called not in the context of a Task.
     */
    static TaskHandle currentTask(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            eventLoop = getEventLoop;

        return TaskRepository.currentTask(eventLoop);
    }

    /**
     * Returns: the task owning the currently executing fiber, or $(D_KEYWORD
     *          null) if the current fiber is not a task fiber.
     */
    static TaskHandle getThis()
    {
        auto taskFiber = TaskFiber.getThis;

        return taskFiber is null ? null : taskFiber.taskHandle;
    }

    /**
     * Forces a context switch to occur away from the calling task.
     *
     * When the task is resumed and an exception is pending for it (see
     * $(D_PSYMBOL injectException)), that exception is thrown.
     *
     * Throws: $(D_PSYMBOL Exception) if called outside of a task.
     */
    @Coroutine
    package static void yield()
    {
        // Validate the calling context *before* suspending: reaching
        // Fiber.yield without an active task fiber is a usage error that
        // must be reported instead of being executed.
        auto thisTask = getThis;
        enforce(thisTask !is null,
                "'TaskHandle.yield' called outside of a task.");

        Fiber.yield;

        if (thisTask.injectException !is null)
            throw thisTask.injectException;
    }
}

/**
 * Fiber that remembers the task it is currently executing.
 */
private class TaskFiber : Fiber
{
    private TaskHandle task_ = null;

    this()
    {
        super({ }, 16 * 4096UL);
    }

    /// The task bound to this fiber, $(D_KEYWORD null) if unbound.
    @property TaskHandle taskHandle()
    {
        return task_;
    }

    /**
     * Bind the fiber to $(D_PSYMBOL task) and prepare it to execute the
     * task's $(D_PSYMBOL run) routine; without a task the fiber is unbound
     * and reset.
     */
    void reset(TaskHandle task = null)
    in
    {
        assert(state == (task is null ? Fiber.State.TERM : Fiber.State.HOLD));
    }
    body
    {
        task_ = task;
        if (task is null)
            super.reset;
        else
            super.reset(&(task.run));
    }

    /**
     * Returns: the currently executing fiber if it is a $(D_PSYMBOL
     *          TaskFiber), otherwise $(D_KEYWORD null).
     */
    static TaskFiber getThis()
    {
        return cast(TaskFiber) Fiber.getThis;
    }
}


// Pool the task fibers are acquired from and released to.
private static ResourcePool!TaskFiber taskFibers;

static this()
{
    taskFibers = new ResourcePool!TaskFiber;
}

/**
 * Schedule the execution of a fiber: wrap it in a future. A task is a
 * subclass of Future.
 *
 * A task is responsible for executing a fiber object in an event loop. If the
 * wrapped fiber yields from a future, the task suspends the execution of the
 * wrapped fiber and waits for the completion of the future. When the future
 * is done, the execution of the wrapped fiber restarts with the result or the
 * exception of the future.
 *
 * Event loops use cooperative scheduling: an event loop only runs one task at
 * a time. Other tasks may run in parallel if other event loops are running in
 * different threads. While a task waits for the completion of a future, the
 * event loop executes a new task.
 *
 * The cancellation of a task is different from the cancellation of a future.
 * Calling $(D_PSYMBOL cancel()) will throw a $(D_PSYMBOL CancelledException)
 * to the wrapped fiber. $(D_PSYMBOL cancelled()) only returns $(D_KEYWORD
 * true) if the wrapped fiber did not catch the $(D_PSYMBOL
 * CancelledException), or raised a $(D_PSYMBOL CancelledException).
 *
 * If a pending task is destroyed, the execution of its wrapped fiber did not
 * complete. It is probably a bug.
 *
 * Don’t directly create $(D_PSYMBOL Task) instances: use the $(D_PSYMBOL
 * ensureFuture()) function or the $(D_PSYMBOL EventLoop.createTask()) method.
 */
class Task(Coroutine, Args...) : Future!(ReturnType!Coroutine), TaskHandle
    if (isDelegate!Coroutine)
{
    alias ResultType = ReturnType!Coroutine;

    // Fiber executing the coroutine; null once the task has terminated.
    package TaskFiber fiber;

    private Coroutine coroutine;
    private Args args;

    // Exception thrown into the coroutine when it resumes from a yield.
    private Throwable _injectException;

    protected override @property Throwable injectException()
    {
        return this._injectException;
    }

    debug (tasks)
        package string id()
        {
            import std.format : format;

            return "%s".format(cast(void*) cast(TaskHandle) this);
        }

    /**
     * Bind the coroutine to a pooled fiber, register the task with the event
     * loop and request its first step via $(D_PSYMBOL callSoon).
     */
    this(EventLoop eventLoop, Coroutine coroutine, Args args)
    {
        super(eventLoop);

        this.coroutine = coroutine;
        static if (args.length > 0)
        {
            this.args = args;
        }

        this.fiber = taskFibers.acquire(this);
        TaskRepository.registerTask(this.eventLoop, this);
        this.eventLoop.callSoon(&step, null);

        debug (tasks)
            std.stdio.writeln("Create task ", id);
    }

    /// Execute the coroutine and store its result in the future.
    override protected void run()
    {
        debug (tasks)
            std.stdio.writeln("Start task ", id);

        static if (is(ResultType == void))
        {
            coroutine(args);
            setResult;
        }
        else
        {
            setResult(coroutine(args));
        }

        debug (tasks)
            std.stdio.writeln("End task ", id);
    }

    /**
     * Request that this task cancel itself.
     *
     * This arranges for a $(D_PSYMBOL CancelledException) to be thrown into
     * the wrapped coroutine the next time it is resumed from $(D_PSYMBOL
     * TaskHandle.yield). The coroutine then has a chance to clean up or even
     * deny the request using try/catch/finally.
     *
     * Unlike $(D_PSYMBOL Future.cancel()), this does not guarantee that the
     * task will be cancelled: the exception might be caught and acted upon,
     * delaying cancellation of the task or preventing cancellation completely.
     * The task may also return a value or raise a different exception.
     *
     * Immediately after this method is called, $(D_PSYMBOL cancelled()) will
     * not return $(D_KEYWORD true) (unless the task was already cancelled). A
     * task will be marked as cancelled when the wrapped coroutine terminates
     * with a $(D_PSYMBOL CancelledException) (even if cancel() was not
     * called).
     *
     * Returns: $(D_KEYWORD false) if the task is already done, $(D_KEYWORD
     *          true) otherwise.
     */
    override bool cancel()
    {
        if (done)
            return false;

        this._injectException = new CancelledException;
        return true;
    }

    /**
     * Resume the fiber once and translate its outcome into the future state.
     *
     * Params:
     *  throwable = if not $(D_KEYWORD null), it is recorded as the exception
     *              to inject into the coroutine before the fiber is resumed.
     */
    private void step(Throwable throwable = null)
    {
        // TODO why rescheduled when it is already terminated?
        if (this.fiber is null)
            return;

        final switch (this.fiber.state)
        {
        case Fiber.State.HOLD:
            debug (tasks)
                std.stdio.writeln("Resume task ", id);

            TaskRepository.resetCurrentTask(this.eventLoop, this);

            if (throwable !is null)
                this._injectException = throwable;

            try
            {
                this.fiber.call;
            }
            catch (CancelledException)
            {
                assert(this.fiber.state == Fiber.State.TERM);
                // Future.cancel: mark the future itself as cancelled.
                super.cancel;
            }
            catch (Throwable failure) // named so it does not shadow the parameter
            {
                assert(this.fiber.state == Fiber.State.TERM);
                setException(failure);
            }

            TaskRepository.resetCurrentTask(this.eventLoop);

            if (this.fiber.state == Fiber.State.TERM)
                goto case Fiber.State.TERM;

            debug (tasks)
                std.stdio.writeln("Suspend task ", id);
            break;
        case Fiber.State.EXEC:
            assert(0, "Internal error");
        case Fiber.State.TERM:
            assert(done);
            // Give the fiber back to the pool; a null fiber marks the task
            // as terminated for any later step request.
            TaskRepository.unregisterTask(this.eventLoop, this);
            taskFibers.release(this.fiber);
            this.fiber = null;
            break;
        }
    }

    /**
     * Run the next step immediately when called outside of any task fiber,
     * otherwise defer it to the event loop via $(D_PSYMBOL callSoon).
     */
    protected override void scheduleStepImpl(Throwable throwable)
    {
        debug (tasks)
        {
            if (throwable is null)
                std.stdio.writeln("Schedule task ", id);
            else
                std.stdio.writeln("Schedule task ", id, " to throw ",
                    throwable);
        }

        if (TaskFiber.getThis is null)
            step(throwable);
        else
            this.eventLoop.callSoon(&step, throwable);
    }

    override string toString() const
    {
        import std.format : format;

        return "%s(done: %s, cancelled: %s)".format(typeid(this), done,
            cancelled);
    }
}

unittest
{
    import std.functional : toDelegate;

    auto testCoroutine = (int i) {
        if (i > 0)
            throw new Exception("bar");
        return "foo";
    };
    auto eventLoop = getEventLoop;
    auto testTask = eventLoop.createTask(testCoroutine.toDelegate, 0);
    eventLoop.runUntilComplete(testTask);
    assert(testTask.done);
    assert(testTask.result == "foo");

    testTask = eventLoop.createTask(testCoroutine.toDelegate, 1);
    assertThrown!Exception(eventLoop.runUntilComplete(testTask));
    assert(testTask.done);
    assert(testTask.exception !is null);
    assert(testTask.exception.msg == "bar");
}

/**
 * Return a generator whose values, when waited for, are Future instances in
 * the order they complete.
 *
 * Raises $(D_PSYMBOL TimeoutException) if the timeout occurs before all futures
 * are done.
 *
 * Example:
 *
 *  foreach (f; getEventLoop.asCompleted(fs))
 *       // use f.result
 *
 * Note: The futures $(D_PSYMBOL f) are not necessarily members of $(D_PSYMBOL
 *       fs).
435 */ 436 auto asCompleted(EventLoop eventLoop, FutureHandle[] futures, 437 Duration timeout = Duration.zero) 438 in 439 { 440 futures.all!"a !is null"; 441 } 442 body 443 { 444 if (eventLoop is null) 445 eventLoop = getEventLoop; 446 447 import asynchronous.queues : Queue; 448 auto done = new Queue!FutureHandle; 449 CallbackHandle timeoutCallback = null; 450 Tuple!(FutureHandle, void delegate(FutureHandle))[] todo; 451 452 foreach (future; futures) 453 { 454 auto doneCallback = (FutureHandle f) { 455 return (FutureHandle _) { 456 if (todo.empty) 457 return; // timeoutCallback was here first 458 todo = todo.remove!(a => a[0] is f); 459 done.putNowait(f); 460 if (todo.empty && timeoutCallback !is null) 461 timeoutCallback.cancel; 462 }; 463 }(future); // workaround bug #2043; 464 465 todo ~= tuple(future, doneCallback); 466 future.addDoneCallback(doneCallback); 467 } 468 469 if (!todo.empty && timeout > Duration.zero) 470 { 471 timeoutCallback = eventLoop.callLater(timeout, { 472 foreach (future_callback; todo) 473 { 474 future_callback[0].removeDoneCallback(future_callback[1]); 475 future_callback[0].cancel; 476 } 477 if (!todo.empty) 478 done.putNowait(null); 479 todo = null; 480 }); 481 } 482 483 return new GeneratorTask!FutureHandle(eventLoop, { 484 while (!todo.empty || !done.empty) 485 { 486 auto f = done.get; 487 if (f is null) 488 throw new TimeoutException; 489 yieldValue(f); 490 } 491 }); 492 } 493 494 unittest 495 { 496 auto eventLoop = getEventLoop; 497 auto task1 = eventLoop.createTask({ 498 eventLoop.sleep(3.msecs); 499 }); 500 auto task2 = eventLoop.createTask({ 501 eventLoop.sleep(2.msecs); 502 }); 503 auto testTask = eventLoop.createTask({ 504 auto tasks = asCompleted(eventLoop, [task1, task2]).array; 505 506 assert(tasks[0] is cast(FutureHandle) task2); 507 assert(tasks[1] is cast(FutureHandle) task1); 508 }); 509 eventLoop.runUntilComplete(testTask); 510 511 task1 = eventLoop.createTask({ 512 eventLoop.sleep(10.msecs); 513 }); 514 task2 = 
eventLoop.createTask({ 515 eventLoop.sleep(2.msecs); 516 }); 517 testTask = eventLoop.createTask({ 518 auto tasks = asCompleted(eventLoop, [task1, task2], 5.msecs).array; 519 520 assert(tasks.length == 1); 521 assert(tasks[0] is cast(FutureHandle) task2); 522 }); 523 eventLoop.runUntilComplete(testTask); 524 } 525 526 /** 527 * Schedule the execution of a coroutine: wrap it in a future. Return a 528 * $(D_PSYMBOL Task) object. 529 * 530 * If the argument is a $(D_PSYMBOL Future), it is returned directly. 531 */ 532 auto ensureFuture(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine, 533 Args args) 534 { 535 static if (is(typeof(coroutine) : FutureHandle)) 536 { 537 static assert(args.length == 0); 538 539 return coroutine; 540 } 541 else 542 { 543 return new Task!(Coroutine, Args)(eventLoop, coroutine, args); 544 } 545 } 546 547 unittest 548 { 549 import std.functional : toDelegate; 550 551 auto task1 = ensureFuture(null, toDelegate({ 552 return 3 + 39; 553 })); 554 555 assert(!task1.done); 556 assert(!task1.cancelled); 557 558 task1.cancel; 559 assert(!task1.cancelled); 560 561 const result = getEventLoop.runUntilComplete(task1); 562 563 assert(task1.done); 564 assert(!task1.cancelled); 565 assert(result == 42); 566 567 assert(task1 == ensureFuture(null, task1)); 568 } 569 570 /** 571 * Create a coroutine that completes after a given time. If $(D_PSYMBOL result) 572 * is provided, it is produced to the caller when the coroutine completes. 573 * 574 * The resolution of the sleep depends on the granularity of the event loop. 
575 */ 576 @Coroutine 577 auto sleep(Result...)(EventLoop eventLoop, Duration delay, Result result) 578 if (result.length <= 1) 579 { 580 if (eventLoop is null) 581 eventLoop = getEventLoop; 582 583 static if (result.length == 0) 584 { 585 auto future = new Future!void(eventLoop); 586 auto handle = eventLoop.callLater(delay, 587 &future.setResultUnlessCancelled); 588 } 589 else 590 { 591 auto future = new Future!Result(eventLoop); 592 auto handle = eventLoop.callLater(delay, 593 &future.setResultUnlessCancelled, result); 594 } 595 scope (exit) handle.cancel; 596 597 return eventLoop.waitFor(future); 598 } 599 600 unittest 601 { 602 auto eventLoop = getEventLoop; 603 auto task1 = eventLoop.ensureFuture({ 604 return eventLoop.sleep(5.msecs, "foo"); 605 }); 606 607 eventLoop.runUntilComplete(task1); 608 assert(task1.result == "foo"); 609 } 610 611 /** 612 * Wait for a future, shielding it from cancellation. 613 * 614 * The statement 615 * 616 * $(D_CODE res = shield(something());) 617 * 618 * is exactly equivalent to the statement 619 * 620 * $(D_CODE res = something();) 621 * 622 * $(I except) that if the coroutine containing it is cancelled, the task 623 * running in $(D_PSYMBOL something()) is not cancelled. From the point of view 624 * of $(D_PSYMBOL something()), the cancellation did not happen. But its caller 625 * is still cancelled, so the call still raises $(D_PSYMBOL CancelledException). 626 * Note: If $(D_PSYMBOL something()) is cancelled by other means this will still 627 * cancel $(D_PSYMBOL shield()). 
628 * 629 * If you want to completely ignore cancellation (not recommended) you can 630 * combine $(D_PSYMBOL shield()) with a try/catch clause, as follows: 631 * 632 * $(D_CODE 633 * try 634 * res = shield(something()); 635 * catch (CancelledException) 636 * res = null; 637 * ) 638 */ 639 auto shield(Coroutine, Args...)(EventLoop eventLoop, Coroutine coroutine, 640 Args args) 641 { 642 auto inner = ensureFuture(eventLoop, coroutine, args); 643 if (inner.done()) 644 return inner; 645 646 auto outer = new Future!(inner.ResultType)(eventLoop); 647 648 void doneCallback(FutureHandle _) 649 { 650 if (outer.cancelled()) 651 { 652 if (!inner.cancelled()) 653 inner.exception(); // Mark inner's result as retrieved. 654 return; 655 } 656 657 if (inner.cancelled()) 658 { 659 outer.cancel(); 660 } 661 else 662 { 663 auto exception = inner.exception(); 664 if (exception !is null) 665 { 666 outer.setException(exception); 667 } 668 else 669 { 670 static if (is(inner.ResultType == void)) 671 { 672 outer.setResult; 673 } 674 else 675 { 676 outer.setResult(inner.result); 677 } 678 } 679 } 680 } 681 682 inner.addDoneCallback(&doneCallback); 683 return outer; 684 } 685 686 unittest 687 { 688 import std.functional : toDelegate; 689 690 int add(int a, int b) 691 { 692 return a + b; 693 } 694 695 auto eventLoop = getEventLoop; 696 697 auto task1 = eventLoop.ensureFuture(&add, 3, 4); 698 auto future1 = eventLoop.shield(task1); 699 700 eventLoop.runUntilComplete(future1); 701 assert(future1.result == 7); 702 703 task1 = eventLoop.ensureFuture(&add, 3, 4); 704 future1 = eventLoop.shield(task1); 705 future1.cancel; 706 707 eventLoop.runUntilComplete(task1); 708 assert(future1.cancelled); 709 assert(task1.result == 7); 710 711 auto task2 = eventLoop.ensureFuture({ }.toDelegate); 712 auto future2 = eventLoop.shield(task2); 713 future2.cancel; 714 715 eventLoop.runUntilComplete(task2); 716 assert(future2.cancelled); 717 assert(task2.done); 718 } 719 720 721 enum ReturnWhen 722 { 723 
FIRST_COMPLETED, 724 FIRST_EXCEPTION, 725 ALL_COMPLETED, 726 } 727 728 /** 729 * Wait for the Future instances given by $(PARAM futures) to complete. 730 * 731 * Params: 732 * eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified. 733 * 734 * futures = futures to wait for. 735 * 736 * timeout = can be used to control the maximum number of seconds to wait 737 * before returning. If $(PARAM timeout) is 0 or negative there is no limit 738 * to the wait time. 739 * 740 * returnWhen = indicates when this function should return. It must be one of 741 * the following constants: 742 * FIRST_COMPLETED The function will return when any future finishes or 743 * is cancelled. 744 * FIRST_EXCEPTION The function will return when any future finishes by 745 * raising an exception. If no future raises an exception then it 746 * is equivalent to ALL_COMPLETED. 747 * ALL_COMPLETED The function will return when all futures finish or 748 * are cancelled. 749 * 750 * Returns: a named 2-tuple of arrays. The first array, named $(D_PSYMBOL done), 751 * contains the futures that completed (finished or were cancelled) before 752 * the wait completed. The second array, named $(D_PSYMBOL notDone), 753 * contains uncompleted futures. 
754 */ 755 @Coroutine 756 auto wait(Future)(EventLoop eventLoop, Future[] futures, 757 Duration timeout = Duration.zero, 758 ReturnWhen returnWhen = ReturnWhen.ALL_COMPLETED) 759 if (is(Future : FutureHandle)) 760 { 761 if (eventLoop is null) 762 eventLoop = getEventLoop; 763 764 auto thisTask = TaskHandle.getThis; 765 766 assert(thisTask !is null); 767 if (futures.empty) 768 { 769 thisTask.scheduleStep; 770 TaskHandle.yield; 771 772 return Tuple!(Future[], "done", Future[], "notDone")(null, null); 773 } 774 775 foreach (future; futures) 776 { 777 future.addDoneCallback(&thisTask.scheduleStep); 778 } 779 780 scope (exit) 781 { 782 foreach (future; futures) 783 { 784 future.removeDoneCallback(&thisTask.scheduleStep); 785 } 786 } 787 788 bool completed = false; 789 CallbackHandle timeoutCallback; 790 791 if (timeout > Duration.zero) 792 { 793 timeoutCallback = eventLoop.callLater(timeout, { 794 completed = true; 795 thisTask.scheduleStep; 796 }); 797 } 798 799 while (!completed) 800 { 801 TaskHandle.yield; 802 803 if (completed) 804 break; 805 806 final switch (returnWhen) 807 { 808 case ReturnWhen.FIRST_COMPLETED: 809 completed = futures.any!"a.done"; 810 break; 811 case ReturnWhen.FIRST_EXCEPTION: 812 completed = futures.any!"a.exception !is null" || 813 futures.all!"a.done"; 814 break; 815 case ReturnWhen.ALL_COMPLETED: 816 completed = futures.all!"a.done"; 817 break; 818 } 819 } 820 821 if (timeoutCallback !is null) 822 timeoutCallback.cancel; 823 824 Tuple!(Future[], "done", Future[], "notDone") result; 825 826 result.done = futures.filter!"a.done".array; 827 result.notDone = futures.filter!"!a.done".array; 828 829 return result; 830 } 831 832 unittest 833 { 834 auto eventLoop = getEventLoop; 835 836 int add(int a, int b) 837 { 838 return a + b; 839 } 840 841 auto tasks = [ 842 eventLoop.createTask({ 843 eventLoop.sleep(10.msecs); 844 return add(10, 11); 845 }), 846 eventLoop.createTask({ 847 eventLoop.sleep(20.msecs); 848 return add(11, 12); 849 }), 850 
eventLoop.createTask({ 851 eventLoop.sleep(60.msecs); 852 return add(12, 13); 853 }), 854 ]; 855 856 auto waitTask = eventLoop.createTask({ 857 return eventLoop.wait(tasks, 50.msecs); 858 }); 859 860 const result = eventLoop.runUntilComplete(waitTask); 861 862 assert(tasks[0].done); 863 assert(tasks[1].done); 864 assert(!tasks[2].done); 865 866 assert(tasks[0].result == 21); 867 assert(tasks[1].result == 23); 868 869 assert(result.done == tasks[0 .. 2]); 870 assert(result.notDone == tasks[2 .. $]); 871 } 872 873 874 /** 875 * Wait for the single Future to complete with timeout. If timeout is 0 or 876 * negative, block until the future completes. 877 * 878 * Params: 879 * eventLoop = event loop, $(D_PSYMBOL getEventLoop) if not specified. 880 * 881 * future = future to wait for. 882 * 883 * timeout = can be used to control the maximum time to wait before 884 * returning. If $(PARAM timeout) is 0 or negative, block until the 885 * future completes. 886 * 887 * Returns: result of the future. When a timeout occurs, it cancels the task 888 * and raises $(D_PSYMBOL TimeoutException). To avoid the task 889 * cancellation, wrap it in $(D_SYMBOL shield()). 
890 */ 891 @Coroutine 892 auto waitFor(Future)(EventLoop eventLoop, Future future, 893 Duration timeout = Duration.zero) 894 if (is(Future : FutureHandle)) 895 { 896 if (eventLoop is null) 897 eventLoop = getEventLoop; 898 899 auto thisTask = TaskHandle.getThis; 900 901 assert(thisTask !is null); 902 903 future.addDoneCallback(&thisTask.scheduleStep); 904 scope (exit) 905 { 906 future.removeDoneCallback(&thisTask.scheduleStep); 907 } 908 909 bool cancelFuture = false; 910 CallbackHandle timeoutCallback; 911 912 if (timeout > Duration.zero) 913 { 914 timeoutCallback = eventLoop.callLater(timeout, { 915 cancelFuture = true; 916 thisTask.scheduleStep; 917 }); 918 } 919 920 while (true) 921 { 922 TaskHandle.yield; 923 924 if (cancelFuture) 925 { 926 future.cancel; 927 throw new TimeoutException; 928 } 929 930 if (future.done) 931 break; 932 } 933 934 if (timeoutCallback !is null) 935 timeoutCallback.cancel; 936 937 return future.result; 938 } 939 940 unittest 941 { 942 auto eventLoop = getEventLoop; 943 944 int add(int a, int b) 945 { 946 return a + b; 947 } 948 949 auto task1 = eventLoop.createTask({ 950 eventLoop.sleep(10.msecs); 951 return add(10, 11); 952 }); 953 954 auto task2 = eventLoop.createTask({ 955 eventLoop.sleep(40.msecs); 956 return add(10, 11); 957 }); 958 959 auto waitTask = eventLoop.createTask({ 960 eventLoop.waitFor(task1, 20.msecs); 961 assert(task1.done); 962 assert(task1.result == 21); 963 assert(!task2.done); 964 try 965 { 966 eventLoop.waitFor(task2, 10.msecs); 967 assert(0, "Should not get here"); 968 } 969 catch (TimeoutException timeoutException) 970 { 971 eventLoop.sleep(20.msecs); 972 assert(task2.done); 973 assert(task2.cancelled); 974 } 975 }); 976 977 eventLoop.runUntilComplete(waitTask); 978 } 979 980 /** 981 * A $(D_PSYMBOL GeneratorTask) is a $(D_PSYMBOL Task) that periodically returns 982 * values of type $(D_PSYMBOL T) to the caller via $(D_PSYMBOL yieldValue). This 983 * is represented as an $(D_PSYMBOL InputRange). 
984 */ 985 class GeneratorTask(T) : Task!(void delegate()) 986 { 987 private T* frontValue = null; 988 private TaskHandle waitingTask = null; 989 990 this(EventLoop eventLoop, void delegate() coroutine) 991 { 992 super(eventLoop, coroutine); 993 addDoneCallback((FutureHandle _) { 994 if (waitingTask) 995 { 996 waitingTask.scheduleStep; 997 waitingTask = null; 998 } 999 }); 1000 } 1001 1002 package void setFrontValue(ref T value) 1003 { 1004 frontValue = &value; 1005 if (waitingTask) 1006 { 1007 waitingTask.scheduleStep; 1008 waitingTask = null; 1009 } 1010 } 1011 1012 @Coroutine 1013 @property bool empty() 1014 { 1015 if (done) 1016 return true; 1017 1018 if (frontValue !is null) 1019 return false; 1020 1021 auto thisTask = TaskHandle.getThis; 1022 1023 assert(thisTask !is null); 1024 assert(thisTask !is this, "empty() called in the task routine"); 1025 assert(waitingTask is null, "another Task is already waiting"); 1026 waitingTask = thisTask; 1027 TaskHandle.yield; 1028 assert(done || frontValue !is null); 1029 1030 return empty; 1031 } 1032 1033 @property ref T front() 1034 { 1035 assert(frontValue !is null); 1036 return *frontValue; 1037 } 1038 1039 void popFront() 1040 { 1041 assert(frontValue !is null); 1042 frontValue = null; 1043 scheduleStep; 1044 } 1045 } 1046 1047 /** 1048 * Yield a value to the caller of the currently executing generator task. The 1049 * type of the yielded value and the type of the generator must be the same. 
1050 */ 1051 @Coroutine 1052 void yieldValue(T)(auto ref T value) 1053 { 1054 auto generatorTask = cast(GeneratorTask!T) TaskHandle.getThis; 1055 enforce(generatorTask !is null, 1056 "'TaskHandle.yieldValue' called outside of a generator task."); 1057 1058 generatorTask.setFrontValue(value); 1059 1060 TaskHandle.yield; 1061 } 1062 1063 unittest 1064 { 1065 import std.functional : toDelegate; 1066 1067 auto eventLoop = getEventLoop; 1068 auto task1 = new GeneratorTask!int(eventLoop, {}.toDelegate); 1069 auto testTask = eventLoop.createTask({ 1070 assert(task1.empty); 1071 }); 1072 1073 eventLoop.runUntilComplete(testTask); 1074 1075 auto task2 = new GeneratorTask!int(eventLoop, { 1076 yieldValue(8); 1077 yieldValue(10); 1078 }.toDelegate); 1079 1080 auto testTask2 = eventLoop.createTask({ 1081 int[] array1 = task2.array; 1082 1083 assert(array1 == [8, 10]); 1084 assert(task2.empty); 1085 }); 1086 1087 eventLoop.runUntilComplete(testTask2); 1088 }