持续收听 AWS SQS 消息的模式

2022-09-01 15:54:10

我有一个简单的类,它使用一些方法来包装适用于 Java 的 AWS SQS 开发工具包中的方法。例如:QueueService

public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
        List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();

        ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
        for(Message message : messages) {
            Hashtable<String, String> resultItem = new Hashtable<String, String>();
            resultItem.put("MessageId", message.getMessageId());
            resultItem.put("ReceiptHandle", message.getReceiptHandle());
            resultItem.put("Body", message.getBody());
            resultList.add(resultItem);
        }
        return resultList;
    }

我还有另一个名为的类,它有一个,并创建了一个.AppmainQueueService

我正在寻找一个“模式”来使in进入以侦听队列中的新消息。现在我有一个循环,我调用了该方法:mainAppwhile(true)receiveMessages

while(true) {
            messages = queueService.receiveMessages(queueURL); 
            for(Hashtable<String, String> message: messages) {
                String receiptHandle = message.get("ReceiptHandle");
                String messageBody = message.get("MessageBody");
                System.out.println(messageBody);
                queueService.deleteMessage(queueURL, receiptHandle);
            }
        }

这是正确的方法吗?我应该在 SQS SDK 中使用异步消息接收方法吗?


答案 1

据我所知,Amazon SQS 无法支持活动侦听器模型,在该模型中,Amazon SQS 会将消息“推送”到您的侦听器,或者在有消息时调用您的消息侦听器。

因此,您始终必须轮询消息。轮询支持两种轮询机制 - 短轮询和长轮询。每个都有自己的优点和缺点,但在大多数情况下,长轮询是您通常最终使用的一个,尽管默认的是短轮询。长轮询机制在网络流量方面肯定更有效率,更具成本效益(因为亚马逊按发出的请求数量向您收费),并且当您希望以时间敏感的方式处理消息(~=尽快处理)时,它也是首选机制。

关于长轮询和短轮询还有更多值得了解的复杂性,在这里解释所有这些有点困难,但如果你愿意,你可以通过下面的博客阅读更多关于这一点的细节。它还有一些代码示例,应该会有所帮助。

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

就一段时间(真)循环而言,我会说这取决于。如果您使用的是长轮询,并且可以将等待时间设置为(最大)20 秒,这样,如果没有消息,则轮询 SQS 的频率不会超过 20 秒。如果有消息,您可以决定是经常轮询(在消息到达后立即处理消息),还是始终按时间间隔(例如每 n 秒)处理它们。

需要注意的另一点是,您可以在单个 receiveMessages 请求中读取多达 10 条消息,因此这也将减少您对 SQS 的调用次数,从而降低成本。正如上面的博客详细解释的那样,您可以请求阅读10条消息,但即使队列中有那么多消息,它可能不会返回您10条消息。

不过,一般来说,我想说的是,如果你愿意在运行时关闭轮询,你需要构建适当的钩子和异常处理来关闭轮询,以防你使用一种 while(true) 类型的结构。

要考虑的另一个方面是,您是要在主应用程序线程中轮询 SQS,还是要生成另一个线程。因此,另一种选择可能是创建一个 ScheduledThreadPoolExecutor,在 main 中具有单个线程,以调度一个线程定期(每隔几秒钟)轮询 SQS,并且您可能不需要 while(true) 结构。


答案 2

您缺少一些内容:

  • 使用 并设置等待时间以启用长轮询。receiveMessages(ReceiveMessageRequest)
  • 将您的 AWS 调用包装在 try/catch 块中。特别是,请注意 ,如果您有太多的动态消息,则可以从 中抛出。OverLimitExceptionreceiveMessages()
  • 将循环的整个主体包装在其自己的 try/catch 块中,记录捕获的任何异常(不应该有 - 这是为了确保您的应用程序不会因为 AWS 更改了其 API 或您忽略了处理预期的异常而崩溃)。while

有关长轮询和可能的异常的详细信息,请参阅文档

至于使用异步客户端:你有什么特别的理由使用它吗?如果没有,那就不要:单个接收器线程更容易管理。


推荐