1 /**
2  * Synchronization primitives.
3  */
4 module asynchronous.locks;
5 
6 import std.algorithm;
7 import std.array;
8 import std.exception;
9 import asynchronous.events : EventLoop, getEventLoop;
10 import asynchronous.futures : Waiter;
11 import asynchronous.tasks : TaskHandle;
12 import asynchronous.types : Coroutine;
13 
14 // Locks
15 
16 /**
17  * Primitive lock objects.
18  *
19  * A primitive lock is a synchronization primitive that is not owned by a
20  * particular coroutine when locked. A primitive lock is in one of two states,
21  * ‘locked’ or ‘unlocked’.
22  *
23  * It is created in the unlocked state. It has two basic methods, $(D_PSYMBOL
24  * acquire()) and $(D_PSYMBOL release()). When the state is unlocked,
25  * $(D_PSYMBOL acquire()) changes the state to locked and returns immediately.
26  * When the state is locked, $(D_PSYMBOL acquire()) blocks until a call to
27  * $(D_PSYMBOL release()) in another coroutine changes it to unlocked, then the
28  * $(D_PSYMBOL acquire()) call resets it to locked and returns. The $(D_PSYMBOL
29  * release()) method should only be called in the locked state; it changes the
30  * state to unlocked and returns immediately. If an attempt is made to release
31  * an unlocked lock, an $(D_PSYMBOL Exception) will be thrown.
32  *
33  * When more than one coroutine is blocked in $(D_PSYMBOL acquire()) waiting for
34  * the state to turn to unlocked, only one coroutine proceeds when a $(D_PSYMBOL
35  * release()) call resets the state to unlocked; first coroutine which is
36  * blocked in $(D_PSYMBOL acquire()) is being processed.
37  *
38  * $(D_PSYMBOL acquire()) is a coroutine.
39  *
40  * This class is not thread safe.
41  *
42  * Usage:
43  *
44  *   lock = new Lock();
45  *   ...
46  *   lock.acquire();
47  *   scope (exit) lock.release();
48  *   ...
49  *
50  * Lock objects can be tested for locking state:
51  *
52  *   if (!lock.locked)
53  *      lock.acquire();
54  *   else;
55  *      // lock is acquired
56  *      ...
57  */
58 final class Lock
59 {
60     package EventLoop eventLoop;
61     private Waiter[] waiters;
62     private bool locked_ = false;
63 
64     this(EventLoop eventLoop = null)
65     {
66         if (eventLoop is null)
67             this.eventLoop = getEventLoop;
68         else
69             this.eventLoop = eventLoop;
70     }
71 
72     override string toString() const
73     {
74         import std.format : format;
75 
76         return "%s(%s, waiters %s)".format(typeid(this),
77             locked ? "locked" : "unlocked", waiters);
78     }
79 
80     /**
81      * Return $(D_KEYWORD true) if lock is acquired.
82      */
83     @property bool locked() const
84     {
85         return locked_;
86     }
87 
88     /**
89      * Acquire a lock.
90      *
91      * This method blocks until the lock is unlocked, then sets it to locked and
92      * returns $(D_KEYWORD true).
93      */
94     @Coroutine
95     bool acquire()
96     {
97         if (waiters.empty && !locked_)
98         {
99             locked_ = true;
100             return true;
101         }
102 
103         auto waiter = new Waiter(eventLoop);
104         scope (exit)
105         {
106             waiters = waiters.remove!(w => w is waiter);
107         }
108 
109         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
110         waiters ~= waiter;
111 
112         TaskHandle.yield;
113         locked_ = true;
114         return true;
115     }
116 
117     /**
118      * Release a lock.
119      *
120      * When the lock is locked, reset it to unlocked, and return. If any other
121      * coroutines are blocked waiting for the lock to become unlocked, allow
122      * exactly one of them to proceed.
123      *
124      * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
125      */
126     void release()
127     {
128         enforce(locked_, "Lock is not acquired.");
129 
130         locked_ = false;
131 
132         // Wake up the first waiter who isn't cancelled.
133         auto found = waiters.find!(w => !w.done);
134 
135         if (!found.empty)
136             found[0].setResult;
137     }
138 }
139 
140 /**
141  * Class implementing event objects.
142  *
143  * An event manages a flag that can be set to true with the $(D_PSYMBOL set())
144  * method and reset to false with the $(D_PSYMBOL clear()) method. The
145  * $(D_PSYMBOL wait()) method blocks until the flag is true. The flag is
146  * initially false.
147  *
148  * This class is not thread safe.
149  */ 
150 final class Event
151 {
152     private EventLoop eventLoop;
153     private Waiter[] waiters;
154     private bool value = false;
155 
156     this(EventLoop eventLoop = null)
157     {
158         if (eventLoop is null)
159             this.eventLoop = getEventLoop;
160         else
161             this.eventLoop = eventLoop;
162     }
163 
164     override string toString() const
165     {
166         import std.format : format;
167 
168         return "%s(%s, waiters %s)".format(typeid(this),
169             value ? "set" : "unset", waiters);
170     }
171 
172     /**
173      * Reset the internal flag to false. Subsequently, coroutines calling
174      * $(D_PSYMBOL wait()) will block until $(D_PSYMBOL set()) is called to set
175      * the internal flag to true again.
176      */
177     void clear()
178     {
179         value = false;
180     }
181 
182     /**
183      * Return $(D_KEYWORD true) if and only if the internal flag is true.
184      */
185     bool isSet() const
186     {
187         return value;
188     }
189 
190     /**
191      * Set the internal flag to true. All coroutines waiting for it to become
192      * true are awakened. Coroutine that call $(D_PSYMBOL wait()) once the flag
193      * is true will not block at all.
194      */
195     void set()
196     {
197         if (value)
198             return;
199 
200         value = true;
201 
202         auto found = waiters.find!(w => !w.done);
203 
204         if (!found.empty)
205             found[0].setResult;
206     }
207 
208     /**
209      * Block until the internal flag is true.
210      *
211      * If the internal flag is true on entry, return $(D_KEYWORD true)
212      * immediately. Otherwise, block until another coroutine calls $(D_PSYMBOL
213      * set()) to set the flag to true, then return $(D_KEYWORD true).
214      */
215     @Coroutine
216     bool wait()
217     {
218         if (value)
219             return true;
220 
221         auto waiter = new Waiter(eventLoop);
222         scope (exit)
223         {
224             waiters = waiters.remove!(w => w is waiter);
225         }
226 
227         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
228         waiters ~= waiter;
229 
230         TaskHandle.yield;
231         return true;
232     }
233 }
234 
235 /**
236  * This class implements condition variable objects.
237  *
238  * A condition variable allows one or more coroutines to wait until they are
239  * notified by another coroutine.
240  *
241  * If the lock argument is given and not $(D_KEYWORD null) then it is used as
242  * the underlying lock. Otherwise, a new Lock object is created and used as the
243  * underlying lock.
244  *
245  * This class is not thread safe.
246  */
247 final class Condition
248 {
249     private EventLoop eventLoop;
250     private Lock lock;
251     private Waiter[] waiters;
252     private bool value = false;
253 
254     this(EventLoop eventLoop = null, Lock lock = null)
255     {
256         if (eventLoop is null)
257             this.eventLoop = getEventLoop;
258         else
259             this.eventLoop = eventLoop;
260 
261         if (lock is null)
262             lock = new Lock(this.eventLoop);
263         else
264             enforce(lock.eventLoop == this.eventLoop,
265                 "loop argument must agree with lock");
266         this.lock = lock;
267     }
268 
269     override string toString() const
270     {
271         import std.format : format;
272 
273         return "%s(%s, waiters %s)".format(typeid(this),
274             locked ? "locked" : "unlocked", waiters);
275     }
276 
277     /**
278      * Acquire the underlying lock.
279      *
280      * This method blocks until the lock is unlocked, then sets it to locked and
281      * returns $(D_KEYWORD true).
282      */
283     @Coroutine
284     bool acquire()
285     {
286         return lock.acquire;
287     }
288 
289     /**
290      * By default, wake up one coroutine waiting on this condition, if any. If
291      * the calling coroutine has not acquired the lock when this method is
292      * called, an $(D_PSYMBOL Exception) is thrown.
293      *
294      * This method wakes up at most $(D_PSYMBOL n) of the coroutines waiting for
295      * the condition variable; it is a no-op if no coroutines are waiting.
296      *
297      * Note: an awakened coroutine does not actually return from its $(D_PSYMBOL
298      * wait()) call until it can reacquire the lock. Since $(D_PSYMBOL notify())
299      * does not release the lock, its caller should.
300      */
301     void notify(size_t n = 1)
302     {
303         enforce(locked, "cannot notify on un-acquired lock");
304 
305         foreach (waiter; waiters.filter!(w => !w.done))
306         {
307             if (n == 0)
308                 break;
309 
310             --n;
311             waiter.setResult;
312         }
313     }
314 
315     /**
316      * Return $(D_KEYWORD true) if the underlying lock is acquired.
317      */
318     @property bool locked() const
319     {
320         return lock.locked;
321     }
322 
323     /**
324      * Wake up all coroutines waiting on this condition. This method acts like
325      * $(D_PSYMBOL notify()), but wakes up all waiting coroutines instead of
326      * one. If the calling coroutines has not acquired the lock when this method
327      * is called, an $(D_PSYMBOL Exception) is thrown.
328      */
329     void notifyAll()
330     {
331         notify(waiters.length);
332     }
333 
334     /**
335      * Release the underlying lock.
336      *
337      * When the lock is locked, reset it to unlocked, and return. If any other
338      * coroutines are blocked waiting for the lock to become unlocked, allow
339      * exactly one of them to proceed.
340      *
341      * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
342      */
343     void release()
344     {
345         lock.release;
346     }
347 
348 
349     /**
350      * Wait until notified.
351      *
352      * If the calling coroutine has not acquired the lock when this method is
353      * called, an $(D_PSYMBOL Exception) is thrown.
354      *
355      * This method releases the underlying lock, and then blocks until it is
356      * awakened by a $(D_PSYMBOL notify()) or $(D_PSYMBOL notifyAll()) call for
357      * the same condition variable in another coroutine. Once awakened, it
358      * re-acquires the lock and returns $(D_KEYWORD true).
359      */
360     @Coroutine
361     bool wait()
362     {
363         enforce(locked, "cannot wait on un-acquired lock");
364 
365         release;
366         scope (exit)
367         {
368             acquire;
369         }
370 
371         auto waiter = new Waiter(eventLoop);
372         scope (exit)
373         {
374             waiters = waiters.remove!(w => w is waiter);
375         }
376 
377         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
378         waiters ~= waiter;
379 
380         TaskHandle.yield;
381         return true;
382     }
383 
384     /**
385      * Wait until a predicate becomes true.
386      *
387      * The predicate should be a callable returning a boolean value. The final
388      * predicate value is the return value.
389      */
390     @Coroutine
391     bool waitFor(bool delegate() predicate)
392     {
393         while (!predicate())
394             wait;
395 
396         return true;
397     }
398 }
399 
400 // Semaphores
401 
402 /**
403  * A Semaphore implementation.
404  *
405  * A semaphore manages an internal counter which is decremented by each
406  * $(D_PSYMBOL acquire()) call and incremented by each $(D_PSYMBOL release())
407  * call. The counter can never go below zero; when $(D_PSYMBOL acquire()) finds
408  * that it is zero, it blocks, waiting until some other thread calls $(D_PSYMBOL
409  * release()).
410  *
411  * The optional argument gives the initial value for the internal counter; it
412  * defaults to 1.
413  *
414  * This class is not thread safe.
415  */
416 class Semaphore
417 {
418     private EventLoop eventLoop;
419     private size_t value;
420     private Waiter[] waiters;
421 
422     this(EventLoop eventLoop = null, size_t value = 1)
423     {
424         if (eventLoop is null)
425             this.eventLoop = getEventLoop;
426         else
427             this.eventLoop = eventLoop;
428 
429         this.value = value;
430     }
431 
432     override string toString() const
433     {
434         import std.format : format;
435 
436         if (locked)
437             return "%s(locked, waiters %s)".format(typeid(this), waiters);
438         else
439             return "%s(unlocked, value %s, waiters %s)".format(typeid(this),
440                 value, waiters);
441     }
442 
443     /**
444      * Acquire a semaphore.
445      *
446      * If the internal counter is larger than zero on entry, decrement it by one
447      * and return $(D_KEYWORD true) immediately. If it is zero on entry, block,
448      * waiting until some other coroutine has called $(D_PSYMBOL release()) to
449      * make it larger than 0, and then return $(D_KEYWORD true).
450      */
451     @Coroutine
452     bool acquire()
453     {
454         if (waiters.empty && value > 0)
455         {
456             --value;
457             return true;
458         }
459 
460         auto waiter = new Waiter(eventLoop);
461         scope (exit)
462         {
463             waiters = waiters.remove!(w => w is waiter);
464         }
465 
466         waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
467         waiters ~= waiter;
468 
469         TaskHandle.yield;
470         --value;
471         return true;
472     }
473 
474     /**
475      * Returns $(D_KEYWORD true) if semaphore can not be acquired immediately.
476      */
477     @property bool locked() const
478     {
479         return value == 0;
480     }
481 
482     /**
483      * Release a semaphore, incrementing the internal counter by one. When it
484      * was zero on entry and another coroutine is waiting for it to become
485      * larger than zero again, wake up that coroutine.
486      */
487     void release()
488     {
489         ++value;
490 
491         auto found = waiters.find!(w => !w.done);
492 
493         if (!found.empty)
494             found[0].setResult;
495     }
496 }
497 
498 /**
499  * A bounded semaphore implementation.
500  * 
501  * This throws an Exception in $(D_PSYMBOL release()) if it would increase the
502  * value above the initial value.
503  */
504 class BoundedSemaphore : Semaphore
505 {
506     private size_t boundValue;
507 
508     this(EventLoop eventLoop = null, size_t value = 1)
509     {
510         boundValue = value;
511         super(eventLoop, value);
512     }
513 
514     override void release()
515     {
516         enforce(value < boundValue, "BoundedSemaphore released too many times");
517         super.release;
518     }
519 }