预初始化工作线程池以重用连接对象(套接字)

我需要在Java中构建一个工作线程池,其中每个工作线程都有自己的连接套接字;当工作线程运行时,它使用套接字,但保持其打开状态以供以后重用。我们之所以选择这种方法,是因为与临时创建、连接和销毁套接字相关的开销需要太多的开销,因此我们需要一种方法,通过这种方法,一组工作线程通过套接字连接进行预初始化,准备承担工作,同时保持套接字资源免受其他线程的影响(套接字不是线程安全的), 所以我们需要一些类似的东西...

public class SocketTask implements Runnable {
  Socket socket;
  public SocketTask(){
    //create + connect socket here
  }

  public void run(){
    //use socket here
  }

}

在应用程序启动时,我们希望初始化工作线程,并希望以某种方式初始化套接字连接...

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

当应用程序请求工作时,我们会将任务发送到工作线程池以便立即执行...

pool.queueWork( new SocketTask(..));


使用工作代码
更新 基于Gray和jontejj的有用评论,我有以下代码工作...

套接字任务

public class SocketTask implements Runnable {
    private String workDetails;
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); //gets from threadlocal
        //send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}

执行器服务

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

    int tasks = 15;  
    for( int i = 1; i <= tasks; i++){
        threadPool.execute(new SocketTask("foobar-" + i));
    }   

我喜欢这种方法有几个原因...

  • 套接字是可用于正在运行的任务的本地对象(通过 ThreadLocal),从而消除了并发问题。
  • 套接字创建一次并保持打开状态,在新任务排队时重复使用,从而消除了套接字对象创建/销毁开销。

答案 1

一个想法是将 s 放在 .然后,每当你需要一个你的线程可以从队列中,当他们完成时,他们把它放回队列上。SocketBlockingQueueSockettake()Socketput()

public void run() {
    Socket socket = socketQueue.take();
    try {
       // use the socket ...
    } finally {
       socketQueue.put(socket);
    }
}

这有额外的好处:

  • 您可以返回使用代码。ExecutorService
  • 您可以将套接字通信与结果的处理分开。
  • 您不需要对处理线程和套接字进行 1 对 1 的对应关系。但是套接字通信可能是98%的工作,所以可能没有收获。
  • 完成并完成时,只需将套接字取消排队并关闭它们即可关闭套接字。ExecutorService

这确实增加了另一个的额外开销,但如果你正在进行通信,你不会注意到它。BlockingQueueSocket

我们不相信ThreadFactory可以满足我们的需求...

我认为如果你使用线程局部变量,你可以做到这一点。线程工厂将创建一个线程,该线程首先打开套接字,将其存储在线程本地中,然后调用 arg,该 arg 使用套接字完成所有工作,从而将作业从内部队列中取消排队。完成后,该方法将完成,您可以从线程本地获取套接字并将其关闭。RunnableExecutorServicearg.run()

如下所示。这有点乱,但你应该明白这个想法。

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    // our tasks would then get the socket from the thread-local
                    r.run();
                    getSocketFromThreadLocalAndCloseIt();
                }
            });
            return thread;
        }
      }));

因此,您的任务将实现并如下所示:Runnable

public SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}

答案 2

我认为你应该使用ThreadLocal

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

其中,每个线程都有自己的套接字。


推荐