1 /**
2  * Queues.
3  */
4 module asynchronous.queues;
5 
6 import std.algorithm;
7 import std.exception;
8 import std.range;
9 import std.typecons;
10 import asynchronous.events : EventLoop, getEventLoop;
11 import asynchronous.futures : Waiter;
12 import asynchronous.locks : Event;
13 import asynchronous.tasks : waitFor;
14 import asynchronous.types : Coroutine;
15 
16 /**
17  * Exception thrown when $(D_PSYMBOL Queue.getNowait()) is called on a
18  * $(D_PSYMBOL Queue) object which is empty.
19  */
class QueueEmptyException : Exception
{
    /**
     * Construct the exception, forwarding every argument unchanged to
     * $(D_PSYMBOL Exception).
     *
     * Params:
     *  message = description of the failure.
     *  file    = source file reported as the origin (defaults to the call
     *            site).
     *  line    = source line reported as the origin (defaults to the call
     *            site).
     *  next    = next exception in the chain, if any.
     */
    this(
        string message = null,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) pure nothrow @safe
    {
        super(message, file, line, next);
    }
}
28 
29 /**
30  * Exception thrown when $(D_PSYMBOL Queue.putNowait()) is called on a
31  * $(D_PSYMBOL Queue) object which is full.
32  */
class QueueFullException : Exception
{
    /**
     * Construct the exception, forwarding every argument unchanged to
     * $(D_PSYMBOL Exception).
     *
     * Params:
     *  message = description of the failure.
     *  file    = source file reported as the origin (defaults to the call
     *            site).
     *  line    = source line reported as the origin (defaults to the call
     *            site).
     *  next    = next exception in the chain, if any.
     */
    this(
        string message = null,
        string file = __FILE__,
        size_t line = __LINE__,
        Throwable next = null
    ) pure nothrow @safe
    {
        super(message, file, line, next);
    }
}
41 
42 /**
43  * A queue, useful for coordinating producer and consumer coroutines.
44  *
45  * If $(D_PSYMBOL maxSize) is equal to zero, the queue size is infinite.
46  * Otherwise $(D_PSYMBOL put()) will block when the queue reaches maxsize, until
47  * an item is removed by $(D_PSYMBOL get()).
48  *
49  * You can reliably know this $(D_PSYMBOL Queue)'s size with $(D_PSYMBOL
50  * qsize()), since your single-threaded asynchronous application won't be
51  * interrupted between calling $(D_PSYMBOL qsize()) and doing an operation on
52  * the Queue.
53  *
54  * This class is not thread safe.
55  */
class Queue(T, size_t maxSize = 0)
{
    // Event loop used to create waiters and to wait on them.
    private EventLoop eventLoop;
    // Waiters created by get() calls that found the queue empty.
    private Waiter[] getters;
    // Waiters created by put() calls that found the queue full.
    private Waiter[] putters;
    // Items put so far for which taskDone() has not been called yet.
    private size_t unfinishedTasks = 0;
    // Set while unfinishedTasks is zero; join() waits on it.
    private Event finished;
    // Backing storage, used as a ring buffer by the default get_/put_.
    private T[] queue;
    // Index in `queue` of the oldest stored item.
    private size_t start = 0;
    // Number of items currently stored.
    private size_t length = 0;

    /**
     * Create an empty queue.
     *
     * Params:
     *  eventLoop = event loop to use; when $(D_KEYWORD null) the result of
     *              $(D_PSYMBOL getEventLoop) is used instead.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        // No item has been put yet, so there is nothing for join() to wait on.
        this.finished = new Event(this.eventLoop);
        this.finished.set;
    }

    /**
     * Return a textual description of the queue: its dynamic type, maximum
     * size, stored items (oldest first, following the ring-buffer layout),
     * pending getters and putters and the count of unfinished tasks.
     */
    override string toString() const
    {
        import std.format : format;

        // Chaining the buffer with itself lets one slice cover content that
        // wraps around the end of the ring buffer.
        auto data = chain(cast(T[]) queue, cast(T[]) queue)[start .. start + length];

        return "%s(maxsize %s, queue %s, getters %s, putters %s, unfinishedTasks %s)"
            .format(typeid(this), maxSize, data, getters, putters, unfinishedTasks);
    }

    /**
     * Remove and return the item at the head of the ring buffer.
     *
     * The queue must not be empty. Subclasses override this together with
     * $(D_PSYMBOL put_) to change the retrieval order.
     */
    protected T get_()
    {
        auto result = queue[start];
        // Reset the vacated slot so the buffer keeps no stale copy.
        queue[start] = T.init;
        ++start;
        if (start == queue.length)
            start = 0;
        --length;
        return result;
    }

    /**
     * Store $(D_PARAM item) right after the last stored item, wrapping around
     * the end of the buffer.
     *
     * The buffer must have a free slot; $(D_PSYMBOL putNowait) guarantees
     * that by calling $(D_PSYMBOL ensureCapacity) first.
     */
    protected void put_(T item)
    {
        queue[(start + length) % $] = item;
        ++length;
    }

