SwingWorker, done() 在 process() 调用完成之前执行

2022-09-02 10:35:28

我已经与SwingWorker合作了一段时间,最终出现了一种奇怪的行为,至少对我来说是这样。我清楚地知道,由于性能原因,对publish()方法的几个调用在一个调用中合并。这对我来说是完全有道理的,我怀疑SwingWorker会保留某种队列来处理所有这些调用。

根据教程和API,当SwingWorker结束其执行时,要么doInBackground()正常完成,要么从外部取消工作线程,然后调用dound()方法。目前为止,一切都好。

但是我有一个例子(类似于教程中所示),其中在执行方法会完成方法调用。由于这两种方法都在事件调度线程中执行,因此我希望在所有调用完成后执行。换句话说:process()done()done()process()

预期:

Writing...
Writing...
Stopped!

结果:

Writing...
Stopped!
Writing...

示例代码

import java.awt.BorderLayout;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.event.ActionEvent;
import java.util.List;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Demo {

    private SwingWorker<Void, String> worker;
    private JTextArea textArea;
    private Action startAction, stopAction;

    private void createAndShowGui() {

        startAction = new AbstractAction("Start writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.startWriting();
                this.setEnabled(false);
                stopAction.setEnabled(true);
            }
        };

        stopAction = new AbstractAction("Stop writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.stopWriting();
                this.setEnabled(false);
                startAction.setEnabled(true);
            }
        };

        JPanel buttonsPanel = new JPanel();
        buttonsPanel.add(new JButton(startAction));
        buttonsPanel.add(new JButton(stopAction));

        textArea = new JTextArea(30, 50);
        JScrollPane scrollPane = new JScrollPane(textArea);

        JFrame frame = new JFrame("Test");
        frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        frame.add(scrollPane);
        frame.add(buttonsPanel, BorderLayout.SOUTH);
        frame.pack();
        frame.setLocationRelativeTo(null);
        frame.setVisible(true);
    }

    private void startWriting() {
        stopWriting();
        worker = new SwingWorker<Void, String>() {
            @Override
            protected Void doInBackground() throws Exception {
                while(!isCancelled()) {
                    publish("Writing...\n");
                }
                return null;
            }

            @Override
            protected void process(List<String> chunks) {
                String string = chunks.get(chunks.size() - 1);
                textArea.append(string);
            }

            @Override
            protected void done() {
                textArea.append("Stopped!\n");
            }
        };
        worker.execute();
    }

    private void stopWriting() {
        if(worker != null && !worker.isCancelled()) {
            worker.cancel(true);
        }
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                new Demo().createAndShowGui();
            }
        });
    }
}

答案 1

简短的回答:

发生这种情况是因为 publish() 不直接调度 ,它设置了一个计时器,该计时器将在 EDT 之后触发 EDT 中 process() 块的调度。因此,当工作线程被取消时,仍然有一个计时器等待使用上次发布的数据来计划一个进程()。使用计时器的原因是为了实现优化,其中可以使用多个发布的组合数据执行单个进程。processDELAY

长答案:

让我们看看publish()和un cancel是如何相互作用的,为此,让我们深入研究一些源代码。

首先是简单的部分, :cancel(true)

public final boolean cancel(boolean mayInterruptIfRunning) {
    return future.cancel(mayInterruptIfRunning);
}

此取消最终会调用以下代码:

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED)) // <-----
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt(); // <-----
    }
    releaseShared(0);
    done(); // <-----
    return true;
}

SwingWorker 状态设置为 ,线程被中断并被调用,但这不是 SwingWorker 完成的,而是 done(),这是在 SwingWorker 构造函数中实例化变量时指定的:CANCELLEDdone()future

future = new FutureTask<T>(callable) {
    @Override
    protected void done() {
        doneEDT();  // <-----
        setState(StateValue.DONE);
    }
};

代码是:doneEDT()

