/*
  File: Channel.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  11Jun1998  dl               Create public version
  25aug1998  dl               added peek
*/

package org.jdiagnose.concurrent;

/**
 * Main interface for buffers, queues, pipes, conduits, etc.
 * <p>
 * A Channel represents anything that you can put items
 * into and take them out of. As with the Sync
 * interface, both blocking (put(x), take()) and
 * timeout-based (offer(x, msecs), poll(msecs)) policies
 * are provided. Using a
 * zero timeout for offer and poll results in a pure balking policy.
 * <p>
 * To aid in efforts to use Channels in a more typesafe manner,
 * this interface extends Puttable and Takable. You can restrict
 * arguments or instance variables to these types as a way of
 * guaranteeing that producers never try to take, or consumers put.
 * For example:
 * <pre>
 * class Producer implements Runnable {
 *   final Puttable chan;
 *   Producer(Puttable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { chan.put(produce()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   Object produce() { ... }
 * }
 *
 * class Consumer implements Runnable {
 *   final Takable chan;
 *   Consumer(Takable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { consume(chan.take()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     Channel chan = new SomeChannelImplementation();
 *     Producer p = new Producer(chan);
 *     Consumer c = new Consumer(chan);
 *     new Thread(p).start();
 *     new Thread(c).start();
 *   }
 * }
 * </pre>
 * <p>
 * A given channel implementation might or might not have bounded
 * capacity or other insertion constraints, so in general, you cannot tell if
 * a given put will block. However, Channels that are designed to
 * have an element capacity (and so always block when full)
 * should implement the BoundedChannel subinterface.
 * <p>
 * Channels may hold any kind of item. However,
 * insertion of null is not in general supported. Implementations
 * may (all currently do) throw IllegalArgumentExceptions upon attempts to
 * insert null.
 * <p>
 * By design, the Channel interface does not support any methods to determine
 * the current number of elements being held in the channel.
 * This decision reflects the fact that in
 * concurrent programming, such methods are so rarely useful
 * that including them invites misuse; at best they could
 * provide a snapshot of the current
 * state that could change immediately after being reported.
 * It is better practice to instead use poll and offer to try
 * to take and put elements without blocking. For example,
 * to empty out the current contents of a channel, you could write:
 * <pre>
 *  try {
 *    for (;;) {
 *      Object item = channel.poll(0);
 *      if (item != null)
 *        process(item);
 *      else
 *        break;
 *    }
 *  }
 *  catch (InterruptedException ex) { ... }
 * </pre>
 * <p>
 * However, it is possible to determine whether an item
 * exists in a Channel via <code>peek</code>, which returns
 * but does NOT remove the next item that can be taken (or null
 * if there is no such item). The peek operation has a limited
 * range of applicability, and must be used with care. Unless it
 * is known that a given thread is the only possible consumer
 * of a channel, and that no time-out-based <code>offer</code> operations
 * are ever invoked, there is no guarantee that the item returned
 * by peek will be available for a subsequent take.
 * <p>
 * When appropriate, you can define an isEmpty method to
 * return whether <code>peek</code> returns null.
 * <p>
 * Also, as a compromise, even though it does not appear in the interface,
 * implementation classes that can readily compute the number
 * of elements support a <code>size()</code> method. This allows careful
 * use, for example in queue length monitors, appropriate to the
 * particular implementation constraints and properties.
 * <p>
 * All channels allow multiple producers and/or consumers.
 * They do not support any kind of <em>close</em> method
 * to shut down operation or indicate completion of particular
 * producer or consumer threads.
 * If you need to signal completion, one way to do it is to
 * create a class such as
 * <pre>
 * class EndOfStream {
 *    // Application-dependent field/methods
 * }
 * </pre>
 * and to have producers put an instance of this class into
 * the channel when they are done. The consumer side can then
 * check this via
 * <pre>
 *   Object x = aChannel.take();
 *   if (x instanceof EndOfStream) {
 *     // special actions; perhaps terminate
 *   } else {
 *     // process normally
 *   }
 * </pre>
 * <p>
 * In time-out-based methods (poll(msecs) and offer(x, msecs)),
 * time bounds are interpreted in
 * a coarse-grained, best-effort fashion. Since there is no
 * way in Java to escape out of a wait for a synchronized
 * method/block, time bounds can sometimes be exceeded when
 * there is a lot of contention for the channel. Additionally,
 * some Channel semantics entail a "point of
 * no return" where, once some parts of the operation have completed,
 * others must follow, regardless of time bound.
 * <p>
 * Interruptions are in general handled as early as possible
 * in all methods. Normally, InterruptedExceptions are thrown
 * in put/take and offer(x, msecs)/poll(msecs) if interruption
 * is detected upon entry to the method, as well as in any
 * later context surrounding waits.
 * <p>
 * If a put returns normally, an offer
 * returns true, or a take or poll returns non-null, the operation
 * completed successfully.
 * In all other cases, the operation fails cleanly -- the
 * element is not put or taken.
 * <p>
 * As with Sync classes, spin loops are not directly supported
 * and are not particularly recommended for routine use, but they are not hard
 * to construct. For example, here is an exponential backoff version:
 * <pre>
 * Object backOffTake(Channel q) throws InterruptedException {
 *   long waitTime = 0;
 *   for (;;) {
 *     Object x = q.poll(0);
 *     if (x != null)
 *       return x;
 *     else {
 *       Thread.sleep(waitTime);
 *       waitTime = 3 * waitTime / 2 + 1;
 *     }
 *   }
 * }
 * </pre>
 * <p>
 * <b>Sample Usage</b>. Here is a producer/consumer design
 * where the channel is used to hold Runnable commands representing
 * background tasks.
 * <pre>
 * class Service {
 *   private final Channel channel = ... some Channel implementation;
 *
 *   private void backgroundTask(int taskParam) { ... }
 *
 *   public void action(final int arg) {
 *     Runnable command =
 *       new Runnable() {
 *         public void run() { backgroundTask(arg); }
 *       };
 *     try { channel.put(command); }
 *     catch (InterruptedException ex) {
 *       Thread.currentThread().interrupt(); // ignore but propagate
 *     }
 *   }
 *
 *   public Service() {
 *     Runnable backgroundLoop =
 *       new Runnable() {
 *         public void run() {
 *           for (;;) {
 *             try {
 *               Runnable task = (Runnable)(channel.take());
 *               task.run();
 *             }
 *             catch (InterruptedException ex) { return; }
 *           }
 *         }
 *       };
 *     new Thread(backgroundLoop).start();
 *   }
 * }
 * </pre>
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 * @see Sync
 * @see BoundedChannel
**/

public interface Channel extends Puttable, Takable {

  /**
   * Place item in the channel, possibly waiting indefinitely until
   * it can be accepted. Channels implementing the BoundedChannel
   * subinterface are generally guaranteed to block on puts upon
   * reaching capacity, but other implementations may or may not block.
   * @param item the element to be inserted. Should be non-null.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the element is guaranteed not
   * to be inserted. Otherwise, on normal return, the element is guaranteed
   * to have been inserted.
  **/
  public void put(Object item) throws InterruptedException;

  /**
   * Place item in channel only if it can be accepted within
   * msecs milliseconds. The time bound is interpreted in
   * a coarse-grained, best-effort fashion.
   * @param item the element to be inserted. Should be non-null.
   * @param msecs the number of milliseconds to wait. If less than
   * or equal to zero, the method does not perform any timed waits,
   * but might still require
   * access to a synchronization lock, which can impose unbounded
   * delay if there is a lot of contention for the channel.
   * @return true if accepted, else false
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the element is guaranteed not
   * to be inserted (i.e., is equivalent to a false return).
  **/
  public boolean offer(Object item, long msecs) throws InterruptedException;

  /**
   * Return and remove an item from channel,
   * possibly waiting indefinitely until
   * such an item exists.
   * @return some item from the channel. Different implementations
   * may guarantee various properties (such as FIFO) about that item.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the state of the channel is unchanged.
  **/
  public Object take() throws InterruptedException;

  /**
   * Return and remove an item from channel only if one is available within
   * msecs milliseconds. The time bound is interpreted in a
   * coarse-grained, best-effort fashion.
   * @param msecs the number of milliseconds to wait. If less than
   * or equal to zero, the operation does not perform any timed waits,
   * but might still require
   * access to a synchronization lock, which can impose unbounded
   * delay if there is a lot of contention for the channel.
   * @return some item, or null if no item becomes available within
   * the time bound.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the state of the channel is unchanged
   * (i.e., equivalent to a null return).
  **/
  public Object poll(long msecs) throws InterruptedException;

  /**
   * Return, but do not remove, the object at the head of the Channel,
   * or null if it is empty.
  **/
  public Object peek();

}
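
The class comment describes the put/take/poll contract, the poll(0) drain loop, and an isEmpty method defined through peek. The following is a minimal sketch of how those pieces might fit together, not one of the package's implementations. The local Channel interface is a stand-in that only mirrors the five signatures above (it omits Puttable and Takable), and SimpleLinkedChannel, ChannelSketch, drain and demo are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Local stand-in mirroring the five Channel signatures, so the sketch compiles alone.
interface Channel {
  void put(Object item) throws InterruptedException;
  boolean offer(Object item, long msecs) throws InterruptedException;
  Object take() throws InterruptedException;
  Object poll(long msecs) throws InterruptedException;
  Object peek();
}

// Hypothetical unbounded channel: no capacity limit, so put never waits for space.
class SimpleLinkedChannel implements Channel {
  private final LinkedList<Object> items = new LinkedList<Object>();

  public synchronized void put(Object item) throws InterruptedException {
    if (item == null) throw new IllegalArgumentException("null items are not supported");
    if (Thread.interrupted()) throw new InterruptedException(); // check on entry
    items.addLast(item);
    notifyAll(); // wake any thread waiting in take or poll
  }

  public boolean offer(Object item, long msecs) throws InterruptedException {
    put(item); // unbounded, so the offer is always accepted
    return true;
  }

  public synchronized Object take() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    while (items.isEmpty()) wait();
    return items.removeFirst();
  }

  public synchronized Object poll(long msecs) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    long start = System.currentTimeMillis();
    long remaining = msecs;
    while (items.isEmpty()) {
      if (remaining <= 0) return null; // balk or time out; state is unchanged
      wait(remaining);
      remaining = msecs - (System.currentTimeMillis() - start);
    }
    return items.removeFirst();
  }

  public synchronized Object peek() {
    return items.isEmpty() ? null : items.getFirst();
  }

  // isEmpty defined in terms of peek, as the class comment suggests.
  public boolean isEmpty() { return peek() == null; }
}

class ChannelSketch {
  // Drain loop from the class comment: poll(0) until it reports null.
  static List<Object> drain(Channel channel) throws InterruptedException {
    List<Object> drained = new ArrayList<Object>();
    for (;;) {
      Object item = channel.poll(0);
      if (item == null) break;
      drained.add(item);
    }
    return drained;
  }

  // Puts two items, drains them, and returns what came out.
  static List<Object> demo() {
    SimpleLinkedChannel channel = new SimpleLinkedChannel();
    try {
      channel.put("a");
      channel.put("b");
      return drain(channel);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return new ArrayList<Object>();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [a, b]
  }
}
```

Because every method here holds the same monitor, a zero-timeout poll can still be delayed by lock contention, which is the caveat the poll and offer comments make about unbounded delay.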
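
The class comment suggests signalling completion by putting an EndOfStream marker into the channel, since there is no close method. Below is a small sketch of that idea with one producer and one consumer. It assumes java.util.concurrent.LinkedBlockingQueue as a stand-in for a Channel implementation, because its put and take block in the same way; EndOfStreamSketch and transfer are hypothetical names, not part of this package.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Marker object that a producer puts last to signal completion.
final class EndOfStream { }

class EndOfStreamSketch {
  // Runs one producer and one consumer; returns what the consumer saw before the marker.
  static List<Object> transfer(final int count) {
    // Assumption: LinkedBlockingQueue stands in for a Channel implementation.
    final BlockingQueue<Object> channel = new LinkedBlockingQueue<Object>();
    final List<Object> consumed = new ArrayList<Object>();

    Thread producer = new Thread(new Runnable() {
      public void run() {
        try {
          for (int i = 0; i < count; i++) channel.put(Integer.valueOf(i));
          channel.put(new EndOfStream()); // done: tell the consumer to stop
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });

    Thread consumer = new Thread(new Runnable() {
      public void run() {
        try {
          for (;;) {
            Object x = channel.take();
            if (x instanceof EndOfStream) return; // terminate on the marker
            consumed.add(x); // process normally
          }
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });

    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join(); // after join, the consumer's writes are visible here
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    return consumed;
  }

  public static void main(String[] args) {
    System.out.println(transfer(3)); // prints [0, 1, 2]
  }
}
```

With several producers sharing one channel, a single marker is no longer enough; one option is for the consumer to count one marker per producer before it stops.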