1 /**
2  * Future classes.
3  */
4 module asynchronous.futures;
5 
6 import std.algorithm;
7 import std.array;
8 import asynchronous.events;
9 import asynchronous.types;
10 
/**
 * Type-independent view of a future: cancellation, completion state,
 * exception access and done-callback management, without reference to the
 * type of the result.
 */
interface FutureHandle
{
    /**
     * Cancel the future and schedule callbacks.
     *
     * If the future is already done or cancelled, return $(D_KEYWORD false).
     * Otherwise, change the future's state to cancelled, schedule the
     * callbacks and return $(D_KEYWORD true).
     */
    bool cancel();

    /**
     * Return $(D_KEYWORD true) if the future was cancelled.
     */
    bool cancelled() const;

    /**
     * Return $(D_KEYWORD true) if the future is done.
     *
     * Done means either that a result/exception is available, or that the
     * future was cancelled.
     */
    bool done() const;

    /**
     * Returns: the exception that was set on this future.
     *
     * The exception (or $(D_KEYWORD null) if no exception was set) is returned
     * only if the future is done. If the future has been cancelled, throws
     * $(D_PSYMBOL CancelledException). If the future isn't done, throws
     * $(D_PSYMBOL InvalidStateException).
     */
    Throwable exception();

    /**
     * Add a callback to be run when the future becomes done.
     *
     * The callback is a delegate taking no arguments. If the future is
     * already done when this is called, the callback is scheduled with
     * $(D_PSYMBOL callSoon()).
     */
    void addDoneCallback(void delegate() callback);

    /**
     * Remove all instances of a callback from the "call when done" list.
     *
     * Returns: the number of callbacks removed.
     */
    size_t removeDoneCallback(void delegate() callback);

    /**
     * Mark the future done and set an exception.
     *
     * If the future is already done when this method is called, throws
     * $(D_PSYMBOL InvalidStateException).
     */
    void setException(Throwable exception);

    /**
     * Returns: a textual description of the future.
     */
    string toString() const;
}
71 
/**
 * Result-type independent part of a future.
 *
 * Tracks the life-cycle state (pending, cancelled, finished), the exception
 * that may have been set, and the callbacks to hand over to the event loop
 * once the future leaves the pending state.
 */
abstract class BaseFuture : FutureHandle
{
    private enum State
    {
        PENDING,
        CANCELLED,
        FINISHED,
    }

    /// Event loop on which done-callbacks are scheduled.
    package EventLoop eventLoop;

    /// Callbacks registered while the future is still pending.
    private void delegate()[] callbacks = null;

    private State state = State.PENDING;

    /// Exception stored by setException; stays null otherwise.
    private Throwable exception_;

    /**
     * Params:
     *  eventLoop = loop used to schedule callbacks; when $(D_KEYWORD null),
     *              the value of $(D_PSYMBOL getEventLoop) is used instead.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
    }

    /**
     * Cancel a pending future and schedule its callbacks.
     *
     * Returns: $(D_KEYWORD true) if the future was pending and is now
     * cancelled, $(D_KEYWORD false) if it was already done.
     */
    bool cancel()
    {
        const wasPending = this.state == State.PENDING;

        if (wasPending)
        {
            this.state = State.CANCELLED;
            scheduleCallbacks();
        }

        return wasPending;
    }

    /**
     * Hand every registered callback to the event loop and clear the list.
     */
    private void scheduleCallbacks()
    {
        if (this.callbacks.length > 0)
        {
            // Detach the list first so that the future no longer references
            // the callbacks while they are being scheduled.
            auto toSchedule = this.callbacks;
            this.callbacks = null;

            foreach (scheduled; toSchedule)
                this.eventLoop.callSoon(scheduled);
        }
    }

    /**
     * Returns: $(D_KEYWORD true) if the future was cancelled.
     */
    bool cancelled() const
    {
        return State.CANCELLED == this.state;
    }

    /**
     * Returns: $(D_KEYWORD true) once the future is no longer pending, i.e.
     * it was cancelled or finished.
     */
    bool done() const
    {
        return !(this.state == State.PENDING);
    }

    /**
     * Returns: the exception set on this future, or $(D_KEYWORD null) if it
     * finished without one.
     *
     * Throws: $(D_PSYMBOL InvalidStateException) if the future is still
     * pending, $(D_PSYMBOL CancelledException) if it was cancelled.
     */
    Throwable exception()
    {
        if (this.state == State.PENDING)
            throw new InvalidStateException("Exception is not set.");

        if (this.state == State.CANCELLED)
            throw new CancelledException;

        return this.exception_;
    }

    /**
     * Register a callback to run when the future becomes done.
     *
     * If the future is already done, the callback is scheduled on the event
     * loop right away instead of being stored.
     */
    void addDoneCallback(void delegate() callback)
    {
        if (this.state == State.PENDING)
        {
            this.callbacks ~= callback;
            return;
        }

        this.eventLoop.callSoon(callback);
    }

