1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.jdiagnose.concurrent;
18
19 /***
20 *
21 * An implementation of Executor that queues incoming
22 * requests until they can be processed by a single background
23 * thread.
24 * <p>
25 * The thread is not actually started until the first
26 * <code>execute</code> request is encountered. Also, if the
27 * thread is stopped for any reason (for example, after hitting
28 * an unrecoverable exception in an executing task), one is started
29 * upon encountering a new request, or if <code>restart()</code> is
30 * invoked.
31 * <p>
32 * Beware that, especially in situations
33 * where command objects themselves invoke execute, queuing can
34 * sometimes lead to lockups, since commands that might allow
35 * other threads to terminate do not run at all when they are in the queue.
36 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
37 **/
38 public class QueuedExecutor extends ThreadFactoryUser implements Executor {
39
40
41
42 /*** The thread used to process commands **/
43 protected Thread thread_;
44
45 /*** Special queue element to signal termination **/
46 protected static Runnable ENDTASK = new Runnable() { public void run() {} };
47
48 /*** true if thread should shut down after processing current task **/
49 protected volatile boolean shutdown_;
50
51 /***
52 * Return the thread being used to process commands, or
53 * null if there is no such thread. You can use this
54 * to invoke any special methods on the thread, for
55 * example, to interrupt it.
56 **/
57 public synchronized Thread getThread() {
58 return thread_;
59 }
60
61 /*** set thread_ to null to indicate termination **/
62 protected synchronized void clearThread() {
63 thread_ = null;
64 }
65
66
67 /*** The queue **/
68 protected final Channel queue_;
69
70
71 /***
72 * The runloop is isolated in its own Runnable class
73 * just so that the main
74 * class need not implement Runnable, which would
75 * allow others to directly invoke run, which would
76 * never make sense here.
77 **/
78 protected class RunLoop implements Runnable {
79 public void run() {
80 try {
81 while (!shutdown_) {
82 Runnable task = (Runnable)(queue_.take());
83 if (task == ENDTASK) {
84 shutdown_ = true;
85 break;
86 }
87 else if (task != null) {
88 task.run();
89 task = null;
90 }
91 else
92 break;
93 }
94 }
95 catch (InterruptedException ex) {}
96 finally {
97 clearThread();
98 }
99 }
100 }
101
102 protected final RunLoop runLoop_;
103
104
105 /***
106 * Construct a new QueuedExecutor that uses
107 * the supplied Channel as its queue.
108 * <p>
109 * This class does not support any methods that
110 * reveal this queue. If you need to access it
111 * independently (for example to invoke any
112 * special status monitoring operations), you
113 * should record a reference to it separately.
114 **/
115
116 public QueuedExecutor(Channel queue) {
117 queue_ = queue;
118 runLoop_ = new RunLoop();
119 }
120
121 /***
122 * Construct a new QueuedExecutor that uses
123 * a BoundedLinkedQueue with the current
124 * DefaultChannelCapacity as its queue.
125 **/
126
127 public QueuedExecutor() {
128 this(new BoundedLinkedQueue());
129 }
130
131 /***
132 * Start (or restart) the background thread to process commands. It has
133 * no effect if a thread is already running. This
134 * method can be invoked if the background thread crashed
135 * due to an unrecoverable exception.
136 **/
137
138 public synchronized void restart() {
139 if (thread_ == null && !shutdown_) {
140 thread_ = threadFactory_.newThread(runLoop_);
141 thread_.start();
142 }
143 }
144
145
146 /***
147 * Arrange for execution of the command in the
148 * background thread by adding it to the queue.
149 * The method may block if the channel's put
150 * operation blocks.
151 * <p>
152 * If the background thread
153 * does not exist, it is created and started.
154 **/
155 public void execute(Runnable command) throws InterruptedException {
156 restart();
157 queue_.put(command);
158 }
159
160 /***
161 * Terminate background thread after it processes all
162 * elements currently in queue. Any tasks entered after this point will
163 * not be processed. A shut down thread cannot be restarted.
164 * This method may block if the task queue is finite and full.
165 * Also, this method
166 * does not in general apply (and may lead to comparator-based
167 * exceptions) if the task queue is a priority queue.
168 **/
169 public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() {
170 if (thread_ != null && !shutdown_) {
171 try { queue_.put(ENDTASK); }
172 catch (InterruptedException ex) {
173 Thread.currentThread().interrupt();
174 }
175 }
176 }
177
178
179 /***
180 * Terminate background thread after it processes the
181 * current task, removing other queued tasks and leaving them unprocessed.
182 * A shut down thread cannot be restarted.
183 **/
184 public synchronized void shutdownAfterProcessingCurrentTask() {
185 shutdown_ = true;
186 if (thread_ != null) {
187 try {
188 while (queue_.poll(0) != null) ;
189 queue_.put(ENDTASK);
190 }
191 catch (InterruptedException ex) {
192 Thread.currentThread().interrupt();
193 }
194 }
195 }
196
197
198 /***
199 * Terminate background thread even if it is currently processing
200 * a task. This method uses Thread.interrupt, so relies on tasks
201 * themselves responding appropriately to interruption. If the
202 * current tasks does not terminate on interruption, then the
203 * thread will not terminate until processing current task.
204 * A shut down thread cannot be restarted.
205 **/
206 public synchronized void shutdownNow() {
207 shutdown_ = true;
208 if (thread_ != null) {
209 thread_.interrupt();
210 shutdownAfterProcessingCurrentTask();
211 }
212 }
213
214
215
216 }