不要同时在两个线程之间共享同一套接字
我有大约60个套接字和20个线程,我想确保每个线程每次都在不同的套接字上运行,这样我就不想在两个线程之间共享同一个套接字。
在我的班级中,我有一个后台线程,它每60秒运行一次并调用方法。在方法中,我迭代了我拥有的所有套接字,然后通过调用类的方法开始逐个ping它们,并根据我将它们标记为活动或死亡的响应。在方法中,我总是需要迭代所有套接字并ping它们以检查它们是活的还是死的。SocketManager
updateLiveSockets()
updateLiveSockets()
send
SendToQueue
updateLiveSockets()
现在,所有读取器线程将并发调用类的方法,以获取下一个实时可用套接字,以在该套接字上发送业务消息。因此,我有两种类型的消息,我在套接字上发送:getNextSocket()
SocketManager
- 一个是套接字上的消息。这仅从类中的计时器线程调用方法发送。
ping
updateLiveSockets()
SocketManager
- 其他是套接字上的消息。这是在课堂上完成的。
business
SendToQueue
因此,如果 pinger 线程正在 ping 一个套接字以检查它们是否处于活动状态,则其他任何业务线程都不应使用该套接字。同样,如果业务线程使用套接字在其上发送数据,则 pinger 线程不应 ping 该套接字。这适用于所有套接字。但是我需要确保在方法中,每当我的背景线程启动时,我们都会ping所有可用的套接字,以便我们可以确定哪个套接字是活的还是死的。updateLiveSockets
以下是我的班级:SocketManager
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!listOfEndPoints.isEmpty()) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的班级:SendToQueue
// this method will be called by multiple reader threads (around 20) concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
if (!liveSocket.isPresent()) {
// log error
return false;
}
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
问题陈述
现在,正如您在上面看到的,我在两个线程之间共享相同的套接字。似乎在课堂上可以将 a 返回到 。同时,可以访问相同的内容以 ping 它。在这种情况下,它们变异是相同的,这可能会导致问题。因此,我正试图找到一种方法,以便我可以防止不同的线程同时将数据发送到同一套接字并弄乱我的数据。getNextSocket()
SocketManager
0MQ socket
Thread A
timer thread
0MQ socket
Thread A
timer thread
0MQ socket
我能想到的一个解决方案是在发送数据时在套接字上使用,但如果许多线程使用相同的套接字,资源就不会得到很好的利用。此外,如果被阻止(从技术上讲它不应该),所有等待此套接字的线程都被阻止。因此,我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字,而不是在特定套接字上进行同步。synchronization
msg.send(socket);