/**
 * Synchronization primitives.
 *
 * Copyright: © 2015-2016 Dragos Carp
 * License: Boost Software License - Version 1.0
 * Authors: Dragos Carp
 */
module asynchronous.locks;

import std.algorithm;
import std.array;
import std.exception;
import asynchronous.events : EventLoop, getEventLoop;
import asynchronous.futures : Waiter;
import asynchronous.tasks : TaskHandle;
import asynchronous.types : Coroutine;

// Locks

/**
 * Primitive lock objects.
 *
 * A primitive lock is a synchronization primitive that is not owned by a
 * particular coroutine when locked. A primitive lock is in one of two states,
 * ‘locked’ or ‘unlocked’.
 *
 * It is created in the unlocked state. It has two basic methods, $(D_PSYMBOL
 * acquire()) and $(D_PSYMBOL release()). When the state is unlocked,
 * $(D_PSYMBOL acquire()) changes the state to locked and returns immediately.
 * When the state is locked, $(D_PSYMBOL acquire()) blocks until a call to
 * $(D_PSYMBOL release()) in another coroutine changes it to unlocked, then the
 * $(D_PSYMBOL acquire()) call resets it to locked and returns. The $(D_PSYMBOL
 * release()) method should only be called in the locked state; it changes the
 * state to unlocked and returns immediately. If an attempt is made to release
 * an unlocked lock, an $(D_PSYMBOL Exception) will be thrown.
 *
 * When more than one coroutine is blocked in $(D_PSYMBOL acquire()) waiting for
 * the state to turn to unlocked, only one coroutine proceeds when a $(D_PSYMBOL
 * release()) call resets the state to unlocked; the coroutine that has been
 * blocked in $(D_PSYMBOL acquire()) the longest is the one that proceeds.
 *
 * $(D_PSYMBOL acquire()) is a coroutine.
 *
 * This class is not thread safe.
 *
 * Usage:
 *
 *   auto lock = new Lock();
 *   ...
 *   lock.acquire();
 *   scope (exit) lock.release();
 *   ...
 *
 * Lock objects can be tested for locking state:
 *
 *   if (!lock.locked)
 *       lock.acquire();
 *   else
 *   {
 *       // lock is acquired
 *       ...
 *   }
 */
final class Lock
{
    // Package visibility: Condition compares this against its own event loop.
    package EventLoop eventLoop;
    // Pending acquire() calls, in arrival order.
    private Waiter[] waiters;
    private bool locked_ = false;

    /**
     * Params:
     *  eventLoop = event loop used for the waiters; when $(D_KEYWORD null),
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
    }

    /// Returns a description containing the lock state and its waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            locked ? "locked" : "unlocked", waiters);
    }

    /**
     * Return $(D_KEYWORD true) if lock is acquired.
     */
    @property bool locked() const
    {
        return locked_;
    }

    /**
     * Acquire a lock.
     *
     * This method blocks until the lock is unlocked, then sets it to locked and
     * returns $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        // Fast path: only taken when nobody is queued, so that coroutines
        // already waiting are not bypassed.
        if (waiters.empty && !locked_)
        {
            locked_ = true;
            return true;
        }

        auto waiter = new Waiter(eventLoop);
        // Always drop our waiter from the queue, whether the yield below
        // returns normally or exits by throwing.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // When the waiter completes, the current task's scheduleStep is
        // invoked so that this coroutine continues after the yield.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        locked_ = true;
        return true;
    }

    /**
     * Release a lock.
     *
     * When the lock is locked, reset it to unlocked, and return. If any other
     * coroutines are blocked waiting for the lock to become unlocked, allow
     * exactly one of them to proceed.
     *
     * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
     */
    void release()
    {
        enforce(locked_, "Lock is not acquired.");

        locked_ = false;

        // Wake up the first waiter that is not done yet.
        auto found = waiters.find!(w => !w.done);

        if (!found.empty)
            found[0].setResult;
    }
}

/**
 * Class implementing event objects.
 *
 * An event manages a flag that can be set to true with the $(D_PSYMBOL set())
 * method and reset to false with the $(D_PSYMBOL clear()) method. The
 * $(D_PSYMBOL wait()) method blocks until the flag is true. The flag is
 * initially false.
 *
 * This class is not thread safe.
 */
final class Event
{
    private EventLoop eventLoop;
    // Pending wait() calls, in arrival order.
    private Waiter[] waiters;
    private bool value = false;

    /**
     * Params:
     *  eventLoop = event loop used for the waiters; when $(D_KEYWORD null),
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;
    }

    /// Returns a description containing the flag state and the waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            value ? "set" : "unset", waiters);
    }

    /**
     * Reset the internal flag to false. Subsequently, coroutines calling
     * $(D_PSYMBOL wait()) will block until $(D_PSYMBOL set()) is called to set
     * the internal flag to true again.
     */
    void clear()
    {
        value = false;
    }

