View Javadoc

1   /*
2     File: QueuedExecutor.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    21Jun1998  dl               Create public version
12    28aug1998  dl               rely on ThreadFactoryUser, restart now public
13     4may1999  dl               removed redundant interrupt detect
14     7sep2000  dl               new shutdown methods
15  */
16  
17  package org.jdiagnose.concurrent;
18  
19  /***
20   * 
21   * An implementation of Executor that queues incoming
22   * requests until they can be processed by a single background
23   * thread.
24   * <p>
25   * The thread is not actually started until the first 
26   * <code>execute</code> request is encountered. Also, if the
27   * thread is stopped for any reason (for example, after hitting
28   * an unrecoverable exception in an executing task), one is started 
29   * upon encountering a new request, or if <code>restart()</code> is
30   * invoked.
31   * <p>
32   * Beware that, especially in situations
33   * where command objects themselves invoke execute, queuing can
34   * sometimes lead to lockups, since commands that might allow
35   * other threads to terminate do not run at all when they are in the queue.
36   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
37   **/
38  public class QueuedExecutor extends ThreadFactoryUser implements Executor {
39  
40  
41    
42    /*** The thread used to process commands **/
43    protected Thread thread_;
44  
45    /*** Special queue element to signal termination **/
46    protected static Runnable ENDTASK = new Runnable() { public void run() {} };
47  
48    /*** true if thread should shut down after processing current task **/
49    protected volatile boolean shutdown_; // latches true;
50    
51    /***
52     * Return the thread being used to process commands, or
53     * null if there is no such thread. You can use this
54     * to invoke any special methods on the thread, for
55     * example, to interrupt it.
56     **/
57    public synchronized Thread getThread() { 
58      return thread_;
59    }
60  
61    /*** set thread_ to null to indicate termination **/
62    protected synchronized void clearThread() {
63      thread_ = null;
64    }
65  
66  
67    /*** The queue **/
68    protected final Channel queue_;
69  
70  
71    /***
72     * The runloop is isolated in its own Runnable class
73     * just so that the main
74     * class need not implement Runnable,  which would
75     * allow others to directly invoke run, which would
76     * never make sense here.
77     **/
78    protected class RunLoop implements Runnable {
79      public void run() {
80        try {
81          while (!shutdown_) {
82            Runnable task = (Runnable)(queue_.take());
83            if (task == ENDTASK) {
84              shutdown_ = true;
85              break;
86            }
87            else if (task != null) {
88              task.run();
89              task = null;
90            }
91            else
92              break;
93          }
94        }
95        catch (InterruptedException ex) {} // fallthrough
96        finally {
97          clearThread();
98        }
99      }
100   }
101 
102   protected final RunLoop runLoop_;
103 
104 
105   /***
106    * Construct a new QueuedExecutor that uses
107    * the supplied Channel as its queue. 
108    * <p>
109    * This class does not support any methods that 
110    * reveal this queue. If you need to access it
111    * independently (for example to invoke any
112    * special status monitoring operations), you
113    * should record a reference to it separately.
114    **/
115 
116   public QueuedExecutor(Channel queue) {
117     queue_ = queue;
118     runLoop_ = new RunLoop();
119   }
120 
121   /***
122    * Construct a new QueuedExecutor that uses
123    * a BoundedLinkedQueue with the current
124    * DefaultChannelCapacity as its queue.
125    **/
126 
127   public QueuedExecutor() {
128     this(new BoundedLinkedQueue());
129   }
130 
131   /***
132    * Start (or restart) the background thread to process commands. It has
133    * no effect if a thread is already running. This
134    * method can be invoked if the background thread crashed
135    * due to an unrecoverable exception.
136    **/
137 
138   public synchronized void restart() {
139     if (thread_ == null && !shutdown_) {
140       thread_ = threadFactory_.newThread(runLoop_);
141       thread_.start();
142     }
143   }
144 
145 
146   /*** 
147    * Arrange for execution of the command in the
148    * background thread by adding it to the queue. 
149    * The method may block if the channel's put
150    * operation blocks.
151    * <p>
152    * If the background thread
153    * does not exist, it is created and started.
154    **/
155   public void execute(Runnable command) throws InterruptedException {
156     restart();
157     queue_.put(command);
158   }
159 
160   /***
161    * Terminate background thread after it processes all
162    * elements currently in queue. Any tasks entered after this point will
163    * not be processed. A shut down thread cannot be restarted.
164    * This method may block if the task queue is finite and full.
165    * Also, this method 
166    * does not in general apply (and may lead to comparator-based
167    * exceptions) if the task queue is a priority queue.
168    **/
169   public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() {
170     if (thread_ != null && !shutdown_) {
171       try { queue_.put(ENDTASK); }
172       catch (InterruptedException ex) {
173         Thread.currentThread().interrupt();
174       }
175     }
176   }
177 
178 
179   /***
180    * Terminate background thread after it processes the 
181    * current task, removing other queued tasks and leaving them unprocessed.
182    * A shut down thread cannot be restarted.
183    **/
184   public synchronized void shutdownAfterProcessingCurrentTask() {
185     shutdown_ = true;
186     if (thread_ != null) {
187       try { 
188         while (queue_.poll(0) != null) ; // drain
189         queue_.put(ENDTASK); 
190       }
191       catch (InterruptedException ex) {
192         Thread.currentThread().interrupt();
193       }
194     }
195   }
196 
197 
198   /***
199    * Terminate background thread even if it is currently processing
200    * a task. This method uses Thread.interrupt, so relies on tasks
201    * themselves responding appropriately to interruption. If the
202    * current tasks does not terminate on interruption, then the 
203    * thread will not terminate until processing current task.
204    * A shut down thread cannot be restarted.
205    **/
206   public synchronized void shutdownNow() {
207     shutdown_ = true;
208     if (thread_ != null) {
209       thread_.interrupt();
210       shutdownAfterProcessingCurrentTask();
211     }
212   }
213 
214 
215 
216 }