    /**
     * Grow the backing array when it is completely occupied.
     *
     * The content is first rotated so that the oldest item sits at index 0,
     * then the array is at least doubled (minimum 8 slots), capped at
     * $(D_PSYMBOL maxSize) for bounded queues.
     */
    private void ensureCapacity()
    {
        if (length < queue.length)
            return;

        static if (maxSize > 0)
            assert(length < maxSize);
        assert(length == queue.length);

        size_t newLength = max(8, length * 2);

        static if (maxSize > 0)
            newLength = min(newLength, maxSize);

        // Linearize the ring buffer: move [start .. $] in front of
        // [0 .. start] so that the new tail space follows the newest item.
        bringToFront(queue[0 .. start], queue[start .. $]);
        start = 0;

        queue.length = newLength;

        // Unbounded queues extend the array to its full array capacity.
        static if (maxSize == 0)
            queue.length = queue.capacity;
    }

    /**
     * Drop the waiters that are already done and complete the first remaining
     * one, if any.
     */
    private void wakeupNext(ref Waiter[] waiters)
    {
        waiters = waiters.remove!(a => a.done);

        if (!waiters.empty)
            waiters[0].setResult;
    }

    /**
     * Drop the leading putters that are already done.
     */
    private void consumeDonePutters()
    {
        // Delete waiters at the head of the put() queue who've timed out.
        putters = putters.find!(g => !g.done);
    }

    /**
     * Return $(D_KEYWORD true) if the queue is empty, $(D_KEYWORD false)
     * otherwise.
     */
    @property bool empty()
    {
        return length == 0;
    }

    /**
     * Return $(D_KEYWORD true) if there are maxsize items in the queue.
     *
     * Note: if the Queue was initialized with $(D_PSYMBOL maxsize) = 0 (the
     * default), then $(D_PSYMBOL full()) is never $(D_KEYWORD true).
     */
    @property bool full()
    {
        static if (maxSize == 0)
            return false;
        else
            return qsize >= maxSize;
    }

    /**
     * Remove and return an item from the queue.
     *
     * If queue is empty, wait until an item is available.
     */
    @Coroutine
    T get()
    {
        // Re-check after every wake-up: the queue may be empty again by the
        // time this coroutine resumes.
        while (empty)
        {
            auto waiter = new Waiter(eventLoop);

            getters ~= waiter;
            eventLoop.waitFor(waiter);
        }

        return getNowait;
    }

    /**
     * Remove and return an item from the queue.
     *
     * Return an item if one is immediately available, else throw $(D_PSYMBOL
     * QueueEmptyException).
     *
     * Throws: $(D_PSYMBOL QueueEmptyException) if the queue is empty.
     */
    // NOTE(review): this method does not wait on anything visible here, yet
    // it carries @Coroutine while putNowait does not - confirm the intent.
    @Coroutine
    T getNowait()
    {
        // enforceEx is deprecated/removed in Phobos; enforce!E replaces it.
        enforce!QueueEmptyException(length > 0, "queue is empty");

        T item = get_;

        // A slot was freed: let a blocked put() proceed.
        wakeupNext(putters);

        return item;
    }

    /**
     * Block until all items in the queue have been gotten and processed.
     *
     * The count of unfinished tasks goes up whenever an item is added to the
     * queue. The count goes down whenever a consumer calls $(D_PSYMBOL
     * taskDone()) to indicate that the item was retrieved and all work on it is
     * complete.
     * When the count of unfinished tasks drops to zero, $(D_PSYMBOL join())
     * unblocks.
     */
    @Coroutine
    void join()
    {
        if (unfinishedTasks > 0)
            finished.wait;
    }

    /**
     * Put an item into the queue.
     *
     * If the queue is full, wait until a free slot is available before adding
     * item.
     */
    @Coroutine
    void put(T item)
    {
        // Unbounded queues are never full, so the wait loop is compiled out.
        static if (maxSize > 0)
        {
            while (full)
            {
                auto waiter = new Waiter(eventLoop);

                putters ~= waiter;
                eventLoop.waitFor(waiter);
            }
        }

        putNowait(item);
    }

    /**
     * Put an item into the queue without blocking.
     *
     * If no free slot is immediately available, throw $(D_PSYMBOL
     * QueueFullException).
     *
     * Throws: $(D_PSYMBOL QueueFullException) if the queue is bounded and
     * already holds $(D_PSYMBOL maxSize) items.
     */
    void putNowait(T item)
    {
        // enforceEx is deprecated/removed in Phobos; enforce!E replaces it.
        static if (maxSize > 0)
            enforce!QueueFullException(qsize < maxSize, "queue is full");

        ensureCapacity;
        put_(item);
        ++unfinishedTasks;
        finished.clear;

        // An item is available: let a blocked get() proceed.
        wakeupNext(getters);
    }

