/**
 * Synchronization primitives.
 *
 * Coroutine-level counterparts of the classic lock, event, condition variable
 * and semaphore. None of the classes in this module is thread safe.
 */
module asynchronous.locks;

import std.algorithm;
import std.array;
import std.exception;
import asynchronous.events : EventLoop, getEventLoop;
import asynchronous.futures : Waiter;
import asynchronous.tasks : TaskHandle;
import asynchronous.types : Coroutine;

// Locks

/**
 * Primitive lock objects.
 *
 * A primitive lock is a synchronization primitive that is not owned by a
 * particular coroutine when locked. A primitive lock is in one of two states,
 * ‘locked’ or ‘unlocked’.
 *
 * It is created in the unlocked state. It has two basic methods, $(D_PSYMBOL
 * acquire()) and $(D_PSYMBOL release()). When the state is unlocked,
 * $(D_PSYMBOL acquire()) changes the state to locked and returns immediately.
 * When the state is locked, $(D_PSYMBOL acquire()) blocks until a call to
 * $(D_PSYMBOL release()) in another coroutine changes it to unlocked, then the
 * $(D_PSYMBOL acquire()) call resets it to locked and returns. The $(D_PSYMBOL
 * release()) method should only be called in the locked state; it changes the
 * state to unlocked and returns immediately. If an attempt is made to release
 * an unlocked lock, an $(D_PSYMBOL Exception) will be thrown.
 *
 * When more than one coroutine is blocked in $(D_PSYMBOL acquire()) waiting for
 * the state to turn to unlocked, only one coroutine proceeds when a $(D_PSYMBOL
 * release()) call resets the state to unlocked; the first coroutine that
 * blocked in $(D_PSYMBOL acquire()) is the one that proceeds.
 *
 * $(D_PSYMBOL acquire()) is a coroutine.
 *
 * This class is not thread safe.
 *
 * Usage:
 * ---
 * auto lock = new Lock();
 * ...
 * lock.acquire();
 * scope (exit) lock.release();
 * ...
 * ---
 *
 * Lock objects can be tested for locking state:
 * ---
 * if (!lock.locked)
 * {
 *     lock.acquire();
 * }
 * else
 * {
 *     // lock is acquired
 *     ...
 * }
 * ---
 */
final class Lock
{
    // Package-visible so that Condition can verify that a user supplied lock
    // is bound to the same event loop.
    package EventLoop eventLoop;
    // Coroutines blocked in acquire(), in arrival order.
    private Waiter[] waiters;
    private bool locked_ = false;

    /**
     * Params:
     *  eventLoop = event loop the lock is bound to; when $(D_KEYWORD null)
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
    }

    /// Textual representation: dynamic type, lock state and pending waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            locked ? "locked" : "unlocked", waiters);
    }

    /**
     * Return $(D_KEYWORD true) if lock is acquired.
     */
    @property bool locked() const
    {
        return locked_;
    }

    /**
     * Acquire a lock.
     *
     * This method blocks until the lock is unlocked, then sets it to locked and
     * returns $(D_KEYWORD true).
     *
     * Returns: always $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        // Fast path: only taken when nobody is queued, so that a newcomer
        // cannot overtake coroutines that are already waiting.
        if (waiters.empty && !locked_)
        {
            locked_ = true;
            return true;
        }

        auto waiter = new Waiter(eventLoop);
        // Always dequeue our own waiter, whether we were woken up normally or
        // left the yield below through an exception.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // Completing the waiter reschedules the current task.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        locked_ = true;
        return true;
    }

    /**
     * Release a lock.
     *
     * When the lock is locked, reset it to unlocked, and return. If any other
     * coroutines are blocked waiting for the lock to become unlocked, allow
     * exactly one of them to proceed.
     *
     * Throws: $(D_PSYMBOL Exception) when invoked on an unlocked lock.
     */
    void release()
    {
        enforce(locked_, "Lock is not acquired.");

        locked_ = false;

        // Wake up the first waiter who isn't cancelled.
        auto found = waiters.find!(w => !w.done);

        if (!found.empty)
            found[0].setResult;
    }
}

/**
 * Class implementing event objects.
 *
 * An event manages a flag that can be set to true with the $(D_PSYMBOL set())
 * method and reset to false with the $(D_PSYMBOL clear()) method. The
 * $(D_PSYMBOL wait()) method blocks until the flag is true. The flag is
 * initially false.
 *
 * This class is not thread safe.
 */
final class Event
{
    private EventLoop eventLoop;
    // Coroutines blocked in wait().
    private Waiter[] waiters;
    private bool value = false;

    /**
     * Params:
     *  eventLoop = event loop the event is bound to; when $(D_KEYWORD null)
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
    }

    /// Textual representation: dynamic type, flag state and pending waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            value ? "set" : "unset", waiters);
    }

    /**
     * Reset the internal flag to false. Subsequently, coroutines calling
     * $(D_PSYMBOL wait()) will block until $(D_PSYMBOL set()) is called to set
     * the internal flag to true again.
     */
    void clear()
    {
        value = false;
    }