    /**
     * Remove all instances of a callback from the list of stored callbacks.
     *
     * Returns: the number of callbacks removed.
     */
    size_t removeDoneCallback(void delegate() callback)
    {
        const size_t countBefore = this.callbacks.length;

        this.callbacks = this.callbacks.remove!(registered => registered is callback);

        return countBefore - this.callbacks.length;
    }

    /**
     * Mark the future done and store an exception.
     *
     * Throws: $(D_PSYMBOL InvalidStateException) if the future is already
     * done.
     */
    void setException(Throwable exception)
    {
        const alreadyDone = this.state != State.PENDING;
        if (alreadyDone)
            throw new InvalidStateException("Result or exception already set");

        this.exception_ = exception;
        this.state = State.FINISHED;
        scheduleCallbacks();
    }

    /**
     * Returns: the dynamic type of the future together with its done and
     * cancelled flags.
     */
    override string toString() const
    {
        import std.format : format;

        return format("%s(done: %s, cancelled: %s)", typeid(this), done(), cancelled());
    }
}
175 
/**
 * Encapsulates the asynchronous execution of a callable that produces a
 * value of type T.
 */
class Future(T) : BaseFuture
{
    /// Value stored by setResult.
    private T result_;

    alias ResultType = T;

    /**
     * Params:
     *  eventLoop = forwarded unchanged to the $(D_PSYMBOL BaseFuture)
     *              constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /**
     * Returns: the result this future represents.
     *
     * Throws: $(D_PSYMBOL InvalidStateException) if the result is not yet
     * available, $(D_PSYMBOL CancelledException) if the future was
     * cancelled, or the exception set on the future if there is one.
     */
    T result()
    {
        if (this.state == State.PENDING)
            throw new InvalidStateException("Result is not ready.");

        if (this.state == State.CANCELLED)
            throw new CancelledException;

        // Finished: a stored exception takes precedence over the result.
        if (this.exception_ !is null)
            throw this.exception_;

        return this.result_;
    }

    /**
     * Helper setting the result only if the future was not cancelled.
     */
    package void setResultUnlessCancelled(T result)
    {
        if (!cancelled)
            setResult(result);
    }

    /**
     * Mark the future done and set its result.
     *
     * Throws: $(D_PSYMBOL InvalidStateException) if the future is already
     * done when this method is called.
     */
    void setResult(T result)
    {
        const alreadyDone = this.state != State.PENDING;
        if (alreadyDone)
            throw new InvalidStateException("Result or exception already set");

        this.result_ = result;
        this.state = State.FINISHED;
        scheduleCallbacks();
    }
}
239 
/// Convenience alias for a future that carries no result value.
alias Waiter = Future!void;
241 
/**
 * Specialization of the future for $(D_KEYWORD void): it can be completed,
 * cancelled or failed, but stores no result value.
 */
class Future(T : void) : BaseFuture
{
    alias ResultType = void;

    /**
     * Params:
     *  eventLoop = forwarded unchanged to the $(D_PSYMBOL BaseFuture)
     *              constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /**
     * Helper marking the future done only if it was not cancelled.
     */
    package void setResultUnlessCancelled()
    {
        if (!cancelled)
            setResult();
    }

    /**
     * Mark the future done.
     *
     * Throws: $(D_PSYMBOL InvalidStateException) if the future is already
     * done when this method is called.
     */
    void setResult()
    {
        const alreadyDone = this.state != State.PENDING;
        if (alreadyDone)
            throw new InvalidStateException("Result or exception already set");

        this.state = State.FINISHED;
        scheduleCallbacks();
    }
}
276 
// Exercises the four observable states of a Future!int: pending, finished
// with a result, cancelled, and finished with an exception.
unittest
{
    // `Exception` is declared in the implicitly imported `object` module and
    // is not a member of std.exception, so only assertThrown is imported.
    import std.exception : assertThrown;

    // Pending: neither the result nor the exception can be queried yet.
    auto future = new Future!int;
    assert(!future.done);
    assert(!future.cancelled);
    assertThrown!InvalidStateException(future.exception);
    assertThrown!InvalidStateException(future.result);

    // Finished with a result.
    future = new Future!int;
    future.setResult(42);
    assert(future.done);
    assert(!future.cancelled);
    assert(future.result == 42);
    assert(future.exception is null);
    // A done future can neither be completed again nor cancelled.
    assertThrown!InvalidStateException(future.setResult(43));
    assertThrown!InvalidStateException(future.setException(new Exception("bar")));
    assert(!future.cancel);
    assert(future.result == 42);

    // Cancelled.
    future = new Future!int;
    assert(future.cancel);
    assert(future.done);
    assert(future.cancelled);
    assertThrown!CancelledException(future.exception);
    assertThrown!CancelledException(future.result);
    // Cancelling twice reports that nothing changed.
    assert(!future.cancel);

    // Finished with an exception: result() rethrows the stored exception.
    future = new Future!int;
    auto failure = new Exception("foo");
    future.setException(failure);
    assert(future.done);
    assert(!future.cancelled);
    assert(future.exception is failure);
    assertThrown!Exception(future.result);
}