Vert.x 事件循环 - 这如何异步?

2022-09-02 19:44:28

我正在使用Vert.x,并且对基于事件循环而不是线程/连接模型的服务器非常陌生。

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
  • 我只是尝试模拟一个长时间运行的请求处理程序,以了解此模型的工作原理。
  • 我观察到的是,所谓的事件循环被阻止,直到我的第一个请求完成。无论花费多少时间,后续请求在前一个请求完成之前都不会被执行。
  • 显然,我在这里遗漏了一块,这就是我在这里的问题。

根据到目前为止的答案进行编辑:

  1. 接受所有请求不被认为是异步的吗?如果只有在清除前一个连接时才能接受新连接,那么它是如何异步的?
    • 假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(基于请求的类型和性质)。这意味着,在上一个请求完成之前,事件循环不能接受新连接(即使它在一秒钟内结束)。如果我作为一个程序员必须仔细考虑所有这些,并将这些请求处理程序推送到工作线程,那么它与线程/连接模型有什么不同?
    • 我只是试图了解这个模型如何比传统的线程/连接服务器模型更好?假设没有 I/O 操作或所有 I/O 操作都是异步处理的?它如何解决c10k问题,当它不能并行启动所有并发请求并且必须等到前一个请求终止时?
  2. 即使我决定将所有这些操作推送到工作线程(池化),那么我又回到了同样的问题,不是吗?线程之间的上下文切换?编辑并顶上这个问题以获得赏金

    • 不完全了解此模型如何声明为异步模型。
    • Vert.x有一个异步JDBC客户端(Asyncronous是关键字),我试图用RXJava适应它。
    • 下面是一个代码示例(相关部分)

    server.requestStream().toObservable().subscribe(req -> {

            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
            jdbc.getConnectionObservable().subscribe(
                    conn -> {
    
                        // Now chain some statements using flatmap composition
                        Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                        // Subscribe to the final result
                        resa.subscribe(resultSet -> {
    
                            req.response().end(resultSet.getRows().toString());
                            System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                        }, err -> {
                            System.out.println("Database problem");
                            err.printStackTrace();
                        });
                    },
    
                    // Could not connect
                    err -> {
                        err.printStackTrace();
                    }
                    );
    
    });
    server.listen(4568);
    
    • 其中的选择查询大约需要 3 秒才能返回完整的表转储。
    • 当我触发并发请求(仅使用2个请求)时,我看到第二个请求完全等待第一个请求完成。
    • 如果 JDBC 选择是异步的,那么让框架在等待选择查询返回任何内容时处理第二个连接难道不是一个公平的期望吗?

答案 1

事实上,Vert.x 事件循环是存在于许多平台上的经典事件循环。当然,大多数解释和文档都可以找到Node.js,因为它是基于这种架构模式的最流行的框架。看看Node.js事件循环下对机制的或多或少的一个很好的解释Vert.x教程在“不要打电话给我们,我们会打电话给你”和“顶点”之间有很好的解释。

编辑更新:

首先,当您使用事件循环时,主线程应该对所有请求都非常快速地工作。你不应该在这个循环中做任何长时间的工作。当然,您不应该等待对数据库调用的响应。- 异步计划调用 - 分配回调(处理程序)以产生 - 回调将在工作线程中执行,而不是在事件循环线程中执行。例如,此回调将返回对套接字的响应。因此,事件循环中的操作应仅使用回调来安排所有异步操作,并转到下一个请求,而不等待任何结果。

假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(基于请求的类型和性质)。

在这种情况下,您的请求具有一些计算成本高昂的部分或对IO的访问权限 - 事件循环中的代码不应等待这些操作的结果。

我只是试图了解这个模型如何比传统的线程/连接服务器模型更好?假设没有 I/O 操作或所有 I/O 操作都是异步处理的?

当您有太多的并发请求和传统的编程模型时,您将为每个请求创建线程。此线程将做什么?它们将主要等待 IO 操作(例如,来自数据库的结果)。这是对资源的浪费。在我们的事件循环模型中,您有一个计划操作的主线程,并为长任务预先分配了工作线程的数量。+这些工作线程实际上都没有等待响应,他们只能在等待IO结果时执行另一个代码(它可以实现为回调或定期检查当前正在进行的IO作业的状态)。我建议您浏览Java NIO和Java NIO 2,以了解如何在框架中实际实现此异步IO。绿色线程也是一个非常相关的概念,这很好理解。绿色线程和协程是一种阴影事件循环,它试图实现同样的事情 - 更少的线程,因为我们可以在绿色线程等待某些东西时重用系统线程。

它如何解决c10k问题,当它不能并行启动所有并发请求并且必须等到前一个请求终止时?

当然,我们不会在主线程中等待发送上一个请求的响应。获取请求,安排长/IO 任务执行,下一个请求。

