View Javadoc

1   /*
2     File: BoundedLinkedQueue.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    17Jul1998  dl               Simplified by eliminating wait counts
13    25aug1998  dl               added peek
14    10oct1999  dl               lock on node object to ensure visibility
15    27jan2000  dl               setCapacity forces immediate permit reconcile
16  */
17  
18  package org.jdiagnose.concurrent;
19  
20  /***
21   * A bounded variant of 
22   * LinkedQueue 
23   * class. This class may be
24   * preferable to 
25   * BoundedBuffer 
26   * because it allows a bit more
27   * concurrency among puts and takes, because it does not
28   * pre-allocate fixed storage for elements, and allows 
29   * capacity to be dynamically reset.
30   * On the other hand, since it allocates a node object
31   * on each put, it can be slow on systems with slow
32   * allocation and GC.
33   * Also, it may be
34   * preferable to 
35   * LinkedQueue 
36   * when you need to limit
37   * the capacity to prevent resource exhaustion. This protection
38   * normally does not hurt much performance-wise: When the
39   * queue is not empty or full, most puts and
40   * takes are still usually able to execute concurrently.
41   * @see LinkedQueue 
42   * @see BoundedBuffer 
43   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
44   **/
45  
46  public class BoundedLinkedQueue implements BoundedChannel {
47  
48    /*
49     * It might be a bit nicer if this were declared as
50     * a subclass of LinkedQueue, or a sibling class of
51     * a common abstract class. It shares much of the
52     * basic design and bookkeeping fields. But too 
53     * many details differ to make this worth doing.
54     */
55  
56  
57  
58    /*** 
59     * Dummy header node of list. The first actual node, if it exists, is always 
60     * at head_.next. After each take, the old first node becomes the head.
61     **/
62    protected LinkedNode head_;
63  
64    /*** 
65     * The last node of list. Put() appends to list, so modifies last_
66     **/
67    protected LinkedNode last_;
68  
69  
70    /***
71     * Helper monitor. Ensures that only one put at a time executes.
72     **/
73  
74    protected final Object putGuard_ = new Object();
75  
76    /***
77     * Helper monitor. Protects and provides wait queue for takes
78     **/
79  
80    protected final Object takeGuard_ = new Object();
81  
82  
83    /*** Number of elements allowed **/
84    protected int capacity_;
85  
86    
87    /***
88     * One side of a split permit count. 
89     * The counts represent permits to do a put. (The queue is full when zero).
90     * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
91     * (The length is never separately recorded, so this cannot be
92     * checked explicitly.)
93     * To minimize contention between puts and takes, the
94     * put side uses up all of its permits before transfering them from
95     * the take side. The take side just increments the count upon each take.
96     * Thus, most puts and take can run independently of each other unless
97     * the queue is empty or full. 
98     * Initial value is queue capacity.
99     **/
100 
101   protected int putSidePutPermits_; 
102 
103   /*** Number of takes since last reconcile **/
104   protected int takeSidePutPermits_ = 0;
105 
106 
107   /***
108    * Create a queue with the given capacity
109    * @exception IllegalArgumentException if capacity less or equal to zero
110    **/
111   public BoundedLinkedQueue(int capacity) {
112     if (capacity <= 0) throw new IllegalArgumentException();
113     capacity_ = capacity;
114     putSidePutPermits_ = capacity;
115     head_ =  new LinkedNode(null); 
116     last_ = head_;
117   }
118 
119   /***
120    * Create a queue with the current default capacity
121    **/
122 
123   public BoundedLinkedQueue() { 
124     this(DefaultChannelCapacity.get()); 
125   }
126 
127   /***
128    * Move put permits from take side to put side; 
129    * return the number of put side permits that are available.
130    * Call only under synch on puGuard_ AND this.
131    **/
132   protected final int reconcilePutPermits() {
133     putSidePutPermits_ += takeSidePutPermits_;
134     takeSidePutPermits_ = 0;
135     return putSidePutPermits_;
136   }
137 
138 
139   /*** Return the current capacity of this queue **/
140   public synchronized int capacity() { return capacity_; }
141 
142 
143   /*** 
144    * Return the number of elements in the queue.
145    * This is only a snapshot value, that may be in the midst 
146    * of changing. The returned value will be unreliable in the presence of
147    * active puts and takes, and should only be used as a heuristic
148    * estimate, for example for resource monitoring purposes.
149    **/
150   public synchronized int size() {
151     /*
152       This should ideally synch on putGuard_, but
153       doing so would cause it to block waiting for an in-progress
154       put, which might be stuck. So we instead use whatever
155       value of putSidePutPermits_ that we happen to read.
156     */
157     return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
158   }
159 
160 
161   /***
162    * Reset the capacity of this queue.
163    * If the new capacity is less than the old capacity,
164    * existing elements are NOT removed, but
165    * incoming puts will not proceed until the number of elements
166    * is less than the new capacity.
167    * @exception IllegalArgumentException if capacity less or equal to zero
168    **/
169 
170   public void setCapacity(int newCapacity) {
171     if (newCapacity <= 0) throw new IllegalArgumentException();
172     synchronized (putGuard_) {
173       synchronized(this) {
174         takeSidePutPermits_ += (newCapacity - capacity_);
175         capacity_ = newCapacity;
176         
177         // Force immediate reconcilation.
178         reconcilePutPermits();
179         notifyAll();
180       }
181     }
182   }
183 
184 
185   /*** Main mechanics for take/poll **/
186   protected synchronized Object extract() {
187     synchronized(head_) {
188       Object x = null;
189       LinkedNode first = head_.next;
190       if (first != null) {
191         x = first.value;
192         first.value = null;
193         head_ = first; 
194         ++takeSidePutPermits_;
195         notify();
196       }
197       return x;
198     }
199   }
200 
201   public Object peek() {
202     synchronized(head_) {
203       LinkedNode first = head_.next;
204       if (first != null) 
205         return first.value;
206       else
207         return null;
208     }
209   }
210 
211   public Object take() throws InterruptedException {
212     if (Thread.interrupted()) throw new InterruptedException();
213     Object x = extract();
214     if (x != null) 
215       return x;
216     else {
217       synchronized(takeGuard_) {
218         try {
219           for (;;) {
220             x = extract();
221             if (x != null) {
222               return x;
223             }
224             else {
225               takeGuard_.wait(); 
226             }
227           }
228         }
229         catch(InterruptedException ex) {
230           takeGuard_.notify();
231           throw ex; 
232         }
233       }
234     }
235   }
236 
237   public Object poll(long msecs) throws InterruptedException {
238     if (Thread.interrupted()) throw new InterruptedException();
239     Object x = extract();
240     if (x != null) 
241       return x;
242     else {
243       synchronized(takeGuard_) {
244         try {
245           long waitTime = msecs;
246           long start = (msecs <= 0)? 0: System.currentTimeMillis();
247           for (;;) {
248             x = extract();
249             if (x != null || waitTime <= 0) {
250               return x;
251             }
252             else {
253               takeGuard_.wait(waitTime); 
254               waitTime = msecs - (System.currentTimeMillis() - start);
255             }
256           }
257         }
258         catch(InterruptedException ex) {
259           takeGuard_.notify();
260           throw ex; 
261         }
262       }
263     }
264   }
265 
266   /*** Notify a waiting take if needed **/
267   protected final void allowTake() {
268     synchronized(takeGuard_) {
269       takeGuard_.notify();
270     }
271   }
272 
273 
274   /***
275    * Create and insert a node.
276    * Call only under synch on putGuard_
277    **/
278   protected void insert(Object x) { 
279     --putSidePutPermits_;
280     LinkedNode p = new LinkedNode(x);
281     synchronized(last_) {
282       last_.next = p;
283       last_ = p;
284     }
285   }
286 
287 
288   /* 
289      put and offer(ms) differ only in policy before insert/allowTake
290   */
291 
292   public void put(Object x) throws InterruptedException {
293     if (x == null) throw new IllegalArgumentException();
294     if (Thread.interrupted()) throw new InterruptedException();
295 
296     synchronized(putGuard_) {
297 
298       if (putSidePutPermits_ <= 0) { // wait for permit. 
299         synchronized(this) {
300           if (reconcilePutPermits() <= 0) {
301             try {
302               for(;;) {
303                 wait();
304                 if (reconcilePutPermits() > 0) {
305                   break;
306                 }
307               }
308             }
309             catch (InterruptedException ex) { 
310               notify(); 
311               throw ex; 
312             }
313           }
314         }
315       }
316       insert(x);
317     }
318     // call outside of lock to loosen put/take coupling
319     allowTake();
320   }
321 
322   public boolean offer(Object x, long msecs) throws InterruptedException {
323     if (x == null) throw new IllegalArgumentException();
324     if (Thread.interrupted()) throw new InterruptedException();
325 
326     synchronized(putGuard_) {
327 
328       if (putSidePutPermits_ <= 0) {
329         synchronized(this) {
330           if (reconcilePutPermits() <= 0) {
331             if (msecs <= 0)
332               return false;
333             else {
334               try {
335                 long waitTime = msecs;
336                 long start = System.currentTimeMillis();
337                 
338                 for(;;) {
339                   wait(waitTime);
340                   if (reconcilePutPermits() > 0) {
341                     break;
342                   }
343                   else {
344                     waitTime = msecs - (System.currentTimeMillis() - start);
345                     if (waitTime <= 0) {
346                       return false;
347                     }
348                   }
349                 }
350               }
351               catch (InterruptedException ex) { 
352                 notify(); 
353                 throw ex; 
354               }
355             }
356           }
357         }
358       }
359 
360       insert(x);
361     }
362 
363     allowTake();
364     return true;
365   }
366 
367   public boolean isEmpty() {
368     synchronized(head_) {
369       return head_.next == null;
370     }
371   }    
372     
373 }