想要并行运行非线程安全库 - 可以使用多个类装入器完成吗?

2022-09-01 14:29:33

我从事一个项目,其中我们使用的库不能保证线程安全(并且不是),并且在Java 8流场景中是单线程的,它按预期工作。

我们希望使用并行流来获得低悬的可扩展性成果。

不幸的是,这会导致库失败 - 很可能是因为一个实例干扰了与另一个实例共享的变量 - 因此我们需要隔离。

我正在考虑为每个实例(可能是线程本地)使用单独的类加载器,据我所知,这应该意味着出于所有实际目的,我得到了所需的隔离,但我不熟悉为此目的故意构造类加载器。

这是正确的方法吗?为了获得适当的生产质量,我应该怎么做?


编辑:我被要求提供有关触发问题的情况的其他信息,以便更好地理解它。问题仍然是关于一般情况,而不是修复库。

我可以完全控制由库创建的对象(https://github.com/veraPDF/),由

<dependency>
    <groupId>org.verapdf</groupId>
    <artifactId>validation-model</artifactId>
    <version>1.1.6</version>
</dependency>

使用项目 maven 存储库获取工件。

<repositories>
    <repository>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>vera-dev</id>
        <name>Vera development</name>
        <url>http://artifactory.openpreservation.org/artifactory/vera-dev</url>
    </repository>
</repositories>

目前,强化库是不可行的。


编辑:我被要求显示代码。我们的核心适配器大致是:

public class VeraPDFValidator implements Function<InputStream, byte[]> {
    private String flavorId;
    private Boolean prettyXml;

    public VeraPDFValidator(String flavorId, Boolean prettyXml) {
        this.flavorId = flavorId;
        this.prettyXml = prettyXml;
        VeraGreenfieldFoundryProvider.initialise();
    }

    @Override
    public byte[] apply(InputStream inputStream) {
        try {
            return apply0(inputStream);
        } catch (RuntimeException e) {
            throw e;
        } catch (ModelParsingException | ValidationException | JAXBException | EncryptedPdfException e) {
            throw new RuntimeException("invoking VeraPDF validation", e);
        }
    }

    private byte[] apply0(InputStream inputStream) throws ModelParsingException, ValidationException, JAXBException, EncryptedPdfException {
        PDFAFlavour flavour = PDFAFlavour.byFlavourId(flavorId);
        PDFAValidator validator = Foundries.defaultInstance().createValidator(flavour, false);
        PDFAParser loader = Foundries.defaultInstance().createParser(inputStream, flavour);
        ValidationResult result = validator.validate(loader);

        // do in-memory generation of XML byte array - as we need to pass it to Fedora we need it to fit in memory anyway.

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        XmlSerialiser.toXml(result, baos, prettyXml, false);
        final byte[] byteArray = baos.toByteArray();
        return byteArray;
    }
}

这是一个从 InputStream(提供 PDF 文件)映射到字节数组(表示 XML 报表输出)的函数。

(看到代码,我注意到构造函数中有一个对初始值设定项的调用,在我的特定情况下,这可能是罪魁祸首。我仍然想要一个通用问题的解决方案。


答案 1

我们面临类似的挑战。问题通常来自静态属性,这些属性在各种线程之间变得不情愿地“共享”。

使用不同的类装入器对我们有用,只要我们可以保证静态属性实际上是在类装入器装入的类上设置的。Java可能有一些类,它们提供的属性或方法不是在线程之间隔离的,或者不是线程安全的('并且是可以的 - 顺便说一句,欢迎任何关于这个问题的规范文档)。System.setProperties()Security.addProvider()

一个可能可行且快速的解决方案 - 至少可以让你有机会在你的库中测试这个理论 - 是使用Servlet引擎,如Jetty或Tomcat。

构建一些包含库的战争,并并行启动进程(每场战争1次)。

在 servlet 线程中运行代码时,这些引擎会首先尝试从父类装入器(与引擎相同)装入类,如果它找不到该类,则尝试从与 war 一起打包的 jar/类中装入该类。WebappClassLoaders

使用jetty,您可以以编程方式将战争热部署到您选择的上下文中,然后理论上根据需要扩展处理器数量(战争)。

我们通过扩展实现了我们自己的类加载器,并从 Jetty Webapp ClassLoader 中汲取了灵感。这并不像看起来那么难。URLClassLoader

我们的类装入器完全相反:它首先尝试从“包”的本地jars装入类,然后尝试从父类装入器获取它们。这保证了从不考虑由父类装入器意外装入的库(第一个)。我们的“包”实际上是一个包含其他jar/库的jar,其中包含自定义清单文件。

“按原样”发布此类加载器代码没有多大意义(并会产生一些版权问题)。如果你想进一步探索这条路线,我可以尝试想出一个骨架。

Jetty WebappClassLoader 的来源


答案 2

答案实际上取决于您的库所依赖的内容:

  1. 如果您的库依赖于至少一个本机库,那么使用 s 来隔离库的代码将无济于事,因为根据 JNI 规范,不允许将同一个 JNI 本机库加载到多个类装入器中,这样最终会得到一个 .ClassLoaderUnsatisfiedLinkError
  2. 如果您的库依赖于至少一个不打算共享的外部资源,例如文件,并且被您的库修改,则最终可能会遇到复杂的错误和/或资源的损坏。

假设您不处于上面列出的情况,一般来说,如果一个类被称为非线程安全并且不修改任何静态字段,则每个调用或每个线程使用此类的专用实例就足够了,因为类实例不再共享。

在这里,由于您的库显然依赖于并修改了一些不打算共享的静态字段,因此您确实需要将库的类隔离在专用中,当然也要确保您的线程不共享相同的 。ClassLoaderClassLoader

为此,您可以简单地创建一个URLClassLoader,您将向其提供库的位置(使用URLClassLoader.newInstance(URL[] urls,ClassLoader父级)),然后通过反射,您将检索与入口点相对应的库类并调用目标方法。为了避免在每次调用时生成一个新的实例,您可以考虑依靠 ThreadLocal 来存储 或 或 用于给定线程的实例。URLURLClassLoaderURLClassLoaderClassMethod


因此,您可以按照以下步骤操作:

假设我的库的入口点是如下所示的类:Foo

package com.company;

public class Foo {

    // A static field in which we store the name of the current thread
    public static String threadName;

    public void execute() {
        // We print the value of the field before setting a value
        System.out.printf(
            "%s: The value before %s%n", Thread.currentThread().getName(), threadName
        );
        // We set a new value
        threadName = Thread.currentThread().getName();
        // We print the value of the field after setting a value
        System.out.printf(
            "%s: The value after %s%n", Thread.currentThread().getName(), threadName
        );
    }
}

此类显然不是线程安全的,并且该方法修改了静态字段的值,该字段不应像您的用例一样由并发线程修改。execute

假设要启动我的库,我只需要创建一个实例并调用该方法。我可以将相应的内容存储在 a 中,以便每个线程仅使用 ThreadLocal.withInitial(Supplier<? extendS> supplier) 通过反射来检索它,如下所示:FooexecuteMethodThreadLocal

private static final ThreadLocal<Method> TL = ThreadLocal.withInitial(
    () -> {
        try {
            // Create the instance of URLClassLoader using the context 
            // CL as parent CL to be able to retrieve the potential 
            // dependencies of your library assuming that they are
            // thread safe otherwise you will need to provide their 
            // URL to isolate them too
            URLClassLoader cl = URLClassLoader.newInstance(
                new URL[]{/* Here the URL of my library*/},
                Thread.currentThread().getContextClassLoader()
            );
            // Get by reflection the class Foo
            Class<?> myClass = cl.loadClass("com.company.Foo");
            // Get by reflection the method execute
            return myClass.getMethod("execute");
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        }
    }
);

最后,让我们模拟我的库的并发执行:

// Launch 50 times concurrently my library
IntStream.rangeClosed(1, 50).parallel().forEach(
    i -> {
        try {
            // Get the method instance from the ThreadLocal
            Method myMethod = TL.get();
            // Create an instance of my class using the default constructor
            Object myInstance = myMethod.getDeclaringClass().newInstance();
            // Invoke the method
            myMethod.invoke(myInstance);
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        }
    }
);

您将获得下一个类型的输出,该输出表明我们在线程之间没有冲突,并且线程正确地重用了其相应的类/字段的值,从一个调用到另一个调用:execute

ForkJoinPool.commonPool-worker-7: The value before null
ForkJoinPool.commonPool-worker-7: The value after ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-7: The value before ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-7: The value after ForkJoinPool.commonPool-worker-7
main: The value before null
main: The value after main
main: The value before main
main: The value after main
...

由于此方法将为每个线程创建一个,因此请确保使用具有固定线程数的线程池应用此方法,并且应明智地选择线程数以防止内存不足,因为 a 在内存占用方面不是免费的,因此您需要根据堆大小限制实例的总数。ClassLoaderClassLoader

完成库操作后,应清理线程池的每个线程,以防止内存泄漏,为此,您可以继续操作:ThreadLocal

// The size of your the thread pool
// Here as I used for my example the common pool, its size by default is
// Runtime.getRuntime().availableProcessors()
int poolSize = Runtime.getRuntime().availableProcessors();
// The cyclic barrier used to make sure that all the threads of the pool
// will execute the code that will cleanup the ThreadLocal
CyclicBarrier barrier = new CyclicBarrier(poolSize);
// Launch one cleanup task per thread in the pool
IntStream.rangeClosed(1, poolSize).parallel().forEach(
    i -> {
        try {
            // Wait for all other threads of the pool
            // This is needed to fill up the thread pool in order to make sure 
            // that all threads will execute the cleanup code
            barrier.await();
            // Close the URLClassLoader to prevent memory leaks
            ((URLClassLoader) TL.get().getDeclaringClass().getClassLoader()).close();
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        } finally {
            // Remove the URLClassLoader instance for this thread
            TL.remove();
        }
    }
);