如何使用“executeAsync”时限制对cassandra的写入请求?
我正在使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10。我正在使用 QUORUM 一致性异步编写。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的 save 方法将以非常快的速度从多个线程调用。
问题:
我想将请求限制为异步写入Cassandra的方法。如果我的写入速度非常快,超出了我的Cassandra集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功地进入Cassandra而不会有任何损失。executeAsync
我看到这篇文章,解决方案是在固定数量的许可证下使用。但我不确定如何以及什么是实现这一目标的最佳方式。我以前从未使用过信号量。这是逻辑。任何人都可以在我的代码上提供基于信号量的示例,或者是否有任何更好的方法/选项,那么也请告诉我。Semaphore
在编写数据加载程序的上下文中,可以执行如下操作:
- 为了简单起见,请使用信号量或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用 executeAsync 提交查询时,请获取许可证。您实际上只需要 1 个线程(但可能需要引入一个用于执行此操作的 # cpu 内核大小的池),该线程从信号量获取许可并执行查询。它只会阻止获取,直到有可用的许可证。
- 使用 Futures.addCallback 作为从 executeAsync 返回的未来。回调应该在 onSuccess 和 onFailure 的情况下调用 Sempahore.release()。通过释放许可证,这应该允许您在步骤 1 中的线程继续并提交下一个请求。
我也看过其他几个帖子,他们谈到了使用,或者哪一个更好,我应该使用?以下是我能想到的选项:RingBuffer
Guava RateLimitter
- 使用信号量
- 使用环形缓冲区
- 使用番石榴限速器
任何人都可以帮我举个例子,说明我们如何限制请求或为cassandra写入获得背压,并确保所有写入都成功进入cassandra?