你能重新平衡一个未知大小的不平衡分路器吗?
我想使用 a 来并行处理一组数量未知的远程存储 JSON 文件(文件数量预先未知)。这些文件的大小可能相差很大,从每个文件 1 个 JSON 记录到某些其他文件中的 100,000 条记录。在这种情况下,JSON 记录表示文件中的一行表示的独立 JSON 对象。Stream
我真的很想使用Streams来实现这一点,所以我实现了这个:Spliterator
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
我遇到的问题是,虽然 Stream 最初并行化得很好,但最终最大的文件在单个线程中保留处理。我相信近端原因有据可查:分路器是“不平衡的”。
更具体地说,似乎该方法不是在 生命周期的某个点之后调用的,因此在结束时分发小批量的额外逻辑很少被执行。trySplit
Stream.forEach
trySplit
请注意,从 trySplit 返回的所有拆分器如何共享同一个迭代器。我认为这是平衡所有分离器之间工作的一种非常聪明的方法,但它还不足以实现完全并行。paths
我希望并行处理首先跨文件进行,然后当几个大文件仍处于拆分状态时,我想在其余文件的块之间并行化。这就是 末尾块的意图。else
trySplit
有没有一种简单/简单/规范的方法可以解决这个问题?