private void doneEDT() {
    Runnable doDone =
        new Runnable() {
            public void run() {
                done(); // <-----
            }
        };
    if (SwingUtilities.isEventDispatchThread()) {
        doDone.run(); // <-----
    } else {
        doSubmit.add(doDone);
    }
}

如果我们在EDT中,它直接调用SwingWorkers's,这就是我们的情况。此时,SwingWorker应该停止,不应该再调用,这很容易通过以下修改进行演示:done()publish()

while(!isCancelled()) {
    textArea.append("Calling publish\n");
    publish("Writing...\n");
}

然而,我们仍然得到一个“写作...”来自进程() 的消息。那么让我们看看 process() 是如何被调用的。的源代码是publish(...)

protected final void publish(V... chunks) {
    synchronized (this) {
        if (doProcess == null) {
            doProcess = new AccumulativeRunnable<V>() {
                @Override
                public void run(List<V> args) {
                    process(args); // <-----
                }
                @Override
                protected void submit() {
                    doSubmit.add(this); // <-----
                }
            };
        }
    }
    doProcess.add(chunks);  // <-----
}

我们看到 Runnable 的 是谁最终调用 ,但是这个代码只是调用不,并且周围也有一个。我看看。run()doProcessprocess(args)doProcess.add(chunks)doProcess.run()doSubmitdoProcess.add(chunks)

public final synchronized void add(T... args) {
    boolean isSubmitted = true;
    if (arguments == null) {
        isSubmitted = false;
        arguments = new ArrayList<T>();
    }
    Collections.addAll(arguments, args); // <-----
    if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed
        submit(); // <-----
    }
}

因此,实际的作用是将块添加到一些内部ArrayList中并调用。我们刚刚看到 提交只是调用 ,这是完全相同的方法,因为两者和扩展,但是这次是而不是如.因此,块是调用 的可运行项。然而,调用是一个完全不同的方法,定义在以下类中:publish()argumentssubmit()doSubmit.add(this)adddoProcessdoSubmitAccumulativeRunnable<V>VRunnableStringdoProcessprocess(args)submit()doSubmit

private static class DoSubmitAccumulativeRunnable
     extends AccumulativeRunnable<Runnable> implements ActionListener {
    private final static int DELAY = (int) (1000 / 30);
    @Override
    protected void run(List<Runnable> args) {
        for (Runnable runnable : args) {
            runnable.run();
        }
    }
    @Override
    protected void submit() {
        Timer timer = new Timer(DELAY, this); // <-----
        timer.setRepeats(false);
        timer.start();
    }
    public void actionPerformed(ActionEvent event) {
        run(); // <-----
    }
}

它会创建一个计时器,该计时器在毫秒后触发代码一次。一旦事件被触发,代码将在EDT中排队,它将调用一个内部,最终调用并因此执行,其中块是ArrayList的刷新数据。我跳过了一些细节,“运行”调用链是这样的:actionPerformedDELAYrun()run(flush())doProcessprocess(chunk)arguments

  • doSubmit.run()
  • doSubmit.run(flush()) //实际上是一个可运行循环,但只有一个 (*)
  • doProcess.run()
  • doProcess.run(flush())
  • 进程(块)