    /**
     * Number of items in the queue.
     */
    @property size_t qsize()
    {
        return length;
    }

    /**
     * Indicate that a formerly enqueued task is complete.
     *
     * Used by queue consumers. For each $(D_PSYMBOL get()) used to fetch a
     * task, a subsequent call to $(D_PSYMBOL taskDone()) tells the queue that
     * the processing on the task is complete.
     *
     * If a $(D_PSYMBOL join()) is currently blocking, it will resume when all
     * items have been processed (meaning that a $(D_PSYMBOL taskDone()) call
     * was received for every item that had been $(D_PSYMBOL put()) into the
     * queue).
     *
     * Throws $(D_PSYMBOL Exception) if called more times than there were items
     * placed in the queue.
     */
    void taskDone()
    {
        enforce(unfinishedTasks > 0, "taskDone() called too many times");

        --unfinishedTasks;
        if (unfinishedTasks == 0)
            finished.set;
    }

    /**
     * Number of items allowed in the queue.
     */
    @property size_t maxsize()
    {
        return maxSize;
    }
}
303 
unittest
{
    // Items leave a plain Queue in the order they were put (FIFO).
    enum count = 200;
    auto fifo = new Queue!int;

    foreach (value; 0 .. count)
        fifo.putNowait(value);

    foreach (expected; 0 .. count)
        assert(fifo.getNowait == expected);
}
314 
unittest
{
    // A bounded queue rejects putNowait once it holds maxSize items.
    auto bounded = new Queue!(int, 10);

    foreach (value; 0 .. 10)
        bounded.putNowait(value);

    assertThrown!QueueFullException(bounded.putNowait(11));
}
324 
325 /**
326  * A subclass of $(D_PSYMBOL Queue); retrieves entries in priority order
327  * (largest first).
328  *
329  * Entries are typically tuples of the form: (priority number, data).
330  */
class PriorityQueue(T, size_t maxSize = 0, alias less = "a < b") :
    Queue!(T, maxSize)
{
    import std.container : BinaryHeap;

    // Heap view over the base class' `queue` array; the first `length`
    // slots of that array are kept organized as a heap ordered by `less`.
    private BinaryHeap!(T[], less) binaryHeap;

    /**
     * Create an empty priority queue.
     *
     * Params:
     *  eventLoop = event loop forwarded to the $(D_PSYMBOL Queue)
     *              constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
        binaryHeap.acquire(queue, length);
    }

    /**
     * Remove and return the front element of the heap.
     */
    protected override T get_()
    {
        --length;
        auto result = binaryHeap.front;
        binaryHeap.removeFront;
        // Slot `length` is now outside the heap; reset it so the buffer
        // keeps no stale copy, like Queue.get_ and LifoQueue.get_ do.
        queue[length] = T.init;
        return result;
    }

    /**
     * Insert $(D_PARAM item) into the heap.
     */
    protected override void put_(T item)
    {
        // Always rebind the heap to the current `queue` slice before
        // inserting. The previous check compared `binaryHeap.capacity` with
        // `queue.length` to detect a grown array, but for a `T[]` store
        // `capacity` reports the runtime array capacity, not its length.
        // Bounded queues never pad `queue.length` up to that capacity, so the
        // accompanying assert could fail although nothing was wrong.
        binaryHeap.assume(queue, length);

        binaryHeap.insert(item);
        ++length;
    }
}
364 
unittest
{
    // With the default ordering the largest entry is retrieved first.
    enum count = 200;
    auto byPriority = new PriorityQueue!int;

    foreach (value; 0 .. count)
        byPriority.putNowait(value);

    foreach_reverse (expected; 0 .. count)
        assert(byPriority.getNowait == expected);
}
375 
376 /**
377  * A subclass of $(D_PSYMBOL Queue) that retrieves most recently added entries
378  * first.
379  */
class LifoQueue(T, size_t maxSize = 0) : Queue!(T, maxSize)
{
    /**
     * Create an empty LIFO queue.
     *
     * Params:
     *  eventLoop = event loop forwarded to the $(D_PSYMBOL Queue)
     *              constructor.
     */
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /**
     * Pop the most recently stored item off the top of the stack.
     */
    protected override T get_()
    {
        length -= 1;
        auto top = queue[length];
        // Reset the vacated slot so the buffer keeps no stale copy.
        queue[length] = T.init;
        return top;
    }

    /**
     * Push $(D_PARAM item) on top of the stack.
     */
    protected override void put_(T item)
    {
        queue[length] = item;
        length += 1;
    }
}
399 
unittest
{
    // The most recently added entry is retrieved first (LIFO).
    enum count = 200;
    auto lifo = new LifoQueue!int;

    foreach (value; 0 .. count)
        lifo.putNowait(value);

    foreach_reverse (expected; 0 .. count)
        assert(lifo.getNowait == expected);
}