    /**
     * Return $(D_KEYWORD true) if and only if the internal flag is true.
     */
    bool isSet() const
    {
        return value;
    }

    /**
     * Set the internal flag to true. All coroutines waiting for it to become
     * true are awakened. Coroutines that call $(D_PSYMBOL wait()) once the flag
     * is true will not block at all.
     */
    void set()
    {
        if (value)
            return;

        value = true;

        // Wake up every pending waiter, not just the first one: later set()
        // calls return early while the flag is true, so a waiter skipped here
        // would stay blocked until the flag is cleared and set again.
        // Same iteration pattern as Condition.notify.
        foreach (waiter; waiters.filter!(w => !w.done))
            waiter.setResult;
    }

    /**
     * Block until the internal flag is true.
     *
     * If the internal flag is true on entry, return $(D_KEYWORD true)
     * immediately. Otherwise, block until another coroutine calls $(D_PSYMBOL
     * set()) to set the flag to true, then return $(D_KEYWORD true).
     */
    @Coroutine
    bool wait()
    {
        if (value)
            return true;

        auto waiter = new Waiter(eventLoop);
        // Always drop our waiter from the queue, whether the yield below
        // returns normally or exits by throwing.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // When the waiter completes, the current task's scheduleStep is
        // invoked so that this coroutine continues after the yield.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        return true;
    }
}

/**
 * This class implements condition variable objects.
 *
 * A condition variable allows one or more coroutines to wait until they are
 * notified by another coroutine.
 *
 * If the lock argument is given and not $(D_KEYWORD null) then it is used as
 * the underlying lock. Otherwise, a new Lock object is created and used as the
 * underlying lock.
 *
 * This class is not thread safe.
 */
final class Condition
{
    private EventLoop eventLoop;
    private Lock lock;
    // Pending wait() calls, in arrival order.
    private Waiter[] waiters;
    // NOTE(review): not read or written by any method of this class below.
    private bool value = false;

    /**
     * Params:
     *  eventLoop = event loop used for the waiters; when $(D_KEYWORD null),
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     *  lock      = underlying lock; when $(D_KEYWORD null) a new
     *              $(D_PSYMBOL Lock) bound to the same event loop is created.
     *
     * Throws: $(D_PSYMBOL Exception) if the given lock uses a different event
     *         loop than this condition.
     */
    this(EventLoop eventLoop = null, Lock lock = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        if (lock is null)
            lock = new Lock(this.eventLoop);
        else
            enforce(lock.eventLoop == this.eventLoop,
                "loop argument must agree with lock");
        this.lock = lock;
    }

    /// Returns a description containing the lock state and the waiters.
    override string toString() const
    {
        import std.format : format;

        return "%s(%s, waiters %s)".format(typeid(this),
            locked ? "locked" : "unlocked", waiters);
    }

    /**
     * Acquire the underlying lock.
     *
     * This method blocks until the lock is unlocked, then sets it to locked and
     * returns $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        return lock.acquire;
    }

    /**
     * By default, wake up one coroutine waiting on this condition, if any. If
     * the calling coroutine has not acquired the lock when this method is
     * called, an $(D_PSYMBOL Exception) is thrown.
     *
     * This method wakes up at most $(D_PSYMBOL n) of the coroutines waiting for
     * the condition variable; it is a no-op if no coroutines are waiting.
     *
     * Note: an awakened coroutine does not actually return from its $(D_PSYMBOL
     * wait()) call until it can reacquire the lock. Since $(D_PSYMBOL notify())
     * does not release the lock, its caller should.
     */
    void notify(size_t n = 1)
    {
        enforce(locked, "cannot notify on un-acquired lock");

        // Complete up to n waiters that are not done yet, oldest first.
        foreach (waiter; waiters.filter!(w => !w.done))
        {
            if (n == 0)
                break;

            --n;
            waiter.setResult;
        }
    }

    /**
     * Return $(D_KEYWORD true) if the underlying lock is acquired.
     */
    @property bool locked() const
    {
        return lock.locked;
    }

    /**
     * Wake up all coroutines waiting on this condition. This method acts like
     * $(D_PSYMBOL notify()), but wakes up all waiting coroutines instead of
     * one. If the calling coroutine has not acquired the lock when this method
     * is called, an $(D_PSYMBOL Exception) is thrown.
     */
    void notifyAll()
    {
        notify(waiters.length);
    }

    /**
     * Release the underlying lock.
     *
     * When the lock is locked, reset it to unlocked, and return. If any other
     * coroutines are blocked waiting for the lock to become unlocked, allow
     * exactly one of them to proceed.
     *
     * When invoked on an unlocked lock, an $(D_PSYMBOL Exception) is thrown.
     */
    void release()
    {
        lock.release;
    }


