MongoDB Java API 阅读速度慢

我们正在从本地MongoDB读取来自集合的所有文档,并且性能不是很出色。

我们需要转储所有数据,不要担心为什么,只要相信它确实需要,并且没有解决方法。

我们有4mio文档,看起来像:

{
    "_id":"4d094f58c96767d7a0099d49",
    "exchange":"NASDAQ",
    "stock_symbol":"AACC",
    "date":"2008-03-07",
    "open":8.4,
    "high":8.75,
    "low":8.08,
    "close":8.55,
    "volume":275800,
    "adj close":8.55
}

我们现在使用这个简单的代码来阅读:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("localhost");
MongoCollection<Document> collection = database.getCollection("test");

MutableInt count = new MutableInt();
long start = System.currentTimeMillis();
collection.find().forEach((Block<Document>) document -> count.increment() /* actually something more complicated */ );
long start = System.currentTimeMillis();

我们以16秒(250k行/秒)的速度阅读整个集合,对于小文档来说,这真的不令人印象深刻。请记住,我们希望加载 8 亿行。不可能进行聚合、映射减少或类似操作。

这是像MongoDB一样快,还是有其他方法可以更快地加载文档(其他技术,移动Linux,更多RAM,设置...)?


答案 1

您没有指定您的用例,因此很难告诉您如何调整查询。(即:谁愿意一次加载800mil行只是为了计数?

鉴于您的架构,我认为您的数据几乎是只读的,并且您的任务与数据聚合相关。

您当前的工作只是读取数据(很可能您的驱动程序将批量读取),然后停止,然后执行一些计算(是的,int包装器用于增加处理时间),然后重复。这不是一个好方法。如果您没有以正确的方式访问数据库,则数据库不会神奇地快速。

如果计算不是太复杂,我建议您使用聚合框架,而不是全部加载到RAM中。

您应该考虑一些改进聚合的事情:

  1. 将数据集划分为较小的数据集。(例如:分区方式,分区方式...)。添加索引以支持该分区,并在分区上操作聚合,然后合并结果(典型的分而治之方法)dateexchange
  2. 项目仅需要的字段
  3. 过滤掉不必要的文档(如果可能)
  4. 如果无法在内存上执行聚合(如果达到每个 pipiline 100MB 的限制),则允许使用 diskusage。
  5. 使用内置管道来加速计算(例如:例如:例如)$count

如果你的计算太复杂,你无法用聚合框架来表达,那么使用mapReduce。它对进程进行操作,数据不需要通过网络传输到您的内存。mongod

更新

所以看起来你想做一个OLAP处理,你卡在了ETL步骤。

您不需要也不必避免每次都将整个 OLTP 数据加载到 OLAP。只需将新更改加载到数据仓库。然后首先数据加载/转储需要更多的时间是正常和可接受的。

对于首次加载,应考虑以下几点:

  1. 同样,Divide-N-Conquer将数据分解为较小的数据集(使用日期/交换/股票标签等谓词...
  2. 执行并行计算,然后合并结果(必须正确分区数据集)
  3. 在 中执行批处理而不是处理:加载数据分区,然后逐个计算而不是计算。forEach

答案 2

首先,正如@xtreme-biker所说,性能在很大程度上取决于您的硬件。具体来说,我的第一个建议是检查您是在虚拟机上运行还是在本机主机上运行。在我的情况下,在带有SDD驱动器的i7上的CentOS VM中,我每秒可以读取123,000个文档,但在同一驱动器上的Windows主机上运行的完全相同的代码每秒读取多达387,000个文档。

接下来,让我们假设您确实需要阅读完整的集合。这就是说您必须执行完全扫描。让我们假设您无法更改MongoDB服务器的配置,而只能优化代码。

然后一切都归结为什么

collection.find().forEach((Block<Document>) document -> count.increment());

确实如此。

快速展开 MongoCollection.find() 显示它实际上这样做:

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
Decoder<Document> codec = new DocumentCodec();
FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
AtomicInteger count = new AtomicInteger(0);
try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
    while (cursorAdapter.hasNext()) {
        Document doc = cursorAdapter.next();
        count.incrementAndGet();
    }
}

这里相当快(低于10ms),大部分时间都花在while循环内,特别是在私有方法内部FindOperation.execute()QueryBatchCursor.getMore()

getMore()调用和它的时间基本上消耗在两个操作中:1)从服务器获取字符串数据和2)将字符串数据转换为BsonDocument。DefaultServerConnection.command()

事实证明,Mongo在获取大量结果集所需的网络往返次数方面非常聪明。它将首先使用 firstBatch 命令提取 100 个结果,然后提取更大的批次,nextBatch 是批大小,具体取决于集合大小,直到达到限制。

所以,在木头下面,会发生这样的事情来获取第一批。

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
DocumentCodec payloadDecoder = new DocumentCodec();
Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
providerConstructor.setAccessible(true);
CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
Connection conn = readBinding.getReadConnectionSource().getConnection();

BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
BsonDocument cursor = results.getDocument("cursor");
long cursorId = cursor.getInt64("id").longValue();

BsonArray firstBatch = cursor.getArray("firstBatch");

然后用于获取每个下一批。cursorId

在我看来,驱动程序实现的“问题”是注入了字符串到JSON解码器,但JsonReader(解码()方法所依赖的JsonReader却没有。即使到您已经接近套接字通信的地方,也是这样。com.mongodb.internal.connection.InternalStreamConnection

因此,我认为,除非你深入到MongoCollection.find()InternalStreamConnection.sendAndReceiveAsync()

您无法减少往返次数,也无法更改将响应转换为 BsonDocument 的方式。不是没有绕过驱动程序并编写自己的客户端,我怀疑这是一个好主意。

邮 编如果你想尝试上面的一些代码,你需要getCluster()方法,它需要对mongo-java-driver进行肮脏的黑客攻击。

private Cluster getCluster() {
    Field cluster, delegate;
    Cluster mongoCluster = null;
    try {
        delegate = mongoClient.getClass().getDeclaredField("delegate");
        delegate.setAccessible(true);
        Object clientDelegate = delegate.get(mongoClient);
        cluster = clientDelegate.getClass().getDeclaredField("cluster");
        cluster.setAccessible(true);
        mongoCluster = (Cluster) cluster.get(clientDelegate);
    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
        System.err.println(e.getClass().getName()+" "+e.getMessage());
    }
    return mongoCluster;
}

推荐