    /**
     * Return $(D_KEYWORD true) if and only if the internal flag is true.
     */
    bool isSet() const
    {
        return value;
    }

    /**
     * Set the internal flag to true. All coroutines waiting for it to become
     * true are awakened. Coroutines that call $(D_PSYMBOL wait()) once the
     * flag is true will not block at all.
     *
     * Calling this method while the flag is already true is a no-op.
     */
    void set()
    {
        if (value)
            return;

        value = true;

        // Wake up every waiter that is not done yet, not just the first one;
        // otherwise all but one of the coroutines blocked in wait() would stay
        // suspended. Same iteration pattern as Condition.notify.
        foreach (waiter; waiters.filter!(w => !w.done))
            waiter.setResult;
    }

    /**
     * Block until the internal flag is true.
     *
     * If the internal flag is true on entry, return $(D_KEYWORD true)
     * immediately. Otherwise, block until another coroutine calls $(D_PSYMBOL
     * set()) to set the flag to true, then return $(D_KEYWORD true).
     *
     * Returns: always $(D_KEYWORD true).
     */
    @Coroutine
    bool wait()
    {
        if (value)
            return true;

        auto waiter = new Waiter(eventLoop);
        // Always dequeue our own waiter when leaving this method.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // Completing the waiter reschedules the current task.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        return true;
    }
}

/**
 * This class implements condition variable objects.
 *
 * A condition variable allows one or more coroutines to wait until they are
 * notified by another coroutine.
 *
 * If the lock argument is given and not $(D_KEYWORD null) then it is used as
 * the underlying lock. Otherwise, a new Lock object is created and used as the
 * underlying lock.
 *
 * This class is not thread safe.
 */
final class Condition
{
    private EventLoop eventLoop;
    private Lock lock;
    // Coroutines blocked in wait().
    private Waiter[] waiters;
    // NOTE(review): not referenced by any method of this class in this file.
    private bool value = false;

    /**
     * Params:
     *  eventLoop = event loop the condition is bound to; when
     *              $(D_KEYWORD null) the result of $(D_PSYMBOL getEventLoop)
     *              is used.
     *  lock      = underlying lock; when $(D_KEYWORD null) a new
     *              $(D_PSYMBOL Lock) bound to the same event loop is created.
     *
     * Throws: $(D_PSYMBOL Exception) if $(D_PSYMBOL lock) is given and is
     *         bound to a different event loop.
     */
    this(EventLoop eventLoop = null, Lock lock = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        if (lock is null)
            lock = new Lock(this.eventLoop);
        else
            enforce(lock.eventLoop == this.eventLoop,
                "loop argument must agree with lock");
        this.lock = lock;
    }

    /// Textual representation: dynamic type, lock state and pending waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            locked ? "locked" : "unlocked", waiters);
    }

    /**
     * Acquire the underlying lock.
     *
     * This method blocks until the lock is unlocked, then sets it to locked and
     * returns $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        return lock.acquire;
    }

    /**
     * By default, wake up one coroutine waiting on this condition, if any. If
     * the calling coroutine has not acquired the lock when this method is
     * called, an $(D_PSYMBOL Exception) is thrown.
     *
     * This method wakes up at most $(D_PSYMBOL n) of the coroutines waiting for
     * the condition variable; it is a no-op if no coroutines are waiting.
     *
     * Note: an awakened coroutine does not actually return from its $(D_PSYMBOL
     * wait()) call until it can reacquire the lock. Since $(D_PSYMBOL notify())
     * does not release the lock, its caller should.
     *
     * Params:
     *  n = maximum number of waiting coroutines to wake up.
     *
     * Throws: $(D_PSYMBOL Exception) if the underlying lock is not acquired.
     */
    void notify(size_t n = 1)
    {
        enforce(locked, "cannot notify on un-acquired lock");

        // Waiters that are already done do not count against n.
        foreach (waiter; waiters.filter!(w => !w.done))
        {
            if (n == 0)
                break;

            --n;
            waiter.setResult;
        }
    }

    /**
     * Return $(D_KEYWORD true) if the underlying lock is acquired.
     */
    @property bool locked() const
    {
        return lock.locked;
    }

    /**
     * Wake up all coroutines waiting on this condition. This method acts like
     * $(D_PSYMBOL notify()), but wakes up all waiting coroutines instead of
     * one. If the calling coroutine has not acquired the lock when this method
     * is called, an $(D_PSYMBOL Exception) is thrown.
     */
    void notifyAll()
    {
        notify(waiters.length);
    }

    /**
     * Release the underlying lock.
     *
     * When the lock is locked, reset it to unlocked, and return. If any other
     * coroutines are blocked waiting for the lock to become unlocked, allow
     * exactly one of them to proceed.
     *
     * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
     */
    void release()
    {
        lock.release;
    }

    /**
     * Wait until notified.
     *
     * If the calling coroutine has not acquired the lock when this method is
     * called, an $(D_PSYMBOL Exception) is thrown.
     *
     * This method releases the underlying lock, and then blocks until it is
     * awakened by a $(D_PSYMBOL notify()) or $(D_PSYMBOL notifyAll()) call for
     * the same condition variable in another coroutine. Once awakened, it
     * re-acquires the lock and returns $(D_KEYWORD true).
     *
     * Returns: always $(D_KEYWORD true).
     */
    @Coroutine
    bool wait()
    {
        enforce(locked, "cannot wait on un-acquired lock");

        release;
        // Registered first, therefore executed last on scope exit: the lock is
        // re-acquired after the waiter has been dequeued.
        scope (exit)
        {
            acquire;
        }

        auto waiter = new Waiter(eventLoop);
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // Completing the waiter reschedules the current task.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        return true;
    }

    /**
     * Wait until a predicate becomes true.
     *
     * The predicate should be a callable returning a boolean value. The final
     * predicate value is the return value.
     *
     * Params:
     *  predicate = condition that is re-evaluated after every wake-up.
     *
     * Returns: $(D_KEYWORD true), i.e. the value of the predicate that ended
     *          the loop.
     */
    @Coroutine
    bool waitFor(bool delegate() predicate)
    {
        while (!predicate())
            wait;

        return true;
    }
}

