1 /**
2  * Synchronization primitives.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.locks;
9 
10 import std.algorithm;
11 import std.array;
12 import std.exception;
13 import asynchronous.events : EventLoop, getEventLoop;
14 import asynchronous.futures : Waiter;
15 import asynchronous.tasks : TaskHandle;
16 import asynchronous.types : Coroutine;
17 
18 // Locks
19 
20 /**
21  * Primitive lock objects.
22  *
23  * A primitive lock is a synchronization primitive that is not owned by a
24  * particular coroutine when locked. A primitive lock is in one of two states,
25  * ‘locked’ or ‘unlocked’.
26  *
27  * It is created in the unlocked state. It has two basic methods, $(D_PSYMBOL
28  * acquire()) and $(D_PSYMBOL release()). When the state is unlocked,
29  * $(D_PSYMBOL acquire()) changes the state to locked and returns immediately.
30  * When the state is locked, $(D_PSYMBOL acquire()) blocks until a call to
31  * $(D_PSYMBOL release()) in another coroutine changes it to unlocked, then the
32  * $(D_PSYMBOL acquire()) call resets it to locked and returns. The $(D_PSYMBOL
33  * release()) method should only be called in the locked state; it changes the
34  * state to unlocked and returns immediately. If an attempt is made to release
35  * an unlocked lock, an $(D_PSYMBOL Exception) will be thrown.
36  *
37  * When more than one coroutine is blocked in $(D_PSYMBOL acquire()) waiting for
38  * the state to turn to unlocked, only one coroutine proceeds when a $(D_PSYMBOL
39  * release()) call resets the state to unlocked; first coroutine which is
40  * blocked in $(D_PSYMBOL acquire()) is being processed.
41  *
42  * $(D_PSYMBOL acquire()) is a coroutine.
43  *
44  * This class is not thread safe.
45  *
46  * Usage:
47  *
48  *   lock = new Lock();
49  *   ...
50  *   lock.acquire();
51  *   scope (exit) lock.release();
52  *   ...
53  *
54  * Lock objects can be tested for locking state:
55  *
56  *   if (!lock.locked)
57  *      lock.acquire();
58  *   else;
59  *      // lock is acquired
60  *      ...
61  */
62 final class Lock
63 {
64     package EventLoop eventLoop;
65     private Waiter[] waiters;
66     private bool locked_ = false;
67 
68     this(EventLoop eventLoop = null)
69     {
70         if (eventLoop is null)
71             this.eventLoop = getEventLoop;
72         else
73             this.eventLoop = eventLoop;
74     }
75 
76     override string toString() const
77     {
78         import std.format : format;
79 
80         return "%s(%s, waiters %s)".format(typeid(this),
81             locked ? "locked" : "unlocked", waiters);
82     }
83 
84     /**
85      * Return $(D_KEYWORD true) if lock is acquired.
86      */
87     @property bool locked() const
88     {
89         return locked_;
90     }
91 
92     /**
93      * Acquire a lock.
94      *
95      * This method blocks until the lock is unlocked, then sets it to locked and
96      * returns $(D_KEYWORD true).
97      */
98     @Coroutine
99     bool acquire()
100     {
101         if (waiters.empty && !locked_)
102         {
103             locked_ = true;
104             return true;
105         }
106 
107         auto waiter = new Waiter(eventLoop);
108         scope (exit)
109         {
110             waiters = waiters.remove!(w => w is waiter);
111         }
112 
113         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
114         waiters ~= waiter;
115 
116         TaskHandle.yield;
117         locked_ = true;
118         return true;
119     }
120 
121     /**
122      * Release a lock.
123      *
124      * When the lock is locked, reset it to unlocked, and return. If any other
125      * coroutines are blocked waiting for the lock to become unlocked, allow
126      * exactly one of them to proceed.
127      *
128      * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
129      */
130     void release()
131     {
132         enforce(locked_, "Lock is not acquired.");
133 
134         locked_ = false;
135 
136         // Wake up the first waiter who isn't cancelled.
137         auto found = waiters.find!(w => !w.done);
138 
139         if (!found.empty)
140             found[0].setResult;
141     }
142 }
143 
144 /**
145  * Class implementing event objects.
146  *
147  * An event manages a flag that can be set to true with the $(D_PSYMBOL set())
148  * method and reset to false with the $(D_PSYMBOL clear()) method. The
149  * $(D_PSYMBOL wait()) method blocks until the flag is true. The flag is
150  * initially false.
151  *
152  * This class is not thread safe.
153  */
154 final class Event
155 {
156     private EventLoop eventLoop;
157     private Waiter[] waiters;
158     private bool value = false;
159 
160     this(EventLoop eventLoop = null)
161     {
162         if (eventLoop is null)
163             this.eventLoop = getEventLoop;
164         else
165             this.eventLoop = eventLoop;
166     }
167 
168     override string toString() const
169     {
170         import std.format : format;
171 
172         return "%s(%s, waiters %s)".format(typeid(this),
173             value ? "set" : "unset", waiters);
174     }
175 
176     /**
177      * Reset the internal flag to false. Subsequently, coroutines calling
178      * $(D_PSYMBOL wait()) will block until $(D_PSYMBOL set()) is called to set
179      * the internal flag to true again.
180      */
181     void clear()
182     {
183         value = false;
184     }
185 
186     /**
187      * Return $(D_KEYWORD true) if and only if the internal flag is true.
188      */
189     bool isSet() const
190     {
191         return value;
192     }
193 
194     /**
195      * Set the internal flag to true. All coroutines waiting for it to become
196      * true are awakened. Coroutines that call $(D_PSYMBOL wait()) once the flag
197      * is true will not block at all.
198      */
199     void set()
200     {
201         if (value)
202             return;
203 
204         value = true;
205 
206         auto found = waiters.find!(w => !w.done);
207 
208         if (!found.empty)
209             found[0].setResult;
210     }
211 
212     /**
213      * Block until the internal flag is true.
214      *
215      * If the internal flag is true on entry, return $(D_KEYWORD true)
216      * immediately. Otherwise, block until another coroutine calls $(D_PSYMBOL
217      * set()) to set the flag to true, then return $(D_KEYWORD true).
218      */
219     @Coroutine
220     bool wait()
221     {
222         if (value)
223             return true;
224 
225         auto waiter = new Waiter(eventLoop);
226         scope (exit)
227         {
228             waiters = waiters.remove!(w => w is waiter);
229         }
230 
231         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
232         waiters ~= waiter;
233 
234         TaskHandle.yield;
235         return true;
236     }
237 }
238 
239 /**
240  * This class implements condition variable objects.
241  *
242  * A condition variable allows one or more coroutines to wait until they are
243  * notified by another coroutine.
244  *
245  * If the lock argument is given and not $(D_KEYWORD null) then it is used as
246  * the underlying lock. Otherwise, a new Lock object is created and used as the
247  * underlying lock.
248  *
249  * This class is not thread safe.
250  */
251 final class Condition
252 {
253     private EventLoop eventLoop;
254     private Lock lock;
255     private Waiter[] waiters;
256     private bool value = false;
257 
258     this(EventLoop eventLoop = null, Lock lock = null)
259     {
260         if (eventLoop is null)
261             this.eventLoop = getEventLoop;
262         else
263             this.eventLoop = eventLoop;
264 
265         if (lock is null)
266             lock = new Lock(this.eventLoop);
267         else
268             enforce(lock.eventLoop == this.eventLoop,
269                 "loop argument must agree with lock");
270         this.lock = lock;
271     }
272 
273     override string toString() const
274     {
275         import std.format : format;
276 
277         return "%s(%s, waiters %s)".format(typeid(this),
278             locked ? "locked" : "unlocked", waiters);
279     }
280 
281     /**
282      * Acquire the underlying lock.
283      *
284      * This method blocks until the lock is unlocked, then sets it to locked and
285      * returns $(D_KEYWORD true).
286      */
287     @Coroutine
288     bool acquire()
289     {
290         return lock.acquire;
291     }
292 
293     /**
294      * By default, wake up one coroutine waiting on this condition, if any. If
295      * the calling coroutine has not acquired the lock when this method is
296      * called, an $(D_PSYMBOL Exception) is thrown.
297      *
298      * This method wakes up at most $(D_PSYMBOL n) of the coroutines waiting for
299      * the condition variable; it is a no-op if no coroutines are waiting.
300      *
301      * Note: an awakened coroutine does not actually return from its $(D_PSYMBOL
302      * wait()) call until it can reacquire the lock. Since $(D_PSYMBOL notify())
303      * does not release the lock, its caller should.
304      */
305     void notify(size_t n = 1)
306     {
307         enforce(locked, "cannot notify on un-acquired lock");
308 
309         foreach (waiter; waiters.filter!(w => !w.done))
310         {
311             if (n == 0)
312                 break;
313 
314             --n;
315             waiter.setResult;
316         }
317     }
318 
319     /**
320      * Return $(D_KEYWORD true) if the underlying lock is acquired.
321      */
322     @property bool locked() const
323     {
324         return lock.locked;
325     }
326 
327     /**
328      * Wake up all coroutines waiting on this condition. This method acts like
329      * $(D_PSYMBOL notify()), but wakes up all waiting coroutines instead of
330      * one. If the calling coroutine has not acquired the lock when this method
331      * is called, an $(D_PSYMBOL Exception) is thrown.
332      */
333     void notifyAll()
334     {
335         notify(waiters.length);
336     }
337 
338     /**
339      * Release the underlying lock.
340      *
341      * When the lock is locked, reset it to unlocked, and return. If any other
342      * coroutines are blocked waiting for the lock to become unlocked, allow
343      * exactly one of them to proceed.
344      *
345      * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
346      */
347     void release()
348     {
349         lock.release;
350     }
351 
352 
353     /**
354      * Wait until notified.
355      *
356      * If the calling coroutine has not acquired the lock when this method is
357      * called, an $(D_PSYMBOL Exception) is thrown.
358      *
359      * This method releases the underlying lock, and then blocks until it is
360      * awakened by a $(D_PSYMBOL notify()) or $(D_PSYMBOL notifyAll()) call for
361      * the same condition variable in another coroutine. Once awakened, it
362      * re-acquires the lock and returns $(D_KEYWORD true).
363      */
364     @Coroutine
365     bool wait()
366     {
367         enforce(locked, "cannot wait on un-acquired lock");
368 
369         release;
370         scope (exit)
371         {
372             acquire;
373         }
374 
375         auto waiter = new Waiter(eventLoop);
376         scope (exit)
377         {
378             waiters = waiters.remove!(w => w is waiter);
379         }
380 
381         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
382         waiters ~= waiter;
383 
384         TaskHandle.yield;
385         return true;
386     }
387 
388     /**
389      * Wait until a predicate becomes true.
390      *
391      * The predicate should be a callable returning a boolean value.
392      *
393      * Returns true
394      */
395     @Coroutine
396     bool waitFor(bool delegate() predicate)
397     {
398         while (!predicate())
399             wait;
400 
401         return true;
402     }
403 }
404 
405 // Semaphores
406 
407 /**
408  * A Semaphore implementation.
409  *
410  * A semaphore manages an internal counter which is decremented by each
411  * $(D_PSYMBOL acquire()) call and incremented by each $(D_PSYMBOL release())
412  * call. The counter can never go below zero; when $(D_PSYMBOL acquire()) finds
413  * that it is zero, it blocks, waiting until some other thread calls $(D_PSYMBOL
414  * release()).
415  *
416  * The optional argument gives the initial value for the internal counter; it
417  * defaults to 1.
418  *
419  * This class is not thread safe.
420  */
421 class Semaphore
422 {
423     private EventLoop eventLoop;
424     private size_t value;
425     private Waiter[] waiters;
426 
427     this(EventLoop eventLoop = null, size_t value = 1)
428     {
429         if (eventLoop is null)
430             this.eventLoop = getEventLoop;
431         else
432             this.eventLoop = eventLoop;
433 
434         this.value = value;
435     }
436 
437     override string toString() const
438     {
439         import std.format : format;
440 
441         if (locked)
442             return "%s(locked, waiters %s)".format(typeid(this), waiters);
443         else
444             return "%s(unlocked, value %s, waiters %s)".format(typeid(this),
445                 value, waiters);
446     }
447 
448     /**
449      * Acquire a semaphore.
450      *
451      * If the internal counter is larger than zero on entry, decrement it by one
452      * and return $(D_KEYWORD true) immediately. If it is zero on entry, block,
453      * waiting until some other coroutine has called $(D_PSYMBOL release()) to
454      * make it larger than 0, and then return $(D_KEYWORD true).
455      */
456     @Coroutine
457     bool acquire()
458     {
459         if (waiters.empty && value > 0)
460         {
461             --value;
462             return true;
463         }
464 
465         auto waiter = new Waiter(eventLoop);
466         scope (exit)
467         {
468             waiters = waiters.remove!(w => w is waiter);
469         }
470 
471         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
472         waiters ~= waiter;
473 
474         TaskHandle.yield;
475         --value;
476         return true;
477     }
478 
479     /**
480      * Returns $(D_KEYWORD true) if semaphore can not be acquired immediately.
481      */
482     @property bool locked() const
483     {
484         return value == 0;
485     }
486 
487     /**
488      * Release a semaphore, incrementing the internal counter by one. When it
489      * was zero on entry and another coroutine is waiting for it to become
490      * larger than zero again, wake up that coroutine.
491      */
492     void release()
493     {
494         ++value;
495 
496         auto found = waiters.find!(w => !w.done);
497 
498         if (!found.empty)
499             found[0].setResult;
500     }
501 }
502 
503 /**
504  * A bounded semaphore implementation.
505  *
506  * This throws an Exception in $(D_PSYMBOL release()) if it would increase the
507  * value above the initial value.
508  */
509 class BoundedSemaphore : Semaphore
510 {
511     private size_t boundValue;
512 
513     this(EventLoop eventLoop = null, size_t value = 1)
514     {
515         boundValue = value;
516         super(eventLoop, value);
517     }
518 
519     override void release()
520     {
521         enforce(value < boundValue, "BoundedSemaphore released too many times");
522         super.release;
523     }
524 }