1 /** 2 * Future classes. 3 * 4 * Copyright: © 2015-2016 Dragos Carp 5 * License: Boost Software License - Version 1.0 6 * Authors: Dragos Carp 7 */ 8 module asynchronous.futures; 9 10 import std.algorithm; 11 import std.array; 12 import asynchronous.events; 13 import asynchronous.types; 14 15 interface FutureHandle 16 { 17 /** 18 * Cancel the future and schedule callbacks. 19 * 20 * If the future is already done or cancelled, return $(D_KEYWORD false). 21 * Otherwise, change the future's state to cancelled, schedule the 22 * callbacks and return $(D_KEYWORD true). 23 */ 24 bool cancel(); 25 26 /** 27 * Return $(D_KEYWORD true) if the future was cancelled. 28 */ 29 bool cancelled() const; 30 31 /** 32 * Return $(D_KEYWORD true) if the future is done. 33 * 34 * Done means either that a result/exception are available, or that the 35 * future was cancelled. 36 */ 37 bool done() const; 38 39 /** 40 * Returns: the exception that was set on this future. 41 * 42 * The exception (or $(D_KEYWORD null) if no exception was set) is returned 43 * only if the future is done. If the future has been cancelled, throws 44 * $(D_PSYMBOL CancelledException). If the future isn't done, throws 45 * $(D_PSYMBOL InvalidStateException). 46 */ 47 Throwable exception(); 48 49 /** 50 * Add a callback to be run when the future becomes done. 51 * 52 * The callback is called with a single argument - the future object. If 53 * the future is already done when this is called, the callback is 54 * scheduled with $(D_PSYMBOL callSoon()) 55 */ 56 void addDoneCallback(void delegate(FutureHandle) callback); 57 58 /** 59 * Remove all instances of a callback from the "call when done" list. 60 * 61 * Returns: the number of callbacks removed; 62 */ 63 size_t removeDoneCallback(void delegate(FutureHandle) callback); 64 65 /** 66 * Mark the future done and set an exception. 67 * 68 * If the future is already done when this method is called, throws 69 * $(D_PSYMBOL InvalidStateError). 70 */ 71 void setException(Throwable exception); 72 73 string toString() const; 74 } 75 76 abstract class BaseFuture : FutureHandle 77 { 78 private enum State : ubyte 79 { 80 PENDING, 81 CANCELLED, 82 FINISHED, 83 } 84 85 package EventLoop eventLoop; 86 87 private void delegate(FutureHandle)[] callbacks = null; 88 89 private State state = State.PENDING; 90 91 private Throwable exception_; 92 93 this(EventLoop eventLoop = null) 94 { 95 this.eventLoop = eventLoop is null ? getEventLoop : eventLoop; 96 } 97 98 bool cancel() 99 { 100 if (this.state != State.PENDING) 101 return false; 102 103 this.state = State.CANCELLED; 104 scheduleCallbacks; 105 return true; 106 } 107 108 private void scheduleCallbacks() 109 { 110 if (this.callbacks.empty) 111 return; 112 113 auto scheduledCallbacks = this.callbacks; 114 this.callbacks = null; 115 116 foreach (callback; scheduledCallbacks) 117 { 118 this.eventLoop.callSoon(callback, this); 119 } 120 } 121 122 override bool cancelled() const 123 { 124 return this.state == State.CANCELLED; 125 } 126 127 override bool done() const 128 { 129 return this.state != State.PENDING; 130 } 131 132 override Throwable exception() 133 { 134 final switch (this.state) 135 { 136 case State.PENDING: 137 throw new InvalidStateException("Exception is not set."); 138 case State.CANCELLED: 139 throw new CancelledException; 140 case State.FINISHED: 141 return this.exception_; 142 } 143 } 144 145 override void addDoneCallback(void delegate(FutureHandle) callback) 146 { 147 if (this.state != State.PENDING) 148 this.eventLoop.callSoon(callback, this); 149 else 150 this.callbacks ~= callback; 151 } 152 153 override size_t removeDoneCallback(void delegate(FutureHandle) callback) 154 { 155 size_t length = this.callbacks.length; 156 157 this.callbacks = this.callbacks.remove!(a => a is callback); 158 159 return length - this.callbacks.length; 160 } 161 162 override void setException(Throwable exception) 163 { 164 if (this.state != State.PENDING) 165 throw new InvalidStateException("Result or exception already set"); 166 167 this.exception_ = exception; 168 this.state = State.FINISHED; 169 scheduleCallbacks; 170 } 171 172 override string toString() const 173 { 174 import std.format : format; 175 176 return "%s(done: %s, cancelled: %s)".format(typeid(this), done, cancelled); 177 } 178 } 179 180 /** 181 * Encapsulates the asynchronous execution of a callable. 182 */ 183 class Future(T) : BaseFuture 184 { 185 static if (!is(T == void)) 186 private T result_; 187 188 alias ResultType = T; 189 190 this(EventLoop eventLoop = null) 191 { 192 super(eventLoop); 193 } 194 195 /** 196 * Returns: the result this future represents. 197 * 198 * If the future has been cancelled, throws 199 * $(D_PSYMBOL CancelledException). If the future's result isn't yet 200 * available, throws $(D_PSYMBOL InvalidStateException). If the future is 201 * done and has an exception set, this exception is thrown. 202 */ 203 T result() 204 { 205 final switch (this.state) 206 { 207 case State.PENDING: 208 throw new InvalidStateException("Result is not ready."); 209 case State.CANCELLED: 210 throw new CancelledException; 211 case State.FINISHED: 212 if (this.exception_) 213 throw this.exception_; 214 return this.result_; 215 } 216 } 217 218 /** 219 * Helper setting the result only if the future was not cancelled. 220 */ 221 package void setResultUnlessCancelled(T result) 222 { 223 if (cancelled) 224 return; 225 setResult(result); 226 } 227 228 /** 229 * Mark the future done and set its result. 230 * 231 * If the future is already done when this method is called, throws 232 * $(D_PSYMBOL InvalidStateError). 233 */ 234 void setResult(T result) 235 { 236 if (this.state != State.PENDING) 237 throw new InvalidStateException("Result or exception already set"); 238 239 this.result_ = result; 240 this.state = State.FINISHED; 241 scheduleCallbacks; 242 } 243 } 244 245 alias Waiter = Future!void; 246 247 class Future(T : void) : BaseFuture 248 { 249 alias ResultType = void; 250 251 this(EventLoop eventLoop = null) 252 { 253 super(eventLoop); 254 } 255 256 /** 257 * Returns: void. 258 * 259 * If the future has been cancelled, throws 260 * $(D_PSYMBOL CancelledException). If the future's result isn't yet 261 * available, throws $(D_PSYMBOL InvalidStateException). If the future is 262 * done and has an exception set, this exception is thrown. 263 */ 264 T result() 265 { 266 final switch (this.state) 267 { 268 case State.PENDING: 269 throw new InvalidStateException("Result is not ready."); 270 case State.CANCELLED: 271 throw new CancelledException; 272 case State.FINISHED: 273 if (this.exception_) 274 throw this.exception_; 275 return; 276 } 277 } 278 279 /** 280 * Helper setting the result only if the future was not cancelled. 281 */ 282 package void setResultUnlessCancelled() 283 { 284 if (cancelled) 285 return; 286 setResult(); 287 } 288 289 /** 290 * Mark the future done. 291 * 292 * If the future is already done when this method is called, throws 293 * $(D_PSYMBOL InvalidStateError). 294 */ 295 void setResult() 296 { 297 if (this.state != State.PENDING) 298 throw new InvalidStateException("Result or exception already set"); 299 300 this.state = State.FINISHED; 301 scheduleCallbacks; 302 } 303 } 304 305 unittest 306 { 307 import std.exception : assertNotThrown, assertThrown, Exception; 308 309 auto future = new Future!int; 310 assert(!future.done); 311 assert(!future.cancelled); 312 assertThrown(future.exception); 313 assertThrown(future.result); 314 315 future = new Future!int; 316 future.setResult(42); 317 assert(future.done); 318 assert(!future.cancelled); 319 assert(future.result == 42); 320 assert(future.exception is null); 321 322 future = new Future!int; 323 future.cancel; 324 assert(future.done); 325 assert(future.cancelled); 326 assertThrown(future.exception); 327 assertThrown(future.result); 328 329 future = new Future!int; 330 future.setException(new Exception("foo")); 331 assert(future.done); 332 assert(!future.cancelled); 333 assert(future.exception !is null); 334 assertThrown(future.result); 335 336 auto future2 = new Future!void; 337 assert(!future2.done); 338 assert(!future2.cancelled); 339 assertThrown(future2.exception); 340 assertThrown(future2.result); 341 342 future2 = new Future!void; 343 future2.setResult; 344 assert(future2.done); 345 assert(!future2.cancelled); 346 assert(future2.exception is null); 347 assertNotThrown(future2.result); 348 349 future2 = new Future!void; 350 future2.cancel; 351 assert(future2.done); 352 assert(future2.cancelled); 353 assertThrown(future2.exception); 354 assertThrown(future2.result); 355 356 future2 = new Future!void; 357 future2.setException(new Exception("foo")); 358 assert(future2.done); 359 assert(!future2.cancelled); 360 assert(future2.exception !is null); 361 assertThrown(future2.result); 362 }