Index: src/org/openide/util/RequestProcessor.java =================================================================== RCS file: /cvs/openide/src/org/openide/util/RequestProcessor.java,v --- src/org/openide/util/RequestProcessor.java 18 Apr 2003 11:52:07 -0000 1.73 +++ src/org/openide/util/RequestProcessor.java 7 May 2003 13:47:02 -0000 @@ -63,6 +63,24 @@ * RequestProcessor instance with limited throughput (probably * set to 1), the IDE would try to run all your requests in parallel otherwise. * + *

+ * Since version 4.6 there is support for interruption of long running tasks. + * There always was a way to cancel a not yet running task using {@link RequestProcessor.Task#cancel } + * but if the task was already running, one was out of luck. Since version 4.6 + * the thread running the task is interrupted and the Runnable can check for that + * and terminate its execution sooner. In the runnable one shall check for + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and + * if true, return immediately as in this example: + *

+ * public void run () {
+ *     while (veryLongTimeLoop) {
+ *       doAPieceOfIt ();
+ *
+ *       if (Thread.interrupted ()) return;
+ *     }
+ * }
+ * 
+ * * @author Petr Nejedly, Jaroslav Tulach */ public final class RequestProcessor { @@ -356,7 +374,7 @@ final Item localItem; synchronized (processorLock) { notifyRunning(); - if (item != null) item.clear(); + if (item != null) item.clear(null); item = new Item(this, RequestProcessor.this); localItem = item; } @@ -381,7 +399,16 @@ */ public boolean cancel () { synchronized (processorLock) { - boolean success = (item == null) ? false : item.clear(); + boolean success; + if (item == null) { + success = false; + } else { + Processor p = item.getProcessor(); + success = item.clear(null); + if (p != null) { + p.interruptTask (this); + } + } if (success) notifyFinished(); // mark it as finished return success; } @@ -422,7 +449,7 @@ synchronized (processorLock) { // correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ()); // the same: toRun = !isFinished () && (item == null ? true : item.clear ()); - toRun = !isFinished () && (item == null || item.clear ()); + toRun = !isFinished () && (item == null || item.clear (null)); } if (toRun) { //System.err.println(" ## running it synchronously"); @@ -494,19 +521,18 @@ } } + /** Called under the processorLock */ Task askForWork (Processor worker) { - synchronized (processorLock) { - if (stopped || queue.isEmpty()) { // no more work in this burst, return him - processors.remove(worker); - Processor.put(worker); - running--; - return null; - } else { // we have some work for the worker, pass it - Item i = (Item) queue.remove(0); - Task t = i.getTask (); - i.clear (); - return t; - } + if (stopped || queue.isEmpty()) { // no more work in this burst, return him + processors.remove(worker); + Processor.put(worker); + running--; + return null; + } else { // we have some work for the worker, pass it + Item i = (Item) queue.remove(0); + Task t = i.getTask (); + i.clear (worker); + return t; } } @@ -515,7 +541,7 @@ /* One item representing the task pending in the pending queue */ private static class Item 
extends Exception { private final RequestProcessor owner; - private Task action; + private Object action; private boolean enqueued; Item (Task task, RequestProcessor rp) { @@ -525,21 +551,27 @@ } Task getTask() { - return action; + Object a = action; + return a instanceof Task ? (Task)a : null; } /** Annulate this request iff still possible. * @returns true if it was possible to skip this item, false * if the item was/is already processed */ - boolean clear() { + boolean clear(Processor processor) { synchronized (owner.processorLock) { - action = null; + action = processor; return enqueued ? owner.queue.remove(this) : true; } } + + Processor getProcessor () { + Object a = action; + return a instanceof Processor ? (Processor)a : null; + } int getPriority() { - return action.getPriority(); + return getTask ().getPriority(); } public Throwable fillInStackTrace() { @@ -603,6 +635,9 @@ private RequestProcessor source; + /** task we are working on */ + private RequestProcessor.Task todo; + /* One minute of inactivity and the Thread will die if not assigned */ private static final int INACTIVE_TIMEOUT = 60000; @@ -663,13 +698,17 @@ } } - Task todo; - ErrorManager em = logger(); boolean loggable = em.isLoggable(ErrorManager.INFORMATIONAL); if (loggable) em.log (ErrorManager.INFORMATIONAL, "Begining work " + getName ()); // NOI18N // while we have something to do - while((todo = current.askForWork(this)) != null) { + for (;;) { + // need the same sync as interruptTask + synchronized (current.processorLock) { + todo = current.askForWork(this); + if (todo == null) break; + } + if(todo != null) { setPrio(todo.getPriority()); try { @@ -685,15 +724,32 @@ doNotify(todo, e); } // do not catch e.g. OutOfMemoryError, etc. 
- - // to improve GC - todo = null; + + // need the same sync as interruptTask + synchronized (current.processorLock) { + // to improve GC + todo = null; + // and to clear any possible interrupted state + // set by calling Task.cancel () + Thread.interrupted(); + } } } if (loggable) logger ().log (ErrorManager.INFORMATIONAL, "Work finished " + getName ()); // NOI18N } } + + /** Called under the processorLock */ + public void interruptTask (Task t) { + if (t != todo) { + // not running this task so + return; + } + + // otherwise interrupt this thread + interrupt (); + } /** @see "#20467" */ private static void doNotify(RequestProcessor.Task todo, Throwable ex) { Index: test/unit/src/org/openide/util/RequestProcessorTest.java =================================================================== RCS file: /cvs/openide/test/unit/src/org/openide/util/RequestProcessorTest.java,v --- test/unit/src/org/openide/util/RequestProcessorTest.java 18 Apr 2003 12:01:13 -0000 1.16 +++ test/unit/src/org/openide/util/RequestProcessorTest.java 7 May 2003 13:47:02 -0000 @@ -564,6 +564,113 @@ assertTrue("Task was performed even if it is canceled", !x.performed); } + public void testCancelInterruptsTheRunningThread () throws Exception { + RequestProcessor rp = new RequestProcessor ("Cancellable"); + + class R implements Runnable { + public boolean checkBefore; + public boolean checkAfter; + public boolean interrupted; + + public synchronized void run () { + checkBefore = Thread.interrupted(); + + notifyAll (); + + try { + wait (); + interrupted = false; + } catch (InterruptedException ex) { + interrupted = true; + } + + notifyAll (); + + try { + wait (); + } catch (InterruptedException ex) { + } + + checkAfter = Thread.interrupted(); + + notifyAll (); + } + } + + R r = new R (); + synchronized (r) { + RequestProcessor.Task t = rp.post (r); + r.wait (); + assertTrue ("The task is already running", !t.cancel ()); + r.wait (); + r.notifyAll (); + r.wait (); + assertTrue ("The task has been 
interrupted", r.interrupted); + assertTrue ("Not before", !r.checkBefore); + assertTrue ("Not after - as the notification was thru InterruptedException", !r.checkAfter); + } + + // interrupt after the task has finished + r = new R (); + synchronized (r) { + RequestProcessor.Task t = rp.post (r); + r.wait (); + r.notifyAll (); + r.wait (); + assertTrue ("The task is already running", !t.cancel ()); + r.notifyAll (); + r.wait (); + assertTrue ("The task has not been interrupted by exception", !r.interrupted); + assertTrue ("Not interupted before", !r.checkBefore); + assertTrue ("But interupted after", r.checkAfter); + } + } + + public void testInterruptedStatusIsClearedBetweenTwoTaskExecution () throws Exception { + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution"); + + final RequestProcessor.Task[] task = new RequestProcessor.Task[1]; + // test interrupted status is cleared after task ends + class Fail implements Runnable { + public boolean checkBefore; + public Thread runIn; + public boolean goodThread; + + public synchronized void run () { + if (runIn == null) { + runIn = Thread.currentThread(); + task[0].schedule (0); + } else { + goodThread = Thread.currentThread () == runIn; + } + + checkBefore = runIn.isInterrupted(); + // set the flag for next execution + runIn.interrupt(); + + notifyAll (); + } + } + + Fail f = new Fail (); + synchronized (f) { + task[0] = rp.post (f); + + // wait for the first execution + f.wait (); + } + // wait for the second + task[0].waitFinished (); + + assertTrue ("This shall be always true, but if not, than it does not mean too much" + + " just that the tasks were not executed in the same thread. 
In such case it " + + " this test does not do anything useful as it needs to execute the task twice " + + " in the same thread", f.goodThread); + + assertTrue ("Interrupted state has been cleared between two executions of the task", !f.checkBefore); + + } + private static void doGc (int count, Reference toClear) { java.util.ArrayList l = new java.util.ArrayList (count); while (count-- > 0) {