1 /**
2  * Future classes.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.futures;
9 
10 import std.algorithm;
11 import std.array;
12 import asynchronous.events;
13 import asynchronous.types;
14 
15 interface FutureHandle
16 {
17     /**
18      * Cancel the future and schedule callbacks.
19      *
20      * If the future is already done or cancelled, return $(D_KEYWORD false).
21      * Otherwise, change the future's state to cancelled, schedule the
22      * callbacks and return $(D_KEYWORD true).
23      */
24     bool cancel();
25 
26     /**
27      * Return $(D_KEYWORD true) if the future was cancelled.
28      */
29     bool cancelled() const;
30 
31     /**
32      * Return $(D_KEYWORD true) if the future is done.
33      *
34      * Done means either that a result/exception are available, or that the
35      * future was cancelled.
36      */
37     bool done() const;
38 
39     /**
40      * Returns: the exception that was set on this future.
41      *
42      * The exception (or $(D_KEYWORD null) if no exception was set) is returned
43      * only if the future is done. If the future has been cancelled, throws
44      * $(D_PSYMBOL CancelledException). If the future isn't done, throws
45      * $(D_PSYMBOL InvalidStateException).
46      */
47     Throwable exception();
48 
49     /**
50      * Add a callback to be run when the future becomes done.
51      *
52      * The callback is called with a single argument - the future object. If
53      * the future is already done when this is called, the callback is
54      * scheduled with $(D_PSYMBOL callSoon())
55      */
56     void addDoneCallback(void delegate(FutureHandle) callback);
57 
58     /**
59      * Remove all instances of a callback from the "call when done" list.
60      *
61      * Returns: the number of callbacks removed;
62      */
63     size_t removeDoneCallback(void delegate(FutureHandle) callback);
64 
65     /**
66      * Mark the future done and set an exception.
67      *
68      * If the future is already done when this method is called, throws
69      * $(D_PSYMBOL InvalidStateError).
70      */
71     void setException(Throwable exception);
72 
73     string toString() const;
74 }
75 
76 abstract class BaseFuture : FutureHandle
77 {
78     private enum State : ubyte
79     {
80         PENDING,
81         CANCELLED,
82         FINISHED,
83     }
84 
85     package EventLoop eventLoop;
86 
87     private void delegate(FutureHandle)[] callbacks = null;
88 
89     private State state = State.PENDING;
90 
91     private Throwable exception_;
92 
93     this(EventLoop eventLoop = null)
94     {
95         this.eventLoop = eventLoop is null ? getEventLoop : eventLoop;
96     }
97 
98     bool cancel()
99     {
100         if (this.state != State.PENDING)
101             return false;
102 
103         this.state = State.CANCELLED;
104         scheduleCallbacks;
105         return true;
106     }
107 
108     private void scheduleCallbacks()
109     {
110         if (this.callbacks.empty)
111             return;
112 
113         auto scheduledCallbacks = this.callbacks;
114         this.callbacks = null;
115 
116         foreach (callback; scheduledCallbacks)
117         {
118             this.eventLoop.callSoon(callback, this);
119         }
120     }
121 
122     override bool cancelled() const
123     {
124         return this.state == State.CANCELLED;
125     }
126 
127     override bool done() const
128     {
129         return this.state != State.PENDING;
130     }
131 
132     override Throwable exception()
133     {
134         final switch (this.state)
135         {
136         case State.PENDING:
137             throw new InvalidStateException("Exception is not set.");
138         case State.CANCELLED:
139             throw new CancelledException;
140         case State.FINISHED:
141             return this.exception_;
142         }
143     }
144 
145     override void addDoneCallback(void delegate(FutureHandle) callback)
146     {
147         if (this.state != State.PENDING)
148             this.eventLoop.callSoon(callback, this);
149         else
150             this.callbacks ~= callback;
151     }
152 
153     override size_t removeDoneCallback(void delegate(FutureHandle) callback)
154     {
155         size_t length = this.callbacks.length;
156 
157         this.callbacks = this.callbacks.remove!(a => a is callback);
158 
159         return length - this.callbacks.length;
160     }
161 
162     override void setException(Throwable exception)
163     {
164         if (this.state != State.PENDING)
165             throw new InvalidStateException("Result or exception already set");
166 
167         this.exception_ = exception;
168         this.state = State.FINISHED;
169         scheduleCallbacks;
170     }
171 
172     override string toString() const
173     {
174         import std.format : format;
175 
176         return "%s(done: %s, cancelled: %s)".format(typeid(this), done, cancelled);
177     }
178 }
179 
180 /**
181  * Encapsulates the asynchronous execution of a callable.
182  */
183 class Future(T) : BaseFuture
184 {
185     static if (!is(T == void))
186         private T result_;
187 
188     alias ResultType = T;
189 
190     this(EventLoop eventLoop = null)
191     {
192         super(eventLoop);
193     }
194 
195     /**
196      * Returns: the result this future represents.
197      *
198      * If the future has been cancelled, throws
199      * $(D_PSYMBOL CancelledException). If the future's result isn't yet
200      * available, throws $(D_PSYMBOL InvalidStateException). If the future is
201      * done and has an exception set, this exception is thrown.
202      */
203     T result()
204     {
205         final switch (this.state)
206         {
207         case State.PENDING:
208             throw new InvalidStateException("Result is not ready.");
209         case State.CANCELLED:
210             throw new CancelledException;
211         case State.FINISHED:
212             if (this.exception_)
213                 throw this.exception_;
214             return this.result_;
215         }
216     }
217 
218     /**
219      * Helper setting the result only if the future was not cancelled.
220      */
221     package void setResultUnlessCancelled(T result)
222     {
223         if (cancelled)
224             return;
225         setResult(result);
226     }
227 
228     /**
229      * Mark the future done and set its result.
230      *
231      * If the future is already done when this method is called, throws
232      * $(D_PSYMBOL InvalidStateError).
233      */
234     void setResult(T result)
235     {
236         if (this.state != State.PENDING)
237             throw new InvalidStateException("Result or exception already set");
238 
239         this.result_ = result;
240         this.state = State.FINISHED;
241         scheduleCallbacks;
242     }
243 }
244 
245 alias Waiter = Future!void;
246 
247 class Future(T : void) : BaseFuture
248 {
249     alias ResultType = void;
250 
251     this(EventLoop eventLoop = null)
252     {
253         super(eventLoop);
254     }
255 
256     /**
257      * Returns: void.
258      *
259      * If the future has been cancelled, throws
260      * $(D_PSYMBOL CancelledException). If the future's result isn't yet
261      * available, throws $(D_PSYMBOL InvalidStateException). If the future is
262      * done and has an exception set, this exception is thrown.
263      */
264     T result()
265     {
266         final switch (this.state)
267         {
268         case State.PENDING:
269             throw new InvalidStateException("Result is not ready.");
270         case State.CANCELLED:
271             throw new CancelledException;
272         case State.FINISHED:
273             if (this.exception_)
274                 throw this.exception_;
275             return;
276         }
277     }
278 
279     /**
280      * Helper setting the result only if the future was not cancelled.
281      */
282     package void setResultUnlessCancelled()
283     {
284         if (cancelled)
285             return;
286         setResult();
287     }
288 
289     /**
290      * Mark the future done.
291      *
292      * If the future is already done when this method is called, throws
293      * $(D_PSYMBOL InvalidStateError).
294      */
295     void setResult()
296     {
297         if (this.state != State.PENDING)
298             throw new InvalidStateException("Result or exception already set");
299 
300         this.state = State.FINISHED;
301         scheduleCallbacks;
302     }
303 }
304 
305 unittest
306 {
307     import std.exception : assertNotThrown, assertThrown, Exception;
308 
309     auto future = new Future!int;
310     assert(!future.done);
311     assert(!future.cancelled);
312     assertThrown(future.exception);
313     assertThrown(future.result);
314 
315     future = new Future!int;
316     future.setResult(42);
317     assert(future.done);
318     assert(!future.cancelled);
319     assert(future.result == 42);
320     assert(future.exception is null);
321 
322     future = new Future!int;
323     future.cancel;
324     assert(future.done);
325     assert(future.cancelled);
326     assertThrown(future.exception);
327     assertThrown(future.result);
328 
329     future = new Future!int;
330     future.setException(new Exception("foo"));
331     assert(future.done);
332     assert(!future.cancelled);
333     assert(future.exception !is null);
334     assertThrown(future.result);
335 
336     auto future2 = new Future!void;
337     assert(!future2.done);
338     assert(!future2.cancelled);
339     assertThrown(future2.exception);
340     assertThrown(future2.result);
341 
342     future2 = new Future!void;
343     future2.setResult;
344     assert(future2.done);
345     assert(!future2.cancelled);
346     assert(future2.exception is null);
347     assertNotThrown(future2.result);
348 
349     future2 = new Future!void;
350     future2.cancel;
351     assert(future2.done);
352     assert(future2.cancelled);
353     assertThrown(future2.exception);
354     assertThrown(future2.result);
355 
356     future2 = new Future!void;
357     future2.setException(new Exception("foo"));
358     assert(future2.done);
359     assert(!future2.cancelled);
360     assert(future2.exception !is null);
361     assertThrown(future2.result);
362 }