多线程读取大量文件

2022-09-03 05:49:21

我仍然在围绕并发在Java中的工作方式进行思考。我理解(如果您订阅 OO Java 5 并发模型),则分别使用 or 方法实现 or,并且应该尽可能多地并行化该已实现的方法。TaskCallablerun()call()

但是我仍然不了解Java中并发编程的固有内容:

  • 如何为 's 方法分配要执行的适当数量的并发工时?Taskrun()

举个具体的例子,如果我有一个I / O绑定方法,它将Herman Melville的Moby Dick的全部内容从本地系统上的文件中读取到内存中,该怎么办?假设我希望此方法并发并由 3 个线程处理,其中:readMobyDick()readMobyDick()

  • 线程 #1 将书的前 1/3 读入内存
  • 线程 #2 将书的第二个 1/3 读取到内存中
  • 线程 #3 将书的最后 1/3 读入内存

我是否需要将 Moby Dick 分成三个文件,并将它们分别传递给它们自己的任务,或者我只是从实现的 run() 方法内部调用 readMobyDick(),并且(以某种方式)执行器知道如何在线程之间分解工作。

我是一个非常直观的学习者,所以任何正确方法的代码示例都非常感谢!谢谢!


答案 1

您可能偶然选择了平行活动的绝对最糟糕的例子!

从单个机械磁盘并行读取实际上比使用单个线程读取要慢,因为实际上,当每个线程轮到其运行时,您将机械头弹跳到磁盘的不同部分。这最好保留为单线程活动。

让我们再举一个例子,它类似于你的,但实际上可以提供一些好处:假设我想在一个巨大的单词列表中搜索某个单词的出现次数(这个列表甚至可能来自磁盘文件,但就像我说的,由单个线程读取)。假设我可以使用3个线程,就像你的例子一样,每个线程在巨大的单词列表的1/3上搜索,并保留一个本地计数器,显示搜索单词出现的次数。

在这种情况下,您需要将列表划分为3个部分,将每个部分传递给其类型实现Runnable的不同对象,并在方法中实现搜索。run

运行时本身不知道如何进行分区或类似操作,您必须自己指定它。还有许多其他分区策略,每种策略都有自己的优点和缺点,但是我们现在可以坚持静态分区。

让我们看一些代码:

class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

这应该可以很好地工作。请注意,在实际情况下,您将构建一个更通用的分区方案。或者,您可以使用 和 实现,而不是如果您希望返回结果。ExecutorServiceCallableRunnable

因此,使用更高级构造的替代示例:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);

// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
    counter += f.get();
}

答案 2

你选择了一个不好的例子,正如都铎王朝所指出的那样。旋转磁盘硬件受到移动盘片和磁头的物理约束,最有效的读取实现是按顺序读取每个块,这减少了移动磁头或等待磁盘对齐的需要。

也就是说,某些操作系统并不总是将内容连续存储在磁盘上,对于那些记住的人来说,如果您的操作系统/文件系统没有为您完成工作,碎片整理可以提供磁盘性能提升。

正如你提到的想要一个能受益的程序,让我建议一个简单的程序,矩阵加法。

假设您为每个内核创建了一个线程,则可以轻松地将任意两个矩阵划分为 N 行(每个线程一个矩阵)。矩阵添加(如果您还记得)的工作原理如下:

A + B = C

[ a11, a12, a13 ]   [ b11, b12, b13]  =  [ (a11+b11), (a12+b12), (a13+c13) ]
[ a21, a22, a23 ] + [ b21, b22, b23]  =  [ (a21+b21), (a22+b22), (a23+c23) ]
[ a31, a32, a33 ]   [ b31, b32, b33]  =  [ (a31+b31), (a32+b32), (a33+c33) ]

因此,要将其分布在N个线程中,我们只需要将行数和模数除以线程数,即可获得将要添加的“线程id”。

matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6,  9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0

现在,每个线程“知道”它应该处理哪些行,并且“每行”的结果可以微不足道地计算,因为结果不会交叉到其他线程的计算域

现在所需要的只是一个“结果”数据结构,它跟踪何时计算了值,当设置了最后一个值时,计算就完成了。在这个具有两个线程的矩阵加法结果的“假”示例中,计算具有两个线程的答案大约需要一半的时间。

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only.  Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)

更复杂的问题可以通过多线程来解决,不同的问题可以用不同的技术来解决。我特意选择了一个最简单的例子。


推荐