在删除条目时迭代 ConcurrentHashMap

2022-09-04 23:16:45

我想定期迭代一段时间删除条目,如下所示:ConcurrentHashMap

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}

问题是,在我迭代时,另一个线程可能正在更新或修改值。如果发生这种情况,这些更新可能会永远丢失,因为我的线程在迭代时只能看到过时的值,但会删除活动条目。remove()

经过一番考虑,我想出了这个解决方法:

map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});

这样做的一个问题是,它不会捕获对未实现的可变值的修改(例如 )。有没有更好的方法可以通过并发修改来安全地删除?equals()AtomicInteger


答案 1

您的解决方法有效,但有一种可能的情况。如果某些条目有不断的更新 map.remove(键,值) 可能永远不会返回 true,直到更新结束。

如果你使用JDK8,这是我的解决方案

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    Map.compute(entry.getKey(), (k, v) -> f(v));
    //do something for prevValue
}
....
private Integer prevValue;

private Integer f(Integer v){
    prevValue = v;
    return null;
}

compute() 将 f(v) 应用于值,在我们的例子中,将值分配给全局变量并删除该条目。

根据Javadoc的说法,它是原子的。

尝试计算指定键及其当前映射值的映射(如果没有当前映射,则为 null)。整个方法调用以原子方式执行。在计算过程中,其他线程在此映射上尝试的一些更新操作可能会被阻止,因此计算应简短明了,并且不得尝试更新此映射的任何其他映射。


答案 2

您的解决方法实际上非常好。还有其他工具,您可以在其上构建一个有点类似的解决方案(例如,使用和逻辑删除值),但它们有自己的警告,我已经在略有不同的用例中使用了它们。computeIfPresent()

至于使用不实现映射值的类型,您可以在相应类型之上使用自己的包装器。这是将对象相等的自定义语义注入到 提供的原子替换/删除操作的最直接方法。equals()ConcurrentMap

更新

下面是一个草图,显示了如何在 API 之上进行构建:ConcurrentMap.remove(Object key, Object value)

  • 在用于值的可变类型之上定义包装器类型,同时定义基于当前可变值构建的自定义方法。equals()
  • 在(要传递到的 lambda)中,创建值的深层副本(该值的类型为新包装器类型),并执行逻辑,确定是否需要在副本上删除该值。BiConsumerforEach
  • 如果需要删除该值,请调用 。remove(myKey, myValueCopy)
    • 如果在计算是否需要删除该值时发生了一些并发更改,则将返回(除非 ABA 问题,这是一个单独的主题)。remove(myKey, myValueCopy)false

下面是一些代码来说明这一点:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

   private static class AtomicIntegerWrapper {
      private final AtomicInteger value;

      AtomicIntegerWrapper(int value) {
         this.value = new AtomicInteger(value);
      }

      public void set(int value) {
         this.value.set(value);
      }

      public int get() {
         return this.value.get();
      }

      @Override
      public boolean equals(Object obj) {
         if (this == obj) {
            return true;
         }
         if (!(obj instanceof AtomicIntegerWrapper)) {
            return false;
         }
         AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
         if (other.value.get() == this.value.get()) {
            return true;
         }
         return false;
      }

      public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
         int wrapped = wrapper.get();
         return new AtomicIntegerWrapper(wrapped);
      }
   }

   private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
         = new ConcurrentHashMap<>();

   private static final int NUM_THREADS = 3;

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; ++i) {
         MAP.put(i, new AtomicIntegerWrapper(1));
      }

      Thread.sleep(1);

      for (int i = 0; i < NUM_THREADS; ++i) {
         new Thread(() -> {
            Random rnd = new Random();
            while (!MAP.isEmpty()) {
               MAP.forEach((key, value) -> {
                  AtomicIntegerWrapper elem = MAP.get(key);
                  if (elem == null) {
                     System.out.println("Oops...");
                  } else if (elem.get() == 1986) {
                     elem.set(1);
                  } else if ((rnd.nextInt() & 128) == 0) {
                     elem.set(1986);
                  }
               });
            }
         }).start();
      }

      Thread.sleep(1);

      new Thread(() -> {
         Random rnd = new Random();
         while (!MAP.isEmpty()) {
            MAP.forEach((key, value) -> {
               AtomicIntegerWrapper elem =
                     AtomicIntegerWrapper.deepCopy(MAP.get(key));
               if (elem.get() == 1986) {
                  try {
                     Thread.sleep(10);
                  } catch (Exception e) {}
                  boolean replaced = MAP.remove(key, elem);
                  if (!replaced) {
                     System.out.println("Bailed out!");
                  } else {
                     System.out.println("Replaced!");
                  }
               }
            });
         }
      }).start();
   }
}

您将看到“救助!”的打印输出,与“替换!”(删除成功,因为您关心没有并发更新),计算将在某个时刻停止。

  • 如果移除自定义方法并继续使用副本,您将看到源源不断的“救助!”,因为副本永远不会被视为等于地图中的值。equals()
  • 如果不使用副本,则不会看到打印出来的“Bailed out!”,并且会遇到您正在解释的问题 - 无论并发更改如何,值都会被删除。