具有报价和刷新的非阻塞并发队列

2022-09-02 21:49:27

具有报价和刷新的非阻塞并发队列

我需要一个基本上只有2个操作的无界非阻塞并发队列:

  • offer :以原子方式在此队列的末尾插入指定的项目;
  • flush :将队列中该时刻存在的所有项目都取出,并按照广告订单逐个开始处理它们。更具体地说,必须是原子的只是这个“takeAll”操作,它将是刷新的第一个操作。在 takeAll 之后提供给队列的所有项目都将入,然后仅由另一个后续刷新进行处理。

目标是使用者在 takeAll 上具有单个 CAS 操作,然后可以迭代列表中的元素,而无需在每次读取时执行 CAS 操作。此外,我们已经拥有节点(Entry),因为这是存储其他不可变状态所必需的。新节点可以将 HEAD 作为构造函数参数,从而创建单向链表。

在文献中是否存在具有这些特征的队列?


答案 1

给你:

public class FunkyQueue<T> {
    private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>();

    public void offer(T t) {
        while(true) {
            Node<T> tail = _tail.get();
            Node<T> newTail = new Node<T>(t, tail);
            if(_tail.compareAndSet(tail, newTail)) {
                break;
            }
        }
    }

    public List<T> takeAll() {
        Node<T> tail = _tail.getAndSet(null);

        LinkedList<T> list = new LinkedList<T>();
        while(tail != null) {
            list.addFirst(tail.get());
            tail = tail.getPrevious();
        }

        return list;
    }

    private static final class Node<T>
    {
        private final T _obj;
        private Node<T> _prev;

        private Node(T obj, Node<T> prev) {
            _obj = obj;
            _prev = prev;            
        }

        public T get() {
            return _obj;
        }

        public Node<T> getPrevious() {
            return _prev;
        }
    }
}

答案 2

给定不错的实现,这需要一个 CAS 同时用于 和 。offer()takeAll()

问题:执行时间长,因为它需要在相反方向上完全遍历单链表。takeAll()

解决方案:在节点上创建其他跳过级别。对于上述数字(N〜100K),两个水平就足够了,从而将步骤数减少到〜150。takeAll()

基于上述实现,类:Node

public static final class Node<T> {

    private final T value;
    private Node<T> prev, prevL1, prevL2;
    private Node<T> next, nextL1, nextL2;

    private Node(T obj, Node<T> prev, long c) {
        value = obj;
        this.prev = prev;  
        // level 1 to skip 64 nodes, level 2 to skip 64^2 nodes
        // c is a value from some global addition counter, that
        // is not required to be atomic with `offer()`
        prevL1 = (c & (64 - 1) == 0) ? prev : prev.prevL1;
        prevL2 = (c & (64 * 64 - 1) == 0) ? prev : prev.prevL2;
    }

    public T get() {
        return value;
    }

    public Node<T> findHead() {
        // see below
    }

    public Node<T> next() {
        // see below
    }
}

FunkyQueue#offer()方法:

public void offer(T t) {
    long c = counter.incrementAndGet();  
    for(;;) {
        Node<T> oldTail = tail.get();
        Node<T> newTail = new Node<T>(t, oldTail, c);
        if (tail.compareAndSet(oldTail, newTail)) 
            break;
    }
}

FunkyQueue#takeAll()现在将返回列表的头部:

public Node<T> takeAll() {
    return tail.getAndSet(null).findHead();
}

它调用 ,现在可以使用跳过级别来加速向后遍历:Node#findHead()

private Node<T> findHead() {

     Node<T> n = this;
     while (n.prevL2 != null) {  // <- traverse back on L2, assigning `next` nodes
         n.prevL2.nextL2 = n;
         n = n.prevL2; 
     }
     while (n.prevL1 != null) {  // <- the same for L1
         n.prevL1.nextL1 = n;
         n = n.prev1;
     }
     while (n.prev != null) {    // <- the same for L0
         n.prev.next = n;
         n = n.prev;
     }
     return n;
}

最后, :Node#next()

public Node<T> next() {

    if (this.next == null && this.nextL1 == null && this.nextL2 == null)       
        throw new IllegalStateException("No such element");

    Node<T> n;
    if (this.next == null) {         // L0 is not traversed yet
        if (this.nextL1 == null) {   // the same for L1
            n = this.nextL2;         // step forward on L2
            while (n != this) {      // traverse on L1
                n.prevL1.nextL1 = n;
                n = n.prevL1;
            }
        }  
        n = this.nextL1;             // step forward on L1
        while (n != this) {          // traverse on L0
            n.prev.next = n;
            n = n.prev;
        }
    }
    return this.next;
}

我想主要思想很清楚。应用一些重构,可以在O(log N)中进行操作,并在O(1)中平均操作。Node#findHead()FunkyQueue#takeAll()Node#next()


附言:如果有人注意到一些错误或语法错误,请编辑。


推荐