有没有办法防止CledByInterruptException?

2022-09-01 15:59:01

在下面的示例中,我有一个文件被两个线程使用(在实际示例中,我可以有任意数量的线程)

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        new File(name).deleteOnExit();
        RandomAccessFile raf = new RandomAccessFile(name, "rw");
        FileChannel fc = raf.getChannel();

        Thread monitor = new Thread(() -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        });
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        running = false;
        raf.close();
    }
}

与其为每个线程创建一个 RandomAccessFile 和一个内存映射,不如在线程之间共享一个文件和一个内存映射,但是有一个 catch,如果任何线程被中断,资源就会关闭。

delete.me is 0 KB
delete.me is 2 KB
delete.me is 4 KB
delete.me is 6 KB
delete.me is 8 KB
Interrupted
Monitor thread died
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
    at A.lambda$main$0(A.java:19)
    at java.lang.Thread.run(Thread.java:748)
Writer thread died
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
    at A.lambda$main$1(A.java:41)
    at java.lang.Thread.run(Thread.java:748)

有没有办法防止文件通道仅仅因为一个使用它的线程被中断而关闭?


编辑 我想避免做的是,因为我怀疑它不适用于Java 9 +

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class
                .getDeclaredField("interruptor");
        field.setAccessible(true);
        field.set(fc, (Interruptible) thread
                -> Jvm.warn().on(getClass(), fc + " not closed on interrupt"));
    } catch (Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

顺便说一句,调用 返回上述黑客的预期大小。fc.size()


答案 1

既然你说你想要“线程之间共享一个内存映射”,所以根本没有这样的问题,因为内存映射不受关闭的影响。实际上,尽快关闭通道是一个很好的策略,以减少应用程序占用的资源。FileChannel

例如:

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    MappedByteBuffer mapped;
    try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) {
        mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
    }
    Thread thread1 = new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            byte[] b = new byte[5];
            mapped.position(4000);
            mapped.get(b);
            System.out.println("read "+new String(b, StandardCharsets.US_ASCII));
        }
    });
    thread1.setDaemon(true);
    thread1.start();
    Thread thread2 = new Thread(() -> {
        byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            mapped.position(4000);
            mapped.put(b);
            System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII));
            byte b1 = b[0];
            System.arraycopy(b, 1, b, 0, b.length-1);
            b[b.length-1] = b1;
        }
        mapped.force();
    });
    thread2.setDaemon(true);
    thread2.start();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    thread2.interrupt();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    running = false;

这演示了线程如何在通道关闭后读取和写入其数据,并且中断写入线程不会停止读取线程。

如果除了内存映射 I/O 之外还需要执行操作,则使用多个实例没有问题,因此关闭一个通道不会影响另一个通道。例如:FileChannelFileChannel

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE);
        FileChannel fc2 = FileChannel.open(name,READ,WRITE)) {
        Thread thread1 = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            try {
                MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                while(running && !Thread.interrupted()) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                    byte[] b = new byte[5];
                    mapped.position(4000);
                    mapped.get(b);
                    System.out.println("read from map "
                        +new String(b, StandardCharsets.US_ASCII)
                        +", file size "+fc1.size());
                }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread1.setDaemon(true);
        thread1.start();
        Thread thread2 = new Thread(() -> {
            byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
            try {
                MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                fc2.position(4096);
                try {
                    while(running && !Thread.interrupted()) {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                        mapped.position(4000);
                        mapped.put(b);
                        System.out.println("wrote to mapped "
                            +new String(b, StandardCharsets.US_ASCII));
                        byte b1 = b[0];
                        System.arraycopy(b, 1, b, 0, b.length-1);
                        b[b.length-1] = b1;
                        fc2.write(ByteBuffer.wrap(b));
                    }
                } finally { mapped.force(); }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread2.setDaemon(true);
        thread2.start();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        thread2.interrupt();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
        running = false;
    }
}

在这里,一个线程的中断确实关闭了其通道,但不会影响另一个线程。此外,即使每个线程从自己的通道中获取自己的线程,即使不使用 ,更改也会显示到另一个线程。当然,后者被定义为一种依赖于系统的行为,不能保证在每个系统上都有效。MappedByteBufferforce()

但是,如第一个示例所示,您仍然可以在开始时仅从其中一个通道创建共享缓冲区,同时在不同的通道上执行 I/O 操作(每个线程一个),无论是否关闭哪些通道都无关紧要,映射的缓冲区不受其影响。


答案 2

您可以使用反射来访问非法字段并从那里获取类类型以创建代理实例:interruptorsun.nio.ch.Interruptible

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor");
        Class<?> interruptibleClass = field.getType();
        field.setAccessible(true);
        field.set(fc, Proxy.newProxyInstance(
                interruptibleClass.getClassLoader(), 
                new Class[] { interruptibleClass },
                new InterruptibleInvocationHandler()));
    } catch (final Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

public class InterruptibleInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        // TODO: Check method and handle accordingly
        return null;
    }
}

在 Java9 中,这只警告一次,因为它默认使用 .--illegal-access=permit

但是,此标志可能会在将来的版本中被删除,确保此标志长期工作的最佳方法是使用该标志:--add-opens

--add-opens java.base/sun.nio.ch=your-module
--add-opens java.base/java.nio.channels.spi=your-module

或者,如果您不使用模块(不推荐):

--add-opens java.base/sun.nio.ch=ALL-UNNAMED
--add-opens java.base/java.nio.channels.spi=ALL-UNNAMED

这适用于Java 9,Java 10和当前的JDK 11抢先体验版本(28 (2018/8/23))。