是通过 Collections.synchronizedSet(...) 迭代的。forEach() 保证是线程安全的?
众所周知,默认情况下,迭代并发集合不是线程安全的,因此不能使用:
Set<E> set = Collections.synchronizedSet(new HashSet<>());
//fill with data
for (E e : set) {
process(e);
}
发生这种情况是因为在迭代期间可能会添加数据,因为 上没有独占锁。set
这在以下的 javadoc 中进行了描述:Collections.synchronizedSet
公共静态 设置同步集(集)
返回由指定集支持的同步(线程安全)集。为了保证串行访问,对支持集的所有访问都必须通过返回的集合完成,这一点至关重要。
用户在迭代返回的集合时必须手动同步该集:
Set s = Collections.synchronizedSet(new HashSet());
...synchronized (s) { Iterator i = s.iterator(); // Must be in the synchronized block while (i.hasNext()) foo(i.next()); }
不遵循此建议可能会导致非确定性行为。
但是,这不适用于 ,它继承了 Iterable.forEach 的默认方法。Set.forEach
forEach
现在我查看了源代码,在这里我们可以看到我们有以下结构:
- 我们要求.
Collections.synchronizedSet()
-
我们得到一个:
public static <T> Set<T> synchronizedSet(Set<T> s) { return new SynchronizedSet<>(s); } ... static class SynchronizedSet<E> extends SynchronizedCollection<E> implements Set<E> { private static final long serialVersionUID = 487447009682186044L; SynchronizedSet(Set<E> s) { super(s); } SynchronizedSet(Set<E> s, Object mutex) { super(s, mutex); } public boolean equals(Object o) { if (this == o) return true; synchronized (mutex) {return c.equals(o);} } public int hashCode() { synchronized (mutex) {return c.hashCode();} } }
-
它扩展了 ,在明显的方法旁边有以下有趣的方法:
SynchronizedCollection
// Override default methods in Collection @Override public void forEach(Consumer<? super E> consumer) { synchronized (mutex) {c.forEach(consumer);} } @Override public boolean removeIf(Predicate<? super E> filter) { synchronized (mutex) {return c.removeIf(filter);} } @Override public Spliterator<E> spliterator() { return c.spliterator(); // Must be manually synched by user! } @Override public Stream<E> stream() { return c.stream(); // Must be manually synched by user! } @Override public Stream<E> parallelStream() { return c.parallelStream(); // Must be manually synched by user! }
这里使用的对象与所有操作锁定到的对象相同。mutex
Collections.synchronizedSet
现在我们可以,从实现判断说它是线程安全的 使用 ,但是按规范也可以线程安全吗?Collections.synchronizedSet(...).forEach(...)
(令人困惑的是,通过实现不是线程安全的,并且规范的结论似乎也是未知的。Collections.synchronizedSet(...).stream().forEach(...)