Index: openide/util/src/org/openide/util/RequestProcessor.java =================================================================== RCS file: /cvs/openide/util/src/org/openide/util/RequestProcessor.java,v --- openide/util/src/org/openide/util/RequestProcessor.java 21 Apr 2005 20:16:12 -0000 1.1 +++ openide/util/src/org/openide/util/RequestProcessor.java 2 Jun 2005 20:34:52 -0000 @@ -24,7 +24,7 @@ import java.util.*; * *
There are several use cases for RequestProcessor: * - *
RequestProcessor.{@link RequestProcessor#getDefault
* }.{@link #post(java.lang.Runnable) post(runnable)}
@@ -64,6 +64,24 @@ import java.util.*;
* RequestProcessor
instance with limited throughput (probably
* set to 1), the IDE would try to run all your requests in parallel otherwise.
*
+ * + * Since version JST-PENDING there is a support for interruption of long running tasks. + * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel } + * but if the task was already running, one was out of luck. Since version 4.6 + * the thread running the task is interrupted and the Runnable can check for that + * and terminate its execution sooner. In the runnable one shall check for + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and + * if true, return immediatelly as in this example: + *
+ * public void run () { + * while (veryLongTimeLook) { + * doAPieceOfIt (); + * + * if (Thread.interrupted ()) return; + * } + * } + *+ * * @author Petr Nejedly, Jaroslav Tulach */ public final class RequestProcessor { @@ -364,21 +382,19 @@ public final class RequestProcessor { } Task askForWork(Processor worker, String debug) { - synchronized (processorLock) { - if (stopped || queue.isEmpty()) { // no more work in this burst, return him - processors.remove(worker); - Processor.put(worker, debug); - running--; - - return null; - } else { // we have some work for the worker, pass it - - Item i = (Item) queue.remove(0); - Task t = i.getTask(); - i.clear(); + if (stopped || queue.isEmpty()) { // no more work in this burst, return him + processors.remove(worker); + Processor.put(worker, debug); + running--; + + return null; + } else { // we have some work for the worker, pass it + + Item i = (Item) queue.remove(0); + Task t = i.getTask(); + i.clear(worker); - return t; - } + return t; } } @@ -458,7 +474,7 @@ public final class RequestProcessor { notifyRunning(); if (item != null) { - item.clear(); + item.clear(null); } item = new Item(this, RequestProcessor.this); @@ -490,7 +506,19 @@ public final class RequestProcessor { */ public boolean cancel() { synchronized (processorLock) { - boolean success = (item == null) ? false : item.clear(); + boolean success; + + if (item == null) { + success = false; + } else { + Processor p = item.getProcessor(); + success = item.clear(null); + + if (p != null) { + p.interruptTask(this); + item = null; + } + } if (success) { notifyFinished(); // mark it as finished @@ -541,17 +569,17 @@ public final class RequestProcessor { */ public void waitFinished() { if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName()); - boolean toRun; synchronized (processorLock) { // correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ()); // the same: toRun = !isFinished () && (item == null ? true : item.clear ()); - toRun = !isFinished() && ((item == null) || item.clear()); + toRun = !isFinished() && ((item == null) || item.clear(null)); } if (toRun) { //System.err.println(" ## running it synchronously"); - run(); + Processor processor = (Processor)Thread.currentThread(); + processor.doEvaluate (this, processorLock); } else { // it is already running in other thread of this RP if (lastThread != Thread.currentThread()) { @@ -586,7 +614,7 @@ public final class RequestProcessor { boolean toRun; synchronized (processorLock) { - toRun = !isFinished() && ((item == null) || item.clear()); + toRun = !isFinished() && ((item == null) || item.clear(null)); } if (toRun) { @@ -614,7 +642,7 @@ public final class RequestProcessor { /* One item representing the task pending in the pending queue */ private static class Item extends Exception { private final RequestProcessor owner; - private Task action; + private Object action; private boolean enqueued; Item(Task task, RequestProcessor rp) { @@ -624,22 +652,30 @@ public final class RequestProcessor { } Task getTask() { - return action; + Object a = action; + + return (a instanceof Task) ? (Task) a : null; } /** Annulate this request iff still possible. * @returns true if it was possible to skip this item, false * if the item was/is already processed */ - boolean clear() { + boolean clear(Processor processor) { synchronized (owner.processorLock) { - action = null; + action = processor; return enqueued ? owner.queue.remove(this) : true; } } + Processor getProcessor() { + Object a = action; + + return (a instanceof Processor) ? (Processor) a : null; + } + int getPriority() { - return action.getPriority(); + return getTask().getPriority(); } public Throwable fillInStackTrace() { @@ -669,6 +705,9 @@ public final class RequestProcessor { //private Item task; private RequestProcessor source; + + /** task we are working on */ + private RequestProcessor.Task todo; private boolean idle = true; /** Waiting lock */ @@ -771,7 +810,6 @@ public final class RequestProcessor { } } - Task todo; String debug = null; ErrorManager em = logger(); @@ -782,8 +820,12 @@ public final class RequestProcessor { } // while we have something to do - while ((todo = current.askForWork(this, debug)) != null) { - // if(todo != null) { + for (;;) { + // need the same sync as interruptTask + synchronized (current.processorLock) { + todo = current.askForWork(this, debug); + if (todo == null) break; + } setPrio(todo.getPriority()); try { @@ -813,16 +855,49 @@ public final class RequestProcessor { doNotify(todo, t); } - // to improve GC - todo = null; - - // } + // need the same sync as interruptTask + synchronized (current.processorLock) { + // to improve GC + todo = null; + // and to clear any possible interrupted state + // set by calling Task.cancel () + Thread.interrupted(); + } } if (loggable) { logger().log(ErrorManager.INFORMATIONAL, "Work finished " + getName()); // NOI18N } } + } + + /** Evaluates given task directly. + */ + final void doEvaluate (Task t, Object processorLock) { + Task previous = todo; + boolean interrupted = Thread.interrupted(); + try { + todo = t; + t.run (); + } finally { + synchronized (processorLock) { + todo = previous; + if (interrupted || todo.item == null) { + Thread.currentThread().interrupt(); + } + } + } + } + + /** Called under the processorLock */ + public void interruptTask(Task t) { + if (t != todo) { + // not running this task so + return; + } + + // otherwise interrupt this thread + interrupt(); } /** @see "#20467" */ Index: openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java =================================================================== RCS file: /cvs/openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java,v --- openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java 22 Apr 2005 06:03:05 -0000 1.1 +++ openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java 2 Jun 2005 20:34:52 -0000 @@ -682,6 +682,257 @@ public class RequestProcessorTest extend (error ? "error" : "exception"), 1); } + public void testCancelInterruptsTheRunningThread () throws Exception { + RequestProcessor rp = new RequestProcessor ("Cancellable"); + + class R implements Runnable { + public boolean checkBefore; + public boolean checkAfter; + public boolean interrupted; + + public synchronized void run () { + checkBefore = Thread.interrupted(); + + notifyAll (); + + try { + wait (); + interrupted = false; + } catch (InterruptedException ex) { + interrupted = true; + } + + notifyAll (); + + try { + wait (); + } catch (InterruptedException ex) { + } + + checkAfter = Thread.interrupted(); + + notifyAll (); + } + } + + R r = new R (); + synchronized (r) { + RequestProcessor.Task t = rp.post (r); + r.wait (); + assertTrue ("The task is already running", !t.cancel ()); + r.wait (); + r.notifyAll (); + r.wait (); + assertTrue ("The task has been interrupted", r.interrupted); + assertTrue ("Not before", !r.checkBefore); + assertTrue ("Not after - as the notification was thru InterruptedException", !r.checkAfter); + } + + // interrupt after the task has finished + r = new R (); + synchronized (r) { + RequestProcessor.Task t = rp.post (r); + r.wait (); + r.notifyAll (); + r.wait (); + assertTrue ("The task is already running", !t.cancel ()); + r.notifyAll (); + r.wait (); + assertTrue ("The task has not been interrupted by exception", !r.interrupted); + assertTrue ("Not interupted before", !r.checkBefore); + assertTrue ("But interupted after", r.checkAfter); + } + } + + public void testInterruptedStatusIsClearedBetweenTwoTaskExecution () throws Exception { + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution"); + + final RequestProcessor.Task[] task = new RequestProcessor.Task[1]; + // test interrupted status is cleared after task ends + class Fail implements Runnable { + public boolean checkBefore; + public Thread runIn; + public boolean goodThread; + + public synchronized void run () { + if (runIn == null) { + runIn = Thread.currentThread(); + task[0].schedule (0); + } else { + goodThread = Thread.currentThread () == runIn; + } + + checkBefore = runIn.isInterrupted(); + // set the flag for next execution + runIn.interrupt(); + + notifyAll (); + } + } + + Fail f = new Fail (); + synchronized (f) { + task[0] = rp.post (f); + + // wait for the first execution + f.wait (); + } + // wait for the second + task[0].waitFinished (); + + assertTrue ("This shall be always true, but if not, than it does not mean too much" + + " just that the tasks were not executed in the same thread. In such case it " + + " this test does not do anything useful as it needs to execute the task twice " + + " in the same thread", f.goodThread); + + assertTrue ("Interrupted state has been cleared between two executions of the task", !f.checkBefore); + + } + + public void testInterruptedStatusWorksInInversedTasks() throws Exception { + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks"); + + class Fail implements Runnable { + public Fail (String n) { + name = n; + } + + private String name; + public RequestProcessor.Task wait; + public Object lock; + + public boolean checkBefore; + public boolean checkAfter; + + public void run () { + synchronized (this) { + checkBefore = Thread.interrupted(); + notifyAll(); + } + if (lock != null) { + synchronized (lock) { + lock.notify(); + try { + lock.wait(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + fail ("No InterruptedException"); + } + } + } + + if (wait != null) { + wait.schedule(100); + wait.waitFinished(); + } + + synchronized (this) { + checkAfter = Thread.interrupted(); + notifyAll(); + } + } + + public String toString () { + return name; + } + } + + Object initLock = new Object(); + + Fail smaller = new Fail("smaller"); + smaller.lock = initLock; + Fail bigger = new Fail("BIGGER"); + RequestProcessor.Task smallerTask, biggerTask; + + + smallerTask = rp.create (smaller); + biggerTask = rp.create (bigger); + + bigger.wait = smallerTask; + + synchronized (initLock) { + biggerTask.schedule(0); + initLock.wait(); + initLock.notifyAll(); + assertFalse ("Already running", biggerTask.cancel()); + } + + biggerTask.waitFinished(); + + assertFalse("bigger not interrupted at begining", bigger.checkBefore); + assertFalse("smaller not interrupted at all", smaller.checkBefore); + assertFalse("smaller not interrupted at all2", smaller.checkAfter); + assertTrue("bigger interrupted at end", bigger.checkAfter); + + } + + public void testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon() throws Exception { + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon"); + + class Fail implements Runnable { + public RequestProcessor.Task wait; + public Object lock; + + public boolean checkBefore; + public boolean checkAfter; + + public void run () { + synchronized (this) { + checkBefore = Thread.interrupted(); + notifyAll(); + } + if (lock != null) { + synchronized (lock) { + lock.notify(); + } + } + + if (wait != null) { + // we cannot call Thread.sleep, so lets slow things own + // in other way + for (int i = 0; i < 10; i++) { + System.gc (); + } + + wait.waitFinished(); + } + + synchronized (this) { + checkAfter = Thread.interrupted(); + notifyAll(); + } + } + } + + Object initLock = new Object(); + + Fail smaller = new Fail(); + Fail bigger = new Fail(); + RequestProcessor.Task smallerTask, biggerTask; + + + smallerTask = rp.create (smaller); + biggerTask = rp.create (bigger); + + + bigger.lock = initLock; + bigger.wait = smallerTask; + + synchronized (initLock) { + biggerTask.schedule(0); + initLock.wait(); + assertFalse ("Already running", biggerTask.cancel()); + } + + biggerTask.waitFinished(); + + assertFalse("bigger not interrupted at begining", bigger.checkBefore); + assertFalse("smaller not interrupted at all", smaller.checkBefore); + assertFalse("smaller not interrupted at all2", smaller.checkAfter); + assertTrue("bigger interrupted at end", bigger.checkAfter); + + } + private static void doGc (int count, Reference toClear) { java.util.ArrayList l = new java.util.ArrayList (count); while (count-- > 0) {