    /**
     * Wait until notified.
     *
     * If the calling coroutine has not acquired the lock when this method is
     * called, an $(D_PSYMBOL Exception) is thrown.
     *
     * This method releases the underlying lock, and then blocks until it is
     * awakened by a $(D_PSYMBOL notify()) or $(D_PSYMBOL notifyAll()) call for
     * the same condition variable in another coroutine. Once awakened, it
     * re-acquires the lock and returns $(D_KEYWORD true).
     */
    @Coroutine
    bool wait()
    {
        enforce(locked, "cannot wait on un-acquired lock");

        release;
        // Re-acquire the lock on every exit path. Scope guards run in reverse
        // order, so the waiter is removed from the queue before this runs.
        scope (exit)
        {
            acquire;
        }

        auto waiter = new Waiter(eventLoop);
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // When the waiter completes, the current task's scheduleStep is
        // invoked so that this coroutine continues after the yield.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        return true;
    }

    /**
     * Wait until a predicate becomes true.
     *
     * The predicate should be a callable returning a boolean value. It is
     * evaluated before the first wait and again after every wake-up.
     *
     * Returns: $(D_KEYWORD true)
     */
    @Coroutine
    bool waitFor(bool delegate() predicate)
    {
        while (!predicate())
            wait;

        return true;
    }
}

// Semaphores

/**
 * A Semaphore implementation.
 *
 * A semaphore manages an internal counter which is decremented by each
 * $(D_PSYMBOL acquire()) call and incremented by each $(D_PSYMBOL release())
 * call. The counter can never go below zero; when $(D_PSYMBOL acquire()) finds
 * that it is zero, it blocks, waiting until some other coroutine calls
 * $(D_PSYMBOL release()).
 *
 * The optional argument gives the initial value for the internal counter; it
 * defaults to 1.
 *
 * This class is not thread safe.
 */
class Semaphore
{
    private EventLoop eventLoop;
    private size_t value;
    // Pending acquire() calls, in arrival order.
    private Waiter[] waiters;

    /**
     * Params:
     *  eventLoop = event loop used for the waiters; when $(D_KEYWORD null),
     *              the result of $(D_PSYMBOL getEventLoop) is used.
     *  value     = initial value of the internal counter.
     */
    this(EventLoop eventLoop = null, size_t value = 1)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        this.value = value;
    }

    /// Returns a description containing the state, counter and waiters.
    override string toString() const
    {
        import std.format : format;

        if (locked)
            return "%s(locked, waiters %s)".format(typeid(this), waiters);
        else
            return "%s(unlocked, value %s, waiters %s)".format(typeid(this),
                value, waiters);
    }

    /**
     * Acquire a semaphore.
     *
     * If the internal counter is larger than zero on entry, decrement it by one
     * and return $(D_KEYWORD true) immediately. If it is zero on entry, block,
     * waiting until some other coroutine has called $(D_PSYMBOL release()) to
     * make it larger than 0, and then return $(D_KEYWORD true).
     */
    @Coroutine
    bool acquire()
    {
        // Fast path: only taken when nobody is queued, so that coroutines
        // already waiting are not bypassed.
        if (waiters.empty && value > 0)
        {
            --value;
            return true;
        }

        auto waiter = new Waiter(eventLoop);
        // Always drop our waiter from the queue, whether the yield below
        // returns normally or exits by throwing.
        scope (exit)
        {
            waiters = waiters.remove!(w => w is waiter);
        }

        // When the waiter completes, the current task's scheduleStep is
        // invoked so that this coroutine continues after the yield.
        waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
        waiters ~= waiter;

        TaskHandle.yield;
        --value;
        return true;
    }

    /**
     * Returns $(D_KEYWORD true) if semaphore can not be acquired immediately.
     */
    @property bool locked() const
    {
        return value == 0;
    }

    /**
     * Release a semaphore, incrementing the internal counter by one. When it
     * was zero on entry and another coroutine is waiting for it to become
     * larger than zero again, wake up that coroutine.
     */
    void release()
    {
        ++value;

        // Wake up the first waiter that is not done yet.
        auto found = waiters.find!(w => !w.done);

        if (!found.empty)
            found[0].setResult;
    }
}

/**
 * A bounded semaphore implementation.
 *
 * This throws an Exception in $(D_PSYMBOL release()) if it would increase the
 * value above the initial value.
 */
class BoundedSemaphore : Semaphore
{
    // Initial counter value; release() never raises the counter above it.
    private size_t boundValue;

    /**
     * Params:
     *  eventLoop = event loop forwarded to $(D_PSYMBOL Semaphore).
     *  value     = initial counter value, also used as the upper bound.
     */
    this(EventLoop eventLoop = null, size_t value = 1)
    {
        // Construct the base class before touching fields of this object.
        super(eventLoop, value);
        boundValue = value;
    }

    /**
     * Release the semaphore.
     *
     * Throws: $(D_PSYMBOL Exception) if the counter already equals the
     *         initial value.
     */
    override void release()
    {
        enforce(value < boundValue, "BoundedSemaphore released too many times");
        super.release;
    }
}