阿卡的正确设计。- 消息传递

2022-09-01 19:18:27

我已经浏览了一些关于akka如何以及为什么不保证消息传递的帖子。文档,此讨论和其他关于小组的讨论确实很好地解释了它。

我对akka很陌生,想知道一个案例的适当设计。例如,假设我有3个不同的演员都在不同的机器上。一个负责烹饪书,另一个负责历史,最后一个负责技术书籍。

我在另一台机器上有一个主要演员。假设有一个查询给主要参与者,如果我们有一些可用的书,请搜索。主角色向 3 个远程参与者发送请求,并期望获得结果。所以我这样做:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees, within = 10 seconds)), "router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult, timeout.duration) line(a)

简而言之,我已经向所有3个远程Actor发送了请求,并期望在10秒内获得结果。

应该采取什么行动?

  1. 假设我在10秒内没有收到结果,我应该再次向他们所有人发送新请求吗?
  2. 如果上面的时间还为时过早怎么办?但我不知道这需要多少时间。within
  3. 如果时间足够,但消息被丢弃了怎么办。within

如果我没有及时得到回应,请再次重新发送请求。像这样的东西,它仍然是异步的:within

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

但是在太多的查询下,调用上的线程不会太多而且体积庞大吗?如果我取消注释,它将变为同步,并且在负载下可能会表现不佳。line(a)

假设我在 10 秒内没有得到回应。如果时间还为时过早,那么它又是一个沉重的无用的计算。如果消息被丢弃,那么宝贵的时间就浪费了几秒钟。如果说我知道消息已经传递,我可能会等待更长的时间而不会怀疑。within10

人们如何解决这些问题? ?但是,我必须将状态存储在所有查询的 actor 中。这一定是一件常见的事情,我正在寻找合适的设计。ACK


答案 1

我将尝试为您回答其中一些问题。我不会对所有事情都有具体的答案,但希望我能引导你朝着正确的方向前进。

对于初学者来说,您需要改变将请求传达给进行图书搜索的3个演员的方式。在这里使用 a 可能不是正确的方法。此路由器将仅等待来自其中一个路由(第一个响应的路由)的应答,因此您的结果集将不完整,因为它不包含来自其他 2 个路由的结果。还有一个 ,但这也不符合您的需求,因为它只能处理,而不是 。要执行要执行的操作,一种选择是将请求发送给每个接收者,获取响应,然后使用 将它们组合成一个聚合。一个简化的示例可能如下所示:ScatterGatherFirstCompletedRouterBroadcastRoutertell (!)ask (?)FuturesFutureFuture.sequence

case class SearchBooks(title:String)
case class Book(id:Long, title:String)

class BookSearcher extends Actor{

  def receive = {
    case req:SearchBooks =>
      val routees:List[ActorRef] = ...//Lookup routees here
      implicit val timeout = Timeout(10 seconds)
      implicit val ec = context.system.dispatcher

      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender //Important to not close over sender
      fut onComplete{
        case Success(books) => caller ! books.flatten

        case Failure(ex) => caller ! Status.Failure(ex)
      }
  }
}

现在,这不会是我们的最终代码,但它是您的示例尝试执行的操作的近似值。在此示例中,如果任何一个下游路由失败/超时,我们将命中我们的块,并且调用方也将失败。如果它们都成功,则调用方将改为获取对象的聚合列表。FailureBook

现在进入您的问题。首先,您询问如果超时内没有从其中一个路由获得答案,是否应该再次向所有参与者发送请求。这个问题的答案真的取决于你。您是否允许另一端的用户看到部分结果(即3个参与者中的2个的结果),还是每次都必须是完整的结果集?如果答案是肯定的,则可以将发送到路由的代码调整为如下所示:

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
  case ex =>
    //probably log something here
    List()
})

使用此代码,如果任何路由超时或由于任何原因失败,则将在 中替换一个空的“Book”列表,而不是失败。现在,如果您无法忍受部分结果,那么您可以再次重新发送整个请求,但您必须记住,另一端可能有人在等待他们的书结果,他们不想永远等待。

对于第二个问题,你会问如果你的超时过早怎么办?您选择的超时值将完全取决于您,但它很可能应该基于两个因素。第一个因素将来自测试搜索的调用时间。找出平均需要多长时间,并基于该值选择一个小垫子的值,以确保安全。第二个因素是另一端的人愿意等待他们的结果多长时间。你可能只是在超时时时非常保守,只是为了安全起见,让它像60秒,但如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿得到一个失败的响应,指示我应该再试一次,而不是永远等待。因此,考虑到这两个因素,您应该选择一个值,该值将允许您在非常高的时间百分比内获得响应,同时仍然不会使另一端的调用者等待太长时间。

对于问题 3,您询问如果邮件被丢弃会发生什么情况。在这种情况下,我猜想接收该消息的人的未来将超时,因为它不会得到响应,因为接收者参与者永远不会收到要响应的消息。Akka不是JMS;它没有确认模式,如果收件人没有接收并确认消息,则可以多次重新发送消息。

另外,从我的例子中可以看出,我同意不使用 在聚合上阻止。我更喜欢使用非阻塞回调。接收函数中的阻止并不理想,因为在该阻止操作完成之前,该实例将停止处理其邮箱。通过使用非阻塞回调,可以释放该实例以返回到处理其邮箱,并允许对结果的处理只是在 中执行的另一个作业,与处理其邮箱的 actor 分离。FutureAwaitActorExecutionContext

现在,如果您真的想在网络不可靠时不浪费通信,可以查看Akka 2.2中可用的可靠代理。如果您不想走这条路,可以通过定期向路由发送类型消息来自行滚动。如果一个人没有及时响应,你可以将其标记为关闭,并且不要向它发送消息,直到你能从它那里得到一个可靠的(在很短的时间内),有点像每个路由的FSM。如果您绝对需要此行为,则其中任何一个都可以工作,但您需要记住,这些解决方案会增加复杂性,并且只有在您绝对需要此行为时才应使用。如果您正在开发银行软件,并且您绝对需要有保证的交付语义,否则会导致不良的财务影响,那么一定要采用这种方法。在决定是否需要这样的东西时要明智,因为我敢打赌90%的时间你不需要。在您的模型中,唯一可能受到等待您可能已经知道不会成功的事情的影响的人是另一端的调用方。通过在actor中使用非阻塞回调,它不会因为某些事情可能需要很长时间而停止;它已经移动到下一条消息。如果您决定在失败时重新提交,您也需要小心。您不希望淹没接收执行组件的邮箱。如果您决定重新发送,请将其限制在固定的次数。pingping

如果您需要这些有保证的语义,另一种可能的方法可能是研究Akka的聚类分析模型。如果群集了下游路由,并且其中一个服务器出现故障,则所有流量都将路由到该节点,该节点在另一个节点恢复之前仍处于运行状态。


答案 2

推荐