1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.jdiagnose.concurrent;
19
20 /***
21 * A bounded variant of
22 * LinkedQueue
23 * class. This class may be
24 * preferable to
25 * BoundedBuffer
26 * because it allows a bit more
27 * concurency among puts and takes, because it does not
28 * pre-allocate fixed storage for elements, and allows
29 * capacity to be dynamically reset.
30 * On the other hand, since it allocates a node object
31 * on each put, it can be slow on systems with slow
32 * allocation and GC.
33 * Also, it may be
34 * preferable to
35 * LinkedQueue
36 * when you need to limit
37 * the capacity to prevent resource exhaustion. This protection
38 * normally does not hurt much performance-wise: When the
39 * queue is not empty or full, most puts and
40 * takes are still usually able to execute concurrently.
41 * @see LinkedQueue
42 * @see BoundedBuffer
43 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
44 **/
45
46 public class BoundedLinkedQueue implements BoundedChannel {
47
48
49
50
51
52
53
54
55
56
57
58 /***
59 * Dummy header node of list. The first actual node, if it exists, is always
60 * at head_.next. After each take, the old first node becomes the head.
61 **/
62 protected LinkedNode head_;
63
64 /***
65 * The last node of list. Put() appends to list, so modifies last_
66 **/
67 protected LinkedNode last_;
68
69
70 /***
71 * Helper monitor. Ensures that only one put at a time executes.
72 **/
73
74 protected final Object putGuard_ = new Object();
75
76 /***
77 * Helper monitor. Protects and provides wait queue for takes
78 **/
79
80 protected final Object takeGuard_ = new Object();
81
82
83 /*** Number of elements allowed **/
84 protected int capacity_;
85
86
87 /***
88 * One side of a split permit count.
89 * The counts represent permits to do a put. (The queue is full when zero).
90 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
91 * (The length is never separately recorded, so this cannot be
92 * checked explicitly.)
93 * To minimize contention between puts and takes, the
94 * put side uses up all of its permits before transfering them from
95 * the take side. The take side just increments the count upon each take.
96 * Thus, most puts and take can run independently of each other unless
97 * the queue is empty or full.
98 * Initial value is queue capacity.
99 **/
100
101 protected int putSidePutPermits_;
102
103 /*** Number of takes since last reconcile **/
104 protected int takeSidePutPermits_ = 0;
105
106
107 /***
108 * Create a queue with the given capacity
109 * @exception IllegalArgumentException if capacity less or equal to zero
110 **/
111 public BoundedLinkedQueue(int capacity) {
112 if (capacity <= 0) throw new IllegalArgumentException();
113 capacity_ = capacity;
114 putSidePutPermits_ = capacity;
115 head_ = new LinkedNode(null);
116 last_ = head_;
117 }
118
119 /***
120 * Create a queue with the current default capacity
121 **/
122
123 public BoundedLinkedQueue() {
124 this(DefaultChannelCapacity.get());
125 }
126
127 /***
128 * Move put permits from take side to put side;
129 * return the number of put side permits that are available.
130 * Call only under synch on puGuard_ AND this.
131 **/
132 protected final int reconcilePutPermits() {
133 putSidePutPermits_ += takeSidePutPermits_;
134 takeSidePutPermits_ = 0;
135 return putSidePutPermits_;
136 }
137
138
139 /*** Return the current capacity of this queue **/
140 public synchronized int capacity() { return capacity_; }
141
142
143 /***
144 * Return the number of elements in the queue.
145 * This is only a snapshot value, that may be in the midst
146 * of changing. The returned value will be unreliable in the presence of
147 * active puts and takes, and should only be used as a heuristic
148 * estimate, for example for resource monitoring purposes.
149 **/
150 public synchronized int size() {
151
152
153
154
155
156
157 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
158 }
159
160
161 /***
162 * Reset the capacity of this queue.
163 * If the new capacity is less than the old capacity,
164 * existing elements are NOT removed, but
165 * incoming puts will not proceed until the number of elements
166 * is less than the new capacity.
167 * @exception IllegalArgumentException if capacity less or equal to zero
168 **/
169
170 public void setCapacity(int newCapacity) {
171 if (newCapacity <= 0) throw new IllegalArgumentException();
172 synchronized (putGuard_) {
173 synchronized(this) {
174 takeSidePutPermits_ += (newCapacity - capacity_);
175 capacity_ = newCapacity;
176
177
178 reconcilePutPermits();
179 notifyAll();
180 }
181 }
182 }
183
184
185 /*** Main mechanics for take/poll **/
186 protected synchronized Object extract() {
187 synchronized(head_) {
188 Object x = null;
189 LinkedNode first = head_.next;
190 if (first != null) {
191 x = first.value;
192 first.value = null;
193 head_ = first;
194 ++takeSidePutPermits_;
195 notify();
196 }
197 return x;
198 }
199 }
200
201 public Object peek() {
202 synchronized(head_) {
203 LinkedNode first = head_.next;
204 if (first != null)
205 return first.value;
206 else
207 return null;
208 }
209 }
210
211 public Object take() throws InterruptedException {
212 if (Thread.interrupted()) throw new InterruptedException();
213 Object x = extract();
214 if (x != null)
215 return x;
216 else {
217 synchronized(takeGuard_) {
218 try {
219 for (;;) {
220 x = extract();
221 if (x != null) {
222 return x;
223 }
224 else {
225 takeGuard_.wait();
226 }
227 }
228 }
229 catch(InterruptedException ex) {
230 takeGuard_.notify();
231 throw ex;
232 }
233 }
234 }
235 }
236
237 public Object poll(long msecs) throws InterruptedException {
238 if (Thread.interrupted()) throw new InterruptedException();
239 Object x = extract();
240 if (x != null)
241 return x;
242 else {
243 synchronized(takeGuard_) {
244 try {
245 long waitTime = msecs;
246 long start = (msecs <= 0)? 0: System.currentTimeMillis();
247 for (;;) {
248 x = extract();
249 if (x != null || waitTime <= 0) {
250 return x;
251 }
252 else {
253 takeGuard_.wait(waitTime);
254 waitTime = msecs - (System.currentTimeMillis() - start);
255 }
256 }
257 }
258 catch(InterruptedException ex) {
259 takeGuard_.notify();
260 throw ex;
261 }
262 }
263 }
264 }
265
266 /*** Notify a waiting take if needed **/
267 protected final void allowTake() {
268 synchronized(takeGuard_) {
269 takeGuard_.notify();
270 }
271 }
272
273
274 /***
275 * Create and insert a node.
276 * Call only under synch on putGuard_
277 **/
278 protected void insert(Object x) {
279 --putSidePutPermits_;
280 LinkedNode p = new LinkedNode(x);
281 synchronized(last_) {
282 last_.next = p;
283 last_ = p;
284 }
285 }
286
287
288
289
290
291
292 public void put(Object x) throws InterruptedException {
293 if (x == null) throw new IllegalArgumentException();
294 if (Thread.interrupted()) throw new InterruptedException();
295
296 synchronized(putGuard_) {
297
298 if (putSidePutPermits_ <= 0) {
299 synchronized(this) {
300 if (reconcilePutPermits() <= 0) {
301 try {
302 for(;;) {
303 wait();
304 if (reconcilePutPermits() > 0) {
305 break;
306 }
307 }
308 }
309 catch (InterruptedException ex) {
310 notify();
311 throw ex;
312 }
313 }
314 }
315 }
316 insert(x);
317 }
318
319 allowTake();
320 }
321
322 public boolean offer(Object x, long msecs) throws InterruptedException {
323 if (x == null) throw new IllegalArgumentException();
324 if (Thread.interrupted()) throw new InterruptedException();
325
326 synchronized(putGuard_) {
327
328 if (putSidePutPermits_ <= 0) {
329 synchronized(this) {
330 if (reconcilePutPermits() <= 0) {
331 if (msecs <= 0)
332 return false;
333 else {
334 try {
335 long waitTime = msecs;
336 long start = System.currentTimeMillis();
337
338 for(;;) {
339 wait(waitTime);
340 if (reconcilePutPermits() > 0) {
341 break;
342 }
343 else {
344 waitTime = msecs - (System.currentTimeMillis() - start);
345 if (waitTime <= 0) {
346 return false;
347 }
348 }
349 }
350 }
351 catch (InterruptedException ex) {
352 notify();
353 throw ex;
354 }
355 }
356 }
357 }
358 }
359
360 insert(x);
361 }
362
363 allowTake();
364 return true;
365 }
366
367 public boolean isEmpty() {
368 synchronized(head_) {
369 return head_.next == null;
370 }
371 }
372
373 }