任意键的锁定处理程序

我有代码为任意键实现“锁定处理程序”。给定 一个 ,它确保一次只有一个线程可以该(或等于)键(此处表示调用)。keyprocessexternalSystem.process(key)

到目前为止,我有这样的代码:

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public void handle(Key key) {
        // This can lead to OOM as it creates locks without removing them
        Lock keyLock = lockMap.computeIfAbsent( 
            key, (k) -> new ReentrantLock()
        );
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

我明白,这段代码之所以能导致,是因为没有一个清晰的地图。OutOfMemoryError

我想到如何制作地图,这将积累有限的元素数量。当超过限制时,我们应该将最旧的访问元素替换为new(此代码应与作为监视器的最旧元素同步)。但我不知道如何进行回调,这将导致我超出限制。

请分享您的想法。

附言

我重新阅读了任务,现在我看到我有限制,即方法不能调用超过8个线程。我不知道它如何帮助我,但我刚刚提到它。handle

附言 2

通过@Boris蜘蛛被建议了一个很好的和简单的解决方案:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

但是在Boris注意到代码之后,我们不是线程安全的,因为它会破坏行为:
让我们研究用同样关键调用的3个线程:

  1. 线程#1获取锁,现在更早map.remove(key);
  2. 线程 #2 使用相等键调用,因此在线程 #1 释放锁定时等待。
  3. 然后线程 #1 执行 。在此线程#3之后调用方法。它检查映射中是否存在此密钥的锁,因此它会创建新锁并获取它。map.remove(key);handle
  4. 线程 #1 释放锁,因此线程 #2 获取它。
    因此,线程 #2 和线程 #3 可以并行调用相等键。但这不应该被允许。

为了避免这种情况,在清除映射之前,我们应该阻止任何线程来获取锁,而等待集中的所有线程都不会获取并释放锁。看起来需要足够复杂的同步,这将导致算法工作缓慢。也许我们应该在地图大小超过一些有限的值时不时清除地图。

我浪费了很多时间,但不幸的是,我不知道如何实现这一目标。


答案 1

您不需要尝试将大小限制为某个任意值 - 事实证明,您可以完成这种“锁定处理程序”习语,同时仅准确存储当前锁定在地图中的密钥数量。

这个想法是使用一个简单的约定:成功地将映射添加到映射计数为“锁定”操作,删除它计数为“解锁”操作。这巧妙地避免了删除映射的问题,而某些线程仍然将其锁定和其他争用条件。

此时,in 中的映射仅用于阻止使用相同密钥到达并需要等待映射被删除的其他线程。value

下面是一个示例1,而不是作为映射值:CountDownLatchLock

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key        
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

在这里,映射的生存期仅在保持锁定时一样长。映射的条目数永远不会超过对不同键的并发请求数。

与您的方法的不同之处在于映射不会“重用” - 每次调用都将创建一个新的闩锁和映射。由于您已经在进行昂贵的原子操作,因此在实践中这不太可能是太大的减速。另一个缺点是,对于许多等待线程,当闩锁倒计时时,所有线程都会被唤醒,但只有一个会成功地放入新的映射并因此获得锁定 - 其余的线程在新锁上返回睡眠状态。handle

您可以构建另一个版本,当线程出现并等待现有映射时,该版本会重用映射。基本上,解锁线程只是对其中一个等待线程进行“切换”。只有一个映射将用于等待同一密钥的整个线程集 - 它按顺序传递给每个线程。大小仍然是有界的,因为没有更多的线程在等待给定的映射,它仍然被删除。

若要实现此目的,请将 替换为可以计算等待线程数的 map 值。当线程执行解锁时,它首先检查是否有任何线程正在等待,如果是,则唤醒一个线程以进行切换。如果没有线程在等待,它将“销毁”对象(即,设置一个标志,表明该对象不再在映射中)并将其从映射中删除。CountDownLatch

您需要在适当的锁定下进行上述操作,并且有一些棘手的细节。在实践中,我发现上面简短而甜蜜的例子效果很好。


1 动态编写,未编译,未经测试,但想法有效。


答案 2

您可以依靠方法计算(K键,BiFunction<?超级K,?超级V,?扩展V>重新映射函数)来同步对给定键的方法的调用,您甚至不再需要用作映射值的类型,因为您不再依赖它。processLock

这个想法是依靠你的内部锁定机制来执行你的方法,这将允许线程并行执行其相应哈希不是同一箱的键的方法。这等效于基于条带锁的方法,只是您不需要其他第三方库。ConcurrentHashMapprocess

条带锁的方法很有趣,因为它在内存占用方面非常轻,因为您只需要有限数量的锁来执行此操作,因此锁所需的内存占用量是已知的并且永远不会改变,这不是对每个键使用一个锁的方法(如您的问题中所示)的情况,因此通常更好/建议使用基于条带锁的方法。需要。

所以你的代码可能是这样的:

// This will create a ConcurrentHashMap with an initial table size of 16   
// bins by default, you may provide an initialCapacity and loadFactor
// if too much or not enough to get the expected table size in order
// increase or reduce the concurrency level of your map
// NB: We don't care much of the type of the value so I arbitrarily
// used Void but it could be any type like simply Object
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

注 1:由于我们始终返回地图将始终为空,因此您永远不会因为此地图而耗尽内存。null

注 2:由于我们从不影响给定键的值,请注意,也可以使用计算IfAbsent(K键,函数<?超级K,?扩展V>映射函数)的方法来完成:

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the segment of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

注 3:确保你的方法永远不会为任何键调用该方法,因为你最终会得到无限循环(相同的键)或死锁(其他无序键,例如:如果一个线程调用,然后内部调用,另一个线程并行调用,然后内部调用,无论使用什么方法,你都会得到一个死锁)。此行为并非特定于此方法,它将在任何方法中发生。processhandlehandle(key1)processhandle(key2)handle(key2)processhandle(key1)


推荐