阿卡演员与未来:以身作则

2022-09-01 17:42:02

我试图学习阿卡演员和未来,但在阅读了 http://akka.io 的文档并做了 http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html 之后,我仍然在理解方面遇到了一些问题。我想计算Pi的值是很多人也可以联系到的事情,但不是我=)。我搜索了一下,但没有找到任何适合我的例子。因此,我想我会把我的一些现实生活中的代码扔在这里,并用它交换一个如何使用Akka做到这一点的例子。

好的,我们开始吧:

我有一个java play2应用程序,我需要从我的数据库中获取一些数据,并在我的elasticsearch实例中对其进行索引。

  1. 我打电话给DB并获取场地的ID。

  2. 然后,我将列表拆分并创建几个可调用的索引任务。

  3. 之后,我调用所有任务,其中每个任务从数据库中收集分配的ID的场所。

  4. 对于每个场所,将其索引到 elasticsearch 实例并使其可搜索。

  5. 做。

应用.java:

public class Application extends Controller {

  private static final int VENUE_BATCH = 1000;
  private static int size;

  public static Result index() {

      List<Long> venueIds = DbService.getAllVenueIds();
      size = venueIds.size();
      Logger.info("Will index " + size + " items in total.");
      ExecutorService service = Executors.newFixedThreadPool(getRuntime().availableProcessors());
      int startIx = 0;
      Collection<Callable<Object>> indexTasks = new ArrayList<Callable<Object>>();
      do {
          int endIx = Math.min(startIx + VENUE_BATCH, size);
          List<Long> subList = venueIds.subList(startIx, endIx);
          VenueIndexTask indexTask = new VenueIndexTask(subList);
          indexTasks.add(indexTask);
      } while ((startIx += VENUE_BATCH) < size);

      Logger.info("Invoking all tasks!");
      try {
          service.invokeAll(indexTasks);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

      return ok(index.render("Done indexing."));
  } 
}

地点任务:

public class VenueIndexTask implements Callable<Object> {

    private List<Long> idSubList;

    public VenueIndexTask(List<Long> idSubList){
        this.idSubList = idSubList;
        Logger.debug("Creating task which will index " + idSubList.size() + " items. " +
                "Range: " + rangeAsString() + ".");
    }

    @Override
    public Object call() throws Exception {
        List<Venue> venues = DbService.getVenuesForIds(idSubList);
        Logger.debug("Doing some indexing: "+venues.size());

        for(Venue venue : venues) {
            venue.index();
        }
        return null;
    }
    private String rangeAsString() {
        return "[" + idSubList.get(0) + "-" + idSubList.get(idSubList.size() - 1) + "]";
    }
}

场地:

@IndexType(name = "venue")
public class Venue extends Index {

    private String name;

    // Find method static for request
    public static Finder<Venue> find = new Finder<Venue>(Venue.class);

    public Venue() {
    }

    public Venue(String id, String name) {
        super.id = id;
        this.name = name;
    }

    @Override
    public Map toIndex() {
        HashMap map = new HashMap();
        map.put("id", super.id);
        map.put("name", name);
        return map;
    }

    @Override
    public Indexable fromIndex(Map map) {
        if (map == null) {
            return this;
        }
        this.name = (String) map.get("name");
        return this;
    }
}

所以你们所有阿卡人都疯了!请尽可能多地做,提出可以使用的很酷的未来功能,或者我可以用来学习这些东西的任何其他知识/代码。


答案 1

我喜欢如何看待Akka(或任何其他基于消息的系统)就像在工厂里一样,像传送带一样思考。Actors的一种简化思维方式可能是接受披萨订单。

  • 您,饥饿的客户(演员/角色)向披萨店发送订单(A消息)

  • 客户服务(演员/角色)接受您的订单,为您提供订单号(未来)

  • 如果您不耐烦,您可能已经在电话/互联网/商店等到您拿到比萨饼(同步/阻止交易),否则您会对订单号感到满意,稍后再检查它(非阻止)

  • 客户服务在厨房经理(演员)的监督下将消息发送给厨师(演员)。这是一个非常繁重的厨房,有等级制度。阿卡喜欢这样。请参阅监督

  • Chef 创建一个新的披萨并附加订单的详细信息(新消息),并通过交付经理(主管 Actor)将其传递给送货员(演员)。

  • 在此过程中,您的订单详细信息没有更改,这将是一场噩梦。如果你想要普通的奶酪,如果你有意大利辣香肠,你不会高兴的!所有消息都应该是不可变的!但是,对于不同的参与者,消息可能有所不同。一个送货男孩会期待比萨饼和附加的订单细节,厨师会期待订单。当消息需要更改时,将创建一个新消息。

  • 每个演员都擅长一个角色,如果一个人必须完成所有任务,那会有多大效果?可能是一些演员的人数超过了其他演员(例如,厨师有10个线程,送货男孩有2个线程,客户服务有1个线程)。

  • 堵住行为是一种痛苦,想象一下客服在等着厨师和送货员才见到下一个顾客?

希望我对你有所帮助,这是一个巨大的话题和巨大的思想转变。祝你好运


答案 2

Coursera目前开设了一门关于反应式编程的课程,其中有3个关于Akka和演员模型的最后3讲课。这包括视频讲座和家庭作业(在Scala中,但不是Java)。虽然您为时已晚,无法收到完整的证书,但您仍然可以加入课程,只需查看过去三周。

https://class.coursera.org/reactive-001/class


推荐