即使我决定将所有这些操作推送到工作线程(池化),那么我又回到了同样的问题,不是吗?线程之间的上下文切换?

如果你把一切都做好了 - 不。更重要的是,您将获得良好的数据局部性和执行流预测。一个 CPU 内核将执行您的短事件循环并安排异步工作,而无需上下文切换,仅此而已。其他内核调用数据库并返回响应,并且仅返回此响应。在回调之间切换或检查不同通道的 IO 状态实际上并不需要任何系统线程的上下文切换 - 它实际上在一个工作线程中工作。因此,我们每个内核有一个工作线程,例如,这个系统线程等待/检查来自多个数据库连接的结果可用性。重新审视Java NIO概念,了解它如何以这种方式工作。(NIO的经典示例 - 代理服务器,可以接受许多并行连接(数千个),对其他远程服务器的代理请求,侦听响应并将响应发送回客户端,所有这些都使用一个或两个线程)

关于你的代码,我为你做了一个示例项目,以证明一切都按预期工作:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

输出:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

因此,我们接受一个请求 - 安排对数据库的请求,转到下一个请求,我们使用所有这些请求,并且仅在数据库完成所有操作时才为每个请求发送响应。

关于你的代码示例,我看到两个可能的问题 - 首先,看起来你没有连接,这很重要,把它返回到池。其次,您的池是如何配置的?如果只有一个空闲连接 - 这些请求将序列化等待此连接。close()

我建议您为两个请求添加一些时间戳打印,以找到序列化的位置。您有一些东西使事件循环中的调用被阻塞。或。。。检查您是否在测试中并行发送请求。在上一个之后得到响应后,不是下一个。


答案 2

这如何异步?答案就在你的问题本身

我观察到的是,所谓的事件循环被阻止,直到我的第一个请求完成。无论花费多少时间,后续请求在前一个请求完成之前都不会被执行。

这个想法是,不是使用新的线程来为每个HTTP请求提供服务,而是使用相同的线程,而这些线程已被长时间运行的任务所阻止。

事件循环的目标是节省上下文从一个线程切换到另一个线程所涉及的时间,并在任务使用 IO/网络活动时利用理想的 CPU 时间。如果在处理您的请求时,它必须执行其他IO / Network操作,例如:在此期间从远程MongoDB实例获取数据,您的线程不会被阻止,而是由同一线程提供另一个请求,这是事件循环模型的理想用例(考虑到您有并发请求进入服务器)。

如果您有长时间运行的任务不涉及网络/IO操作,则应考虑改用线程池,如果您阻止主事件循环线程本身,其他请求将被延迟。即,对于长时间运行的任务,您可以支付上下文切换的代价,以使服务器具有响应性。

编辑:服务器处理请求的方式可能会有所不同:

1)为每个传入的请求生成一个新线程(在此模型中,上下文切换会很高,并且每次生成新线程都会产生额外的成本)

2)使用线程池来服务器请求(同一组线程将用于为请求提供服务,并且额外的请求将排队)

3) 使用事件循环(对所有请求使用单线程。可忽略不计的上下文切换。因为会有一些线程在运行,例如:对传入的请求进行排队)

首先,上下文切换还不错,需要保持应用程序服务器的响应速度,但是,如果并发请求数太高(大约超过 10k),则过多的上下文切换可能会成为问题。如果你想更详细地了解,我建议你阅读C10K文章

假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(基于请求的类型和性质)。这意味着,在上一个请求完成之前,事件循环不能接受新连接(即使它在一秒钟内结束)。

如果您需要响应大量并发请求(超过10k),我会认为超过500ms是运行时间更长的操作。其次,就像我说的,涉及一些线程/上下文切换,例如:对传入的请求进行排队,但是,线程之间的上下文切换将大大减少,因为一次线程太少。第三,如果在解析第一个请求时涉及网络/IO操作,则第二个请求将有机会在第一个请求解析之前得到解决,这就是此模型运行良好的位置。

如果我作为一个程序员必须仔细考虑所有这些,并将这些请求处理程序推送到工作线程,那么它与线程/连接模型有什么不同?

Vertx试图为您提供最好的线程和事件循环,因此,作为程序员,您可以调用如何在两种情况下(即长时间运行的操作,有和没有网络/ IO操作)使您的应用程序高效。

我只是试图了解这个模型如何比传统的线程/连接服务器模型更好?假设没有 I/O 操作或所有 I/O 操作都是异步处理的?它如何解决c10k问题,当它不能并行启动所有并发请求并且必须等到前一个请求终止时?

上面的解释应该回答这个问题。

即使我决定将所有这些操作推送到工作线程(池化),那么我又回到了同样的问题,不是吗?线程之间的上下文切换?

就像我说的,两者都有优点和缺点,vertx为您提供了模型,并且根据您的用例,您必须选择适合您的场景。


推荐