1 /**
2  * Queues.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.queues;
9 
10 import std.algorithm;
11 import std.exception;
12 import std.range;
13 import std.typecons;
14 import asynchronous.events : EventLoop, getEventLoop;
15 import asynchronous.futures : Waiter;
16 import asynchronous.locks : Event;
17 import asynchronous.tasks : waitFor;
18 import asynchronous.types : Coroutine;
19 
20 /**
21  * Exception thrown when $(D_PSYMBOL Queue.getNowait()) is called on a
22  * $(D_PSYMBOL Queue) object which is empty.
23  */
class QueueEmptyException : Exception
{
    /**
     * Build the exception.
     *
     * Params:
     *  message = human readable description (may be $(D_KEYWORD null))
     *  file    = source file reported as the throw site
     *  line    = source line reported as the throw site
     *  next    = chained throwable, if any
     *
     * All arguments are forwarded unchanged to the $(D_PSYMBOL Exception)
     * constructor.
     */
    this(
        string message = null,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
32 
33 /**
34  * Exception thrown when $(D_PSYMBOL Queue.putNowait()) is called on a
35  * $(D_PSYMBOL Queue) object which is full.
36  */
class QueueFullException : Exception
{
    /**
     * Build the exception.
     *
     * Params:
     *  message = human readable description (may be $(D_KEYWORD null))
     *  file    = source file reported as the throw site
     *  line    = source line reported as the throw site
     *  next    = chained throwable, if any
     *
     * All arguments are forwarded unchanged to the $(D_PSYMBOL Exception)
     * constructor.
     */
    this(
        string message = null,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
45 
46 /**
47  * A queue, useful for coordinating producer and consumer coroutines.
48  *
 * If $(D_PSYMBOL maxSize) is equal to zero, the queue size is unbounded.
 * Otherwise $(D_PSYMBOL put()) will block once the queue holds
 * $(D_PSYMBOL maxSize) items, until an item is removed by $(D_PSYMBOL get()).
52  *
53  * You can reliably know this $(D_PSYMBOL Queue)'s size with $(D_PSYMBOL
54  * qsize()), since your single-threaded asynchronous application won't be
55  * interrupted between calling $(D_PSYMBOL qsize()) and doing an operation on
56  * the Queue.
57  *
58  * This class is not thread safe.
59  */
class Queue(T, size_t maxSize = 0)
{
    // Event loop handed to every Waiter created here and to `finished`.
    private EventLoop eventLoop;
    // Waiters appended by get() while the queue is empty.
    private Waiter[] getters;
    // Waiters appended by put() while the queue is full.
    private Waiter[] putters;
    // Items added by putNowait() not yet acknowledged through taskDone().
    private size_t unfinishedTasks = 0;
    // Set while unfinishedTasks is zero; join() waits on it.
    private Event finished;
    // Backing storage; this class addresses it as a ring buffer.
    private T[] queue;
    // Index inside `queue` of the item the next get_() returns.
    private size_t start = 0;
    // Number of items currently stored.
    private size_t length = 0;

    /**
     * Create an empty queue.
     *
     * Params:
     *  eventLoop = loop to use; when $(D_KEYWORD null) the result of
     *              $(D_PSYMBOL getEventLoop) is used instead.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop !is null)
            this.eventLoop = eventLoop;
        else
            this.eventLoop = getEventLoop;

        // No work is outstanding yet, so the "finished" event starts set.
        finished = new Event(this.eventLoop);
        finished.set();
    }

    /**
     * Textual summary: dynamic type, $(D_PSYMBOL maxSize), stored items,
     * pending getters/putters and the unfinished task counter.
     */
    override string toString() const
    {
        import std.format : format;

        // Chaining the storage with itself lets a wrapped ring-buffer window
        // be expressed as one contiguous slice.
        auto storage = cast(T[]) queue;
        auto items = chain(storage, storage)[start .. start + length];

        return format(
            "%s(maxsize %s, queue %s, getters %s, putters %s, unfinishedTasks %s)",
            typeid(this), maxSize, items, getters, putters, unfinishedTasks);
    }

    /**
     * Storage hook: remove and return the item at the head of the ring
     * buffer. Callers must make sure the queue is not empty.
     */
    protected T get_()
    {
        auto head = queue[start];

        // Reset the vacated slot to T.init.
        queue[start] = T.init;

        // Advance the head, wrapping at the end of the storage.
        start = (start + 1 == queue.length) ? 0 : start + 1;
        --length;

        return head;
    }

    /**
     * Storage hook: store $(D_PARAM item) in the slot right after the last
     * stored item. Callers must make sure a free slot exists.
     */
    protected void put_(T item)
    {
        immutable tail = (start + length) % queue.length;

        queue[tail] = item;
        ++length;
    }

    /**
     * Grow the storage when every slot is occupied; otherwise do nothing.
     */
    private void ensureCapacity()
    {
        // A free slot already exists.
        if (length < queue.length)
            return;

        static if (maxSize > 0)
            assert(length < maxSize);
        assert(length == queue.length);

        // Double the size (at least 8 slots); bounded queues never grow past
        // maxSize.
        static if (maxSize > 0)
            immutable size_t grown = min(max(8, length * 2), maxSize);
        else
            immutable size_t grown = max(8, length * 2);

        // Rotate the ring so that the head item lands at index 0; the new
        // slots appended below then directly follow the last stored item.
        bringToFront(queue[0 .. start], queue[start .. $]);
        start = 0;

        queue.length = grown;

        // Unbounded queues additionally claim whatever capacity the array
        // reports after resizing.
        static if (maxSize == 0)
            queue.length = queue.capacity;
    }

    /**
     * Drop the waiters that are already done from $(D_PARAM waiters), then
     * set the result of the first remaining one, if any.
     */
    private void wakeupNext(ref Waiter[] waiters)
    {
        waiters = remove!(w => w.done)(waiters);

        if (waiters.empty)
            return;

        waiters[0].setResult();
    }

    /**
     * Drop the leading run of already-done waiters from the putters list.
     */
    private void consumeDonePutters()
    {
        // Skip waiters at the head of the put() queue who've timed out.
        putters = find!(p => !p.done)(putters);
    }

    /**
     * $(D_KEYWORD true) when no item is stored, $(D_KEYWORD false) otherwise.
     */
    @property bool empty()
    {
        return length == 0;
    }

    /**
     * $(D_KEYWORD true) when the queue holds $(D_PSYMBOL maxSize) items.
     *
     * Note: a queue instantiated with $(D_PSYMBOL maxSize) = 0 (the default)
     * never reports $(D_KEYWORD true).
     */
    @property bool full()
    {
        static if (maxSize > 0)
            return qsize >= maxSize;
        else
            return false;
    }

    /**
     * Remove and return an item from the queue.
     *
     * While the queue is empty, wait until an item becomes available.
     */
    @Coroutine
    T get()
    {
        while (empty)
        {
            // Register a fresh waiter; putNowait() wakes the first pending
            // one. The emptiness check is repeated after every wake-up.
            auto pending = new Waiter(eventLoop);

            getters ~= pending;
            eventLoop.waitFor(pending);
        }

        return getNowait();
    }

    /**
     * Remove and return an item from the queue without waiting.
     *
     * Throws: $(D_PSYMBOL QueueEmptyException) if no item is stored.
     */
    @Coroutine
    T getNowait()
    {
        enforce!QueueEmptyException(length > 0);

        // Once the item has been taken out, a slot is free: wake one putter.
        scope (success)
            wakeupNext(putters);

        return get_();
    }

    /**
     * Block until all items in the queue have been gotten and processed.
     *
     * The unfinished task counter is incremented for every item added to the
     * queue and decremented by each call to $(D_PSYMBOL taskDone()). When the
     * counter is zero this returns immediately; otherwise it waits on the
     * event that $(D_PSYMBOL taskDone()) sets once the counter reaches zero.
     */
    @Coroutine
    void join()
    {
        if (unfinishedTasks == 0)
            return;

        finished.wait();
    }

    /**
     * Put an item into the queue.
     *
     * While the queue is full, wait until a free slot is available before
     * adding $(D_PARAM item).
     */
    @Coroutine
    void put(T item)
    {
        static if (maxSize > 0)
        {
            while (full)
            {
                // Register a fresh waiter; getNowait() wakes the first
                // pending one. Fullness is re-checked after every wake-up.
                auto pending = new Waiter(eventLoop);

                putters ~= pending;
                eventLoop.waitFor(pending);
            }
        }

        putNowait(item);
    }

    /**
     * Put an item into the queue without blocking.
     *
     * Throws: $(D_PSYMBOL QueueFullException) if the queue is bounded and no
     * free slot is available.
     */
    void putNowait(T item)
    {
        static if (maxSize > 0)
            enforce!QueueFullException(qsize < maxSize);

        ensureCapacity();
        put_(item);

        // A new task is outstanding: join() has to wait again.
        ++unfinishedTasks;
        finished.clear();

        wakeupNext(getters);
    }

    /**
     * Number of items in the queue.
     */
    @property size_t qsize()
    {
        return length;
    }

    /**
     * Indicate that a formerly enqueued task is complete.
     *
     * Used by queue consumers: for each $(D_PSYMBOL get()) used to fetch a
     * task, a subsequent call to $(D_PSYMBOL taskDone()) tells the queue that
     * the processing of that task is complete.
     *
     * When the number of unfinished tasks drops to zero, the event awaited
     * by $(D_PSYMBOL join()) is set.
     *
     * Throws: $(D_PSYMBOL Exception) if called more times than there were
     * items placed in the queue.
     */
    void taskDone()
    {
        enforce(unfinishedTasks > 0, "taskDone() called too many times");

        if (--unfinishedTasks == 0)
            finished.set();
    }

    /**
     * Number of items allowed in the queue (the $(D_PSYMBOL maxSize) template
     * argument).
     */
    @property size_t maxsize()
    {
        return maxSize;
    }
}
305 
unittest
{
    // Items put into a plain Queue come back out in insertion order.
    enum count = 200;
    auto fifo = new Queue!int;

    foreach (value; 0 .. count)
        fifo.putNowait(value);

    foreach (expected; 0 .. count)
        assert(fifo.getNowait == expected);
}
316 
unittest
{
    // A bounded queue rejects putNowait() once it holds maxSize items.
    auto bounded = new Queue!(int, 10);

    foreach (value; 0 .. 10)
        bounded.putNowait(value);

    assertThrown!QueueFullException(bounded.putNowait(11));
}
326 
327 /**
328  * A subclass of $(D_PSYMBOL Queue); retrieves entries in priority order
329  * (largest first).
330  *
331  * Entries are typically tuples of the form: (priority number, data).
332  */
class PriorityQueue(T, size_t maxSize = 0, alias less = "a < b") :
    Queue!(T, maxSize)
{
    import std.container : BinaryHeap;

    // Heap view over the base class storage `queue`. The base class `length`
    // is kept equal to the number of items in the heap by get_()/put_().
    private BinaryHeap!(T[], less) binaryHeap;

    /**
     * Create an empty priority queue.
     *
     * Params:
     *  eventLoop = forwarded to the $(D_PSYMBOL Queue) constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
        binaryHeap.acquire(queue, length);
    }

    /**
     * Storage hook: remove and return the front element of the heap.
     */
    protected override T get_()
    {
        auto result = binaryHeap.front;
        binaryHeap.removeFront;

        // Only account for the removal once the heap operation succeeded, so
        // a throwing front/removeFront cannot desynchronise `length`.
        --length;

        return result;
    }

    /**
     * Storage hook: insert $(D_PARAM item) into the heap.
     *
     * The base class may have resized (and thereby reallocated) `queue`
     * since the previous insertion, so the heap is re-bound to `queue`
     * before every insert. The previous implementation tried to detect a
     * reallocation by comparing `binaryHeap.capacity` with `queue.length`
     * and then asserted their equality; the base class only forces
     * `queue.length == queue.capacity` when maxSize is 0, so that assertion
     * could fail for bounded queues. `assume` takes the store as already
     * heap-ordered and does not reorder it.
     */
    protected override void put_(T item)
    {
        // The caller (putNowait) reserves a slot through ensureCapacity().
        assert(length < queue.length);

        binaryHeap.assume(queue, length);
        binaryHeap.insert(item);
        ++length;
    }
}
366 
unittest
{
    // With the default predicate the largest item is retrieved first.
    enum count = 200;
    auto byPriority = new PriorityQueue!int;

    foreach (value; 0 .. count)
        byPriority.putNowait(value);

    foreach_reverse (expected; 0 .. count)
        assert(byPriority.getNowait == expected);
}
377 
378 /**
379  * A subclass of $(D_PSYMBOL Queue) that retrieves most recently added entries
380  * first.
381  */
class LifoQueue(T, size_t maxSize = 0) : Queue!(T, maxSize)
{
    /**
     * Create an empty LIFO queue.
     *
     * Params:
     *  eventLoop = forwarded to the $(D_PSYMBOL Queue) constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /**
     * Storage hook: pop the most recently stored item off the top of the
     * storage and reset the vacated slot to T.init.
     */
    protected override T get_()
    {
        --length;

        auto top = queue[length];
        queue[length] = T.init;

        return top;
    }

    /**
     * Storage hook: push $(D_PARAM item) on top of the stored items.
     */
    protected override void put_(T item)
    {
        immutable slot = length++;

        queue[slot] = item;
    }
}
401 
unittest
{
    // Items come back out of a LifoQueue in reverse insertion order.
    enum count = 200;
    auto stack = new LifoQueue!int;

    foreach (value; 0 .. count)
        stack.putNowait(value);

    foreach_reverse (expected; 0 .. count)
        assert(stack.getNowait == expected);
}