// Semaphores

/**
 * A Semaphore implementation.
 *
 * A semaphore manages an internal counter which is decremented by each
 * $(D_PSYMBOL acquire()) call and incremented by each $(D_PSYMBOL release())
 * call. The counter can never go below zero; when $(D_PSYMBOL acquire()) finds
 * that it is zero, it blocks, waiting until some other coroutine calls
 * $(D_PSYMBOL release()).
 *
 * The optional argument gives the initial value for the internal counter; it
 * defaults to 1.
 *
 * This class is not thread safe.
 */
class Semaphore
{
    private EventLoop eventLoop;
    private size_t value;
    // Coroutines blocked in acquire(), in arrival order.
    private Waiter[] waiters;

    /**
     * Params:
     *  eventLoop = event loop the semaphore is bound to; when
     *              $(D_KEYWORD null) the result of $(D_PSYMBOL getEventLoop)
     *              is used.
     *  value     = initial value of the internal counter.
     */
    this(EventLoop eventLoop = null, size_t value = 1)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        this.value = value;
    }

    /// Textual representation: dynamic type, state, counter and waiters.
    override string toString() const
    {
        import std.format : format;

        if (locked)
            return "%s(locked, waiters %s)".format(typeid(this), waiters);
        else
            return "%s(unlocked, value %s, waiters %s)".format(typeid(this),
                value, waiters);
    }

    /**
     * Acquire a semaphore.
     *
     * If the internal counter is larger than zero on entry, decrement it by one
     * and return $(D_KEYWORD true) immediately. If it is zero on entry, block,
     * waiting until some other coroutine has called $(D_PSYMBOL release()) to
     * make it larger than 0, and then return $(D_KEYWORD true).
     *
     * Returns: always $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        // Fast path: only taken when nobody is queued, so that a newcomer
        // cannot overtake coroutines that are already waiting.
        if (waiters.empty && value > 0)
        {
            --value;
            return true;
        }

        auto waiter = new Waiter(eventLoop);
        // Always dequeue our own waiter when leaving this method.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // Completing the waiter reschedules the current task.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        --value;
        return true;
    }

    /**
     * Returns $(D_KEYWORD true) if semaphore can not be acquired immediately.
     */
    @property bool locked() const
    {
        return value == 0;
    }

    /**
     * Release a semaphore, incrementing the internal counter by one. When it
     * was zero on entry and another coroutine is waiting for it to become
     * larger than zero again, wake up that coroutine.
     */
    void release()
    {
        ++value;

        // Wake up the first waiter that is not done yet.
        auto found = waiters.find!(w => !w.done);

        if (!found.empty)
            found[0].setResult;
    }
}

/**
 * A bounded semaphore implementation.
 *
 * This throws an Exception in $(D_PSYMBOL release()) if it would increase the
 * value above the initial value.
 */
class BoundedSemaphore : Semaphore
{
    // Initial counter value; release() refuses to go above it.
    private size_t boundValue;

    /**
     * Params:
     *  eventLoop = event loop the semaphore is bound to; forwarded to
     *              $(D_PSYMBOL Semaphore).
     *  value     = initial value and upper bound of the internal counter.
     */
    this(EventLoop eventLoop = null, size_t value = 1)
    {
        boundValue = value;
        super(eventLoop, value);
    }

    /**
     * Release the semaphore, see $(D_PSYMBOL Semaphore.release()).
     *
     * Throws: $(D_PSYMBOL Exception) if the counter is already at its initial
     *         value.
     */
    override void release()
    {
        enforce(value < boundValue, "BoundedSemaphore released too many times");
        super.release;
    }
}