How to find which Java/Scala thread has locked a file?

2022-09-03 15:21:06

In short:

  1. How can I find out which Java/Scala thread has locked a file? I know that a class/thread in the JVM has locked a concrete file (overlapping a file region), but I don't know how. Is it possible to find out which class/thread is doing this when I stop the application at a breakpoint?

The following code throws an OverlappingFileLockException (a short sketch of what that exception tells you follows the list below):

FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock().isValid();
FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock()..isShared();
  2. How does Java/Scala (here, Spark) lock this file? I know how to lock a file with java.nio.channels, but I did not find the corresponding call in Spark's GitHub repository.
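For reference, a minimal sketch (mine, not from the question) of what tryLock() reports in each case; it reuses the example path "thisfile" from above:

import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockProbe {
    public static void main(String[] args) throws Exception {
        try (FileChannel ch = FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND)) {
            // tryLock() returns null when ANOTHER process holds the lock, but throws
            // OverlappingFileLockException when an overlapping lock is already held
            // somewhere inside THIS JVM - the situation described above.
            FileLock lock = ch.tryLock();
            if (lock == null) {
                System.out.println("locked by another process");
            } else {
                System.out.println("lock acquired; shared=" + lock.isShared());
                lock.release();
            }
        } catch (OverlappingFileLockException e) {
            System.out.println("an overlapping lock is already held in this JVM");
        }
    }
}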


More about my problem: when I run Spark with Hive on Windows, it works correctly, but every time Spark shuts down it fails to delete one temp directory (the other temp directories before it are deleted correctly) and outputs the following exception:

2015-12-11 15:04:36 [Thread-13] INFO  org.apache.spark.SparkContext - Successfully stopped SparkContext
2015-12-11 15:04:36 [Thread-13] INFO  o.a.spark.util.ShutdownHookManager - Shutdown hook called
2015-12-11 15:04:36 [Thread-13] INFO  o.a.spark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-9d564520-5370-4834-9946-ac5af3954032
2015-12-11 15:04:36 [Thread-13] INFO  o.a.spark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
2015-12-11 15:04:36 [Thread-13] ERROR o.a.spark.util.ShutdownHookManager - Exception while deleting Spark temp dir: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
java.io.IOException: Failed to delete: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:884) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:63) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0]
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) [scala-library-2.11.6.jar:na]
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at scala.util.Try$.apply(Try.scala:191) [scala-library-2.11.6.jar:na]
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216) [spark-core_2.11-1.5.0.jar:1.5.0]
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) [hadoop-common-2.4.1.jar:na]

I tried searching the internet, but all I found was an in-progress issue in Spark (one user tried to submit a patch, but it does not work, if I read the comments on that pull request correctly) and some unanswered questions on SO.

It looks like the problem is in the deleteRecursively() method of the Utils.scala class. I set a breakpoint on this method and rewrote it in Java:

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Test {
    public static void deleteRecursively(File file) {
        if (file != null) {
            try {
                if (file.isDirectory()) {
                    for (File child : listFilesSafely(file)) {
                        deleteRecursively(child);
                    }
                    //ShutdownHookManager.removeShutdownDeleteDir(file)
                }
            } finally {
                // delete() returns false for a file that is still open or locked
                if (!file.delete()) {
                    if (file.exists()) {
                        throw new RuntimeException("Failed to delete: " + file.getAbsolutePath());
                    }
                }
            }
        }
    }

    private static List<File> listFilesSafely(File file) {
        if (file.exists()) {
            File[] files = file.listFiles();
            if (files == null) {
                throw new RuntimeException("Failed to list files for dir: " + file);
            }
            return Arrays.asList(files);
        } else {
            return Collections.emptyList();
        }
    }

    public static void main(String[] arg) {
        deleteRecursively(new File("C:\\Users\\MyUser\\AppData\\Local\\Temp\\spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3"));
    }
}

When Spark stopped at the breakpoint in this method, I found that a thread of Spark's JVM had locked the file "C:\Users\MyUser\AppData\Local\Temp\spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3\metastore\db.lck". Windows Process Explorer also showed that Java was holding this file, and FileChannel reported that the file was locked inside the JVM.

Now I need to:

  1. Find out which thread/class has locked this file

  2. Find out which method Spark uses to lock "metastore\db.lck", which class does it, and how to unlock the file before shutdown

  3. Submit a pull request to Spark/Hive that unlocks this file ("metastore\db.lck") before the deleteRecursively() method is called, or at least leave a comment about the problem (one possible unlock path is sketched right after this list)
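On point 3: db.lck is the lock file of the embedded Derby database that backs the Hive metastore by default, so one plausible unlock path (a hedged sketch, not a confirmed Spark fix) is to shut the Derby engine down before the temp directory is deleted:

import java.sql.DriverManager;
import java.sql.SQLException;

public class DerbyShutdown {
    public static void main(String[] args) {
        // Sketch, assuming the metastore runs on embedded Derby.
        // "jdbc:derby:;shutdown=true" is Derby's documented full-engine shutdown URL;
        // a successful shutdown is reported as an SQLException with SQLState "XJ015",
        // after which embedded Derby releases its db.lck files.
        try {
            DriverManager.getConnection("jdbc:derby:;shutdown=true");
        } catch (SQLException e) {
            if (!"XJ015".equals(e.getSQLState())) {
                throw new RuntimeException("Derby did not shut down cleanly", e);
            }
        }
    }
}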

If you need any other information, please ask in the comments.


Answer 1

See "How do I find out which thread is locking a file in Java?"

The file is locked by the Windows process as a whole. Threads may open the file for reading or writing, but it is the class holding a reference to the file handle that is responsible for closing it. Therefore, you should look for an object, not a thread.

See "How can I figure out what is holding on to unfreed objects?" for how to do that.
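One hedged way to do that from inside the JVM (my own sketch, not part of the original answer): trigger a heap dump with the HotSpotDiagnosticMXBean, open the resulting .hprof in a tool such as Eclipse MAT, search for sun.nio.ch.FileChannelImpl or java.io.FileDescriptor instances whose path mentions db.lck, and follow their incoming references ("Path to GC Roots") to the object holding the handle:

import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumper {
    public static void main(String[] args) throws IOException {
        // HotSpotDiagnosticMXBean is HotSpot-specific; "true" dumps live objects only
        HotSpotDiagnosticMXBean diag =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        diag.dumpHeap("heap.hprof", true);
    }
}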


Answer 2
  1. How can I find out which Java/Scala thread has locked a file?

I had some trouble figuring this out, but found the following solution: all of the locking objects can be seen, at the least, in the Thread.threadLocals field.

If a file is locked, for example by the following code:

    File newFile = new File("newFile.lock");
    newFile.createNewFile();
    FileLock fileLock = FileChannel.open(Paths.get(newFile.getAbsolutePath()), StandardOpenOption.APPEND).tryLock();

then in Thread.threadLocals you can see a sun.nio.fs.NativeBuffer object whose owner field is ".../newFile.lock".

So you can try the code below. It returns all threads together with every value held in their threadLocals; you then need to find which threads hold a NativeBuffer, Spark/Hive objects, and so on (and check in Eclipse or IDEA debug mode where those threads currently are):

private static String getThreadsLockFile() {
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    StringBuilder builder = new StringBuilder();
    for (Thread thread : threads) {
        builder.append(getThreadsLockFile(thread));
    }
    return builder.toString();
}

private static String getThreadsLockFile(Thread thread) {
    StringBuffer stringBuffer = new StringBuffer();
    try {
        // Thread.threadLocals is package-private, so it is reached via reflection
        Field field = thread.getClass().getDeclaredField("threadLocals");
        field.setAccessible(true);
        Object map = field.get(thread);
        // ThreadLocal$ThreadLocalMap stores its entries in a private "table" array
        Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
        table.setAccessible(true);
        Object tbl = table.get(map);
        int length = Array.getLength(tbl);
        for (int i = 0; i < length; i++) {
            try {
                Object entry = Array.get(tbl, i);
                if (entry != null) {
                    Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value");
                    valueField.setAccessible(true);
                    Object value = valueField.get(entry);
                    if (value != null) {
                        stringBuffer.append(thread.getName()).append(" : ").append(value.getClass()).
                                append(" ").append(value).append("\n");
                    }
                }
            } catch (Exception exp) {
                // skip, do nothing
            }
        }
    } catch (Exception exp) {
        // skip, do nothing
    }
    return stringBuffer.toString();
}
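A minimal usage sketch (mine, not part of the original answer). The code above also assumes imports of java.lang.reflect.Array and java.lang.reflect.Field, and on Java 9+ this kind of reflection into java.lang internals will likely need the JVM flag --add-opens java.base/java.lang=ALL-UNNAMED:

public static void main(String[] args) {
    // Dump every live thread's thread-local values, then search the output
    // for sun.nio.fs.NativeBuffer entries or Spark/Hive objects.
    System.out.println(getThreadsLockFile());
}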

Alternatively, you can try the following code, but it only looks for NativeBuffer objects via their owner field (so it does not work in all cases):

private static String getThreadsLockFile(String fileName) {
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    StringBuilder builder = new StringBuilder();
    for (Thread thread : threads) {
        builder.append(getThreadsLockFile(thread, fileName));
    }
    return builder.toString();
}

private static String getThreadsLockFile(Thread thread, String fileName) {
    StringBuffer stringBuffer = new StringBuffer();
    try {
        Field field = thread.getClass().getDeclaredField("threadLocals");
        field.setAccessible(true);
        Object map = field.get(thread);
        Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
        table.setAccessible(true);
        Object tbl = table.get(map);
        int length = Array.getLength(tbl);
        for (int i = 0; i < length; i++) {
            try {
                Object entry = Array.get(tbl, i);
                if (entry != null) {
                    Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value");
                    valueField.setAccessible(true);
                    Object value = valueField.get(entry);
                    if (value != null) {
                        // each value is probed as a possible array of NativeBuffers
                        int length1 = Array.getLength(value);
                        for (int j = 0; j < length1; j++) {
                            try {
                                Object entry1 = Array.get(value, j);
                                // NativeBuffer.owner holds the path the buffer was created for (see above)
                                Field ownerField = Class.forName("sun.nio.fs.NativeBuffer").getDeclaredField("owner");
                                ownerField.setAccessible(true);
                                String owner = ownerField.get(entry1).toString();
                                if (owner.contains(fileName)) {
                                    stringBuffer.append(thread.getName());
                                }
                            } catch (Exception exp) {
                                // skip, do nothing
                            }
                        }
                    }
                }
            } catch (Exception exp) {
                // skip, do nothing
            }
        }
    } catch (Exception exp) {
        // skip, do nothing
    }
    return stringBuffer.toString();
}
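Hypothetical usage against the file from the question (the same --add-opens caveat applies on Java 9+):

// Prints the names of all threads whose thread-locals hold a NativeBuffer
// whose owner string mentions db.lck
System.out.println(getThreadsLockFile("db.lck"));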
