相当于 Java 中的 Go 通道

我有一个要求,我需要从一组阻塞队列中读取。阻塞队列由我正在使用的库创建。我的代码必须从队列中读取。我不想为每个阻塞队列创建一个读取器线程。相反,我想使用单个线程(或者可能最多使用2/3线程)轮询它们以获取数据的可用性。由于某些阻塞队列可能长时间没有数据,而其中一些队列可能会获得突发数据。以较小的超时轮询队列将起作用,但这根本不有效,因为它仍然需要继续循环所有队列,即使其中一些队列长时间没有数据。基本上,我正在寻找一种选择/epoll(用于套接字)的阻塞队列机制。任何线索都是值得赞赏的。

不过,在Go中做到这一点非常容易。下面的代码模拟了通道和 goroutines 的相同情况:

package main

import "fmt"
import "time"
import "math/rand"

func sendMessage(sc chan string) {
    var i int

    for {
        i =  rand.Intn(10)
        for ; i >= 0 ; i-- {
            sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
        }
        i = 1000 + rand.Intn(32000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func sendNum(c chan int) {
    var i int 
    for  {
        i = rand.Intn(16);
        for ; i >=  0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        i = 1000 + rand.Intn(24000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)
    i := 0
    for ; i < 8 ; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }
    for {
        select {
        case msg := <- msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <- numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}

答案 1

我建议您考虑使用 JCSP 库。Go的等价物称为替代。您只需要一个消耗线程,如果它使用 打开传入通道,则不需要轮询传入通道。因此,这将是多路复用源数据的有效方法。selectAlternative

如果您能够用 JCSP 通道替换 BlockingQueues,这将有很大帮助。通道的行为基本相同,但在通道端共享的扇出或扇入方面提供了更大程度的灵活性,特别是使用 具有 的通道。Alternative

有关用法的示例,下面是一个公平的多路复用器。此示例演示了一个进程,该进程将流量从其输入通道数组相当多路复用到其单个输出通道。任何输入通道都不会挨饿,无论竞争对手的渴望如何。

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
 }

请注意,如果在上面使用,则如果索引较低的频道不断要求服务,则索引较高的频道将挨饿。或者可以使用 代替 ,但这样就不可能进行饥饿分析。只有在饥饿不是问题时才应使用该机制。priSelectfairSelectselectselect

摆脱僵局

与Go一样,使用通道的Java程序必须设计为不死锁。在Java中实现低级并发原语是非常困难的,你需要一些可靠的东西。幸运的是,已经通过正式分析以及JCSP通道进行了验证。这使其成为可靠的选择。Alternative

只是为了澄清轻微的混淆点,当前的JCSP版本在Maven存储库中是1.1-rc5,而不是网站所说的。


答案 2

唯一的方法是将标准队列替换为功能更丰富的类的对象,这将在数据插入空队列时通知使用者。这个类仍然可以实现BlocklingQueue接口,所以另一端(生产者)看不到任何区别。诀窍是操作还应该举起一个标志并通知消费者。使用者在轮询所有线程后,清除标志并调用 。putObject.wait()


推荐