(*)布尔值和(重置此布尔值)使得发布的其他调用不会添加要在 doSubmit.run(flush()中调用的 doProcess runnables,但是它们的数据不会被忽略。因此,在计时器的生命周期内调用的任何数量的发布执行单个进程。isSubmitedflush()

总而言之,在延迟之后,在EDT中调度呼叫。这就解释了为什么即使在我们取消了线程并且没有完成更多发布之后,仍然会出现一个进程执行,因为当我们取消工作线程的那一刻,(很有可能)有一个计时器将安排一个after已经计划。publish("Writing...")process(chunk)process()done()

为什么在 EDT 中使用此计时器,而不仅仅是在 EDT 中用 ?要实现文档中介绍的性能优化,请执行以下操作invokeLater(doProcess)

由于在执行处理方法之前,在事件调度线程上异步调用了对发布方法的多次调用。出于性能目的,所有这些调用都合并为一个具有串联参数的调用。例如:

 publish("1");
 publish("2", "3");
 publish("4", "5", "6");

might result in:
 process("1", "2", "3", "4", "5", "6")

我们现在知道这是有效的,因为在延迟间隔内发生的所有发布都将它们添加到我们看到的内部变量中,并且将一次性执行所有这些数据。argsargumentsprocess(chunk)

这是一个错误吗?解决方法?

很难判断这是否是一个错误,处理后台线程已发布的数据可能是有意义的,因为工作实际上已经完成,你可能有兴趣使用尽可能多的信息更新GUI(例如,如果这就是正在做的事情)。然后,如果需要处理所有数据和/或在完成之后调用process()会产生数据/ GUI不一致,则可能没有意义。process()done()

有一个明显的解决方法,如果你不希望任何新的process()在完成后执行,只需检查工作线程是否也在方法中被取消!process

@Override
protected void process(List<String> chunks) {
    if (isCancelled()) return;
    String string = chunks.get(chunks.size() - 1);
    textArea.append(string);
}

在最后一个进程()之后执行 done() 会更棘手,例如,done 也可以使用一个计时器,它将在>DELAY之后安排实际的 done() 工作。虽然我不认为这将是一个常见的情况,因为如果你取消了,当我们知道我们实际上正在取消所有未来进程的执行时,错过一个进程()应该不重要。


答案 2

在阅读了DSquare的精湛答案,并从中得出结论,需要一些子类化之后,我为任何需要确保所有已发布的块都已在EDT中处理过的人提出了这个想法。

注意:我试图用Java而不是Jython(我选择的语言,正式地说是世界上最好的语言)来编写它,但它有点复杂,因为,例如,是,所以你必须发明另一种方法来调用它,也因为你必须(打哈欠)用Java中的泛型参数化所有内容。publishfinal

任何 Java 人都应该可以理解此代码:只是为了提供帮助,使用 ,当结果为 0 时,此计算结果为 。self.publication_counter.get()False

# this is how you say Worker... is a subclass of SwingWorker in Python/Jython
class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ):

    # __init__ is the constructor method in Python/Jython
    def __init__( self ):

        # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object
        self.publication_counter = java.util.concurrent.atomic.AtomicInteger()

    def await_processing_of_all_chunks( self ):
        while self.publication_counter.get():
            time.sleep( 0.001 )

    # fully functional override of the Java method     
    def process( self, chunks ):
        for chunk in chunks:
            pass
            # DO SOMETHING WITH EACH CHUNK

        # decrement the counter by the number of chunks received
        # NB do this AFTER dealing with the chunks 
        self.publication_counter.addAndGet( - len( chunks ) )

    # fully functional override of the Java method     
    def publish( self, *chunks ):
        # increment the counter by the number of chunks received
        # NB do this BEFORE publishing the chunks
        self.publication_counter.addAndGet( len( chunks ))
        self.super__publish( chunks )

因此,在调用代码中,您可以输入如下内容:

    engine.update_xliff_task.get()
    engine.update_xliff_task.await_processing_of_all_chunks()

PS:使用这样的子句(即轮询技术)并不优雅。我查看了可用的类,例如和(两者都使用线程阻塞方法),但我认为两者都不适合此目的...whilejava.util.concurrentCountDownLatchPhaser

我对此非常感兴趣,可以调整一个适当的并发类(用Java编写,在Apache站点上找到)。该类称为。如果达到计数器的值,则其版本将停止线程。我在这里的版本允许您这样做,或者相反:说“等到计数器达到某个值再抬起闩锁”:CounterLatchawait()AtomicLong

NB 使用 for 和 for :因为在原始 Java 中,它们使用关键字。我认为使用原子类将达到相同的目的。AtomicLongsignalAtomicBooleanreleasedvolatile

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

所以我的代码现在看起来像这样:

在 SwingWorker 构造函数中:

self.publication_counter_latch = CounterLatch() 

在SW.publish中:

self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

在等待块处理停止的线程中:

worker.publication_counter_latch.await()

推荐