1 /** 2 * Future classes. 3 */ 4 module asynchronous.futures; 5 6 import std.algorithm; 7 import std.array; 8 import asynchronous.events; 9 import asynchronous.types; 10 11 interface FutureHandle 12 { 13 /** 14 * Cancel the future and schedule callbacks. 15 * 16 * If the future is already done or cancelled, return $(D_KEYWORD false). 17 * Otherwise, change the future's state to cancelled, schedule the 18 * callbacks and return $(D_KEYWORD true). 19 */ 20 bool cancel(); 21 22 /** 23 * Return $(D_KEYWORD true) if the future was cancelled. 24 */ 25 bool cancelled() const; 26 27 /** 28 * Return $(D_KEYWORD true) if the future is done. 29 * 30 * Done means either that a result/exception are available, or that the 31 * future was cancelled. 32 */ 33 bool done() const; 34 35 /** 36 * Returns: the exception that was set on this future. 37 * 38 * The exception (or $(D_KEYWORD null) if no exception was set) is returned 39 * only if the future is done. If the future has been cancelled, throws 40 * $(D_PSYMBOL CancelledException). If the future isn't done, throws 41 * $(D_PSYMBOL InvalidStateException). 42 */ 43 Throwable exception(); 44 45 /** 46 * Add a callback to be run when the future becomes done. 47 * 48 * The callback is called with a single argument - the future object. If 49 * the future is already done when this is called, the callback is 50 * scheduled with $(D_PSYMBOL callSoon()) 51 */ 52 void addDoneCallback(void delegate() callback); 53 54 /** 55 * Remove all instances of a callback from the "call when done" list. 56 * 57 * Returns: the number of callbacks removed; 58 */ 59 size_t removeDoneCallback(void delegate() callback); 60 61 /** 62 * Mark the future done and set an exception. 63 * 64 * If the future is already done when this method is called, throws 65 * $(D_PSYMBOL InvalidStateError). 66 */ 67 void setException(Throwable exception); 68 69 string toString() const; 70 } 71 72 abstract class BaseFuture : FutureHandle 73 { 74 private enum State 75 { 76 PENDING, 77 CANCELLED, 78 FINISHED, 79 } 80 81 package EventLoop eventLoop; 82 83 private void delegate()[] callbacks = null; 84 85 private State state = State.PENDING; 86 87 private Throwable exception_; 88 89 this(EventLoop eventLoop = null) 90 { 91 this.eventLoop = eventLoop is null ? getEventLoop : eventLoop; 92 } 93 94 bool cancel() 95 { 96 if (this.state != State.PENDING) 97 return false; 98 99 this.state = State.CANCELLED; 100 scheduleCallbacks; 101 return true; 102 } 103 104 private void scheduleCallbacks() 105 { 106 if (this.callbacks.empty) 107 return; 108 109 auto scheduledCallbacks = this.callbacks; 110 this.callbacks = null; 111 112 foreach (callback; scheduledCallbacks) 113 { 114 this.eventLoop.callSoon(callback); 115 } 116 } 117 118 bool cancelled() const 119 { 120 return this.state == State.CANCELLED; 121 } 122 123 bool done() const 124 { 125 return this.state != State.PENDING; 126 } 127 128 Throwable exception() 129 { 130 final switch (this.state) 131 { 132 case State.PENDING: 133 throw new InvalidStateException("Exception is not set."); 134 case State.CANCELLED: 135 throw new CancelledException; 136 case State.FINISHED: 137 return this.exception_; 138 } 139 } 140 141 void addDoneCallback(void delegate() callback) 142 { 143 if (this.state != State.PENDING) 144 this.eventLoop.callSoon(callback); 145 else 146 this.callbacks ~= callback; 147 } 148 149 size_t removeDoneCallback(void delegate() callback) 150 { 151 size_t length = this.callbacks.length; 152 153 this.callbacks = this.callbacks.remove!(a => a is callback); 154 155 return length - this.callbacks.length; 156 } 157 158 void setException(Throwable exception) 159 { 160 if (this.state != State.PENDING) 161 throw new InvalidStateException("Result or exception already set"); 162 163 this.exception_ = exception; 164 this.state = State.FINISHED; 165 scheduleCallbacks; 166 } 167 168 override string toString() const 169 { 170 import std.format : format; 171 172 return "%s(done: %s, cancelled: %s)".format(typeid(this), done, cancelled); 173 } 174 } 175 176 /** 177 * Encapsulates the asynchronous execution of a callable. 178 */ 179 class Future(T) : BaseFuture 180 { 181 private T result_; 182 183 alias ResultType = T; 184 185 this(EventLoop eventLoop = null) 186 { 187 super(eventLoop); 188 } 189 190 /** 191 * Returns: the result this future represents. 192 * 193 * If the future has been cancelled, throws 194 * $(D_PSYMBOL CancelledException). If the future's result isn't yet 195 * available, throws $(D_PSYMBOL InvalidStateException). If the future is 196 * done and has an exception set, this exception is thrown. 197 */ 198 T result() 199 { 200 final switch (this.state) 201 { 202 case State.PENDING: 203 throw new InvalidStateException("Result is not ready."); 204 case State.CANCELLED: 205 throw new CancelledException; 206 case State.FINISHED: 207 if (this.exception_) 208 throw this.exception_; 209 return this.result_; 210 } 211 } 212 213 /** 214 * Helper setting the result only if the future was not cancelled. 215 */ 216 package void setResultUnlessCancelled(T result) 217 { 218 if (cancelled) 219 return; 220 setResult(result); 221 } 222 223 /** 224 * Mark the future done and set its result. 225 * 226 * If the future is already done when this method is called, throws 227 * $(D_PSYMBOL InvalidStateError). 228 */ 229 void setResult(T result) 230 { 231 if (this.state != State.PENDING) 232 throw new InvalidStateException("Result or exception already set"); 233 234 this.result_ = result; 235 this.state = State.FINISHED; 236 scheduleCallbacks; 237 } 238 } 239 240 alias Waiter = Future!void; 241 242 class Future(T : void) : BaseFuture 243 { 244 alias ResultType = void; 245 246 this(EventLoop eventLoop = null) 247 { 248 super(eventLoop); 249 } 250 251 /** 252 * Helper setting the result only if the future was not cancelled. 253 */ 254 package void setResultUnlessCancelled() 255 { 256 if (cancelled) 257 return; 258 setResult(); 259 } 260 261 /** 262 * Mark the future done. 263 * 264 * If the future is already done when this method is called, throws 265 * $(D_PSYMBOL InvalidStateError). 266 */ 267 void setResult() 268 { 269 if (this.state != State.PENDING) 270 throw new InvalidStateException("Result or exception already set"); 271 272 this.state = State.FINISHED; 273 scheduleCallbacks; 274 } 275 } 276 277 unittest 278 { 279 import std.exception : assertThrown, Exception; 280 281 auto future = new Future!int; 282 assert(!future.done); 283 assert(!future.cancelled); 284 assertThrown(future.exception); 285 assertThrown(future.result); 286 287 future = new Future!int; 288 future.setResult(42); 289 assert(future.done); 290 assert(!future.cancelled); 291 assert(future.result == 42); 292 assert(future.exception is null); 293 294 future = new Future!int; 295 future.cancel; 296 assert(future.done); 297 assert(future.cancelled); 298 assertThrown(future.exception); 299 assertThrown(future.result); 300 301 future = new Future!int; 302 future.setException(new Exception("foo")); 303 assert(future.done); 304 assert(!future.cancelled); 305 assert(future.exception !is null); 306 assertThrown(future.result); 307 }