Scala 和第三方 Java 库中 Akka 的最佳实践

2022-09-04 22:10:18

我需要在我的Scala/Akka代码中使用memcached Java API。此 API 为您提供了同步和异步方法。异步返回java.util.concurrent.Future。这里有一个关于在Scala中处理Java Futures的问题,我如何包装java.util.concurrent.Future in an Akka Future?。但是,就我而言,我有两种选择:

  1. 将来使用同步 API 和包装阻塞代码并标记阻塞:

    Future {
      blocking {
        cache.get(key) //synchronous blocking call
      } 
    }
    
  2. 使用异步Java API并在Java Future上每n毫秒进行一次轮询,以检查未来是否完成(如上面链接的问题中的答案之一所述)。

哪一个更好?我倾向于第一个选项,因为轮询会极大地影响响应时间。阻止不应该阻止整个池吗?blocking { }


答案 1

我总是选择第一个选项。但我以稍微不同的方式做到这一点。我不使用该功能。(实际上我还没有想过。相反,我正在为Future提供一个自定义执行上下文,用于包装同步阻塞调用。所以它基本上看起来像这样:blocking

val ecForBlockingMemcachedStuff = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(100)) // whatever number you think is appropriate
// i create a separate ec for each blocking client/resource/api i use

Future {
    cache.get(key) //synchronous blocking call
}(ecForBlockingMemcachedStuff) // or mark the execution context implicit. I like to mention it explicitly.

因此,所有阻塞调用都将使用专用的执行上下文 (= Threadpool)。因此,它与负责非阻塞内容的主执行上下文分离。

Typesafe 提供的 Play/Akka 在线培训视频也解释了这种方法。第 4 课中有一个关于如何处理阻塞呼叫的视频。它由Nilanjan Raychaudhuri(希望我拼写正确)解释,他是Scala书籍的着名作者。

更新:我在推特上与Nilanjan进行了讨论。他解释了方法和习惯之间的区别是什么。该功能只是创建了一个特殊的.它为您需要多少线程的问题提供了一种朴素的方法。每次当池中的所有其他现有线程都处于繁忙状态时,它都会生成一个新线程。所以它实际上是一个不受控制的执行文本。它可能会创建大量线程并导致内存不足错误等问题。因此,具有自定义执行上下文的解决方案实际上更好,因为它使此问题变得明显。Nilanjan还补充说,如果此池的请求过载,您需要考虑断路。blockingExecutionContextblockingExecutionContext

TLDR:是的,阻止呼叫很糟糕。使用自定义/专用的执行上下文来阻止调用。还要考虑断路。


答案 2

Akka 文档提供了一些有关如何处理阻塞调用的建议:

在某些情况下,不可避免地要执行阻塞操作,即使线程休眠一段不确定的时间,等待外部事件发生。示例是旧版 RDBMS 驱动程序或消息传递 API,其根本原因通常是(网络)I/O 是在幕后发生的。当遇到这种情况时,你可能会忍不住把阻塞调用包装在 Future 中,然后改用它,但这种策略太简单了:当应用程序在增加的负载下运行时,你很可能会发现瓶颈或内存或线程不足。

“阻塞问题”的适当解决方案的非详尽列表包括以下建议:

  • 在一个执行组件(或由路由器管理的一组执行组件)中执行阻塞调用,确保配置专用于此目的或大小足够大的线程池。

  • 在 Future 中执行阻塞调用,确保在任何时间点的此类调用数的上限(提交此类性质的任务数不受限制将耗尽您的内存或线程限制)。

  • 在 Future 中执行阻塞调用,为线程池提供线程池,该线程池具有适合运行应用程序的硬件的线程数上限。

  • 专用于单个线程来管理一组阻塞资源(例如,驱动多个通道的 NIO 选择器),并在事件作为参与者消息发生时调度事件。

第一种可能性特别适合于本质上是单线程的资源,例如传统上一次只能执行一个未完成查询并使用内部同步来确保这一点的数据库句柄。一种常见的模式是为 N 个执行组件创建一个路由器,每个执行组件包装一个数据库连接,并处理发送到路由器的查询。然后,必须针对最大吞吐量调整数字 N,这将根据在硬件上部署的 DBMS 而有所不同。


推荐