如何在Apache Flink中查找和更新数据库中的记录状态?

2022-09-04 22:19:08

我正在开发一个数据流应用程序,我正在研究将Apache Flink用于此项目的可能性。这样做的主要原因是它支持不错的高级流构造,与Java 8的Stream API非常相似。

我将接收与数据库中的特定记录相对应的事件,并且我希望能够处理这些事件(来自RabbitMQ或Kafka等消息代理),并最终更新数据库中的记录并将处理/转换的事件推送到另一个接收器(可能是另一个消息代理)。

理想情况下,与特定记录相关的事件需要以FIFO排序进行处理(尽管会有一个时间戳也有助于检测无序事件),但与不同记录相关的事件可以并行处理。我计划使用该构造按记录对流进行分区。keyBy()

需要完成的处理取决于数据库中有关记录的当前信息。但是,我无法找到一个示例或推荐的方法来查询数据库中的此类记录,以使用我需要处理它的其他信息来丰富正在处理的事件。

我心目中的管道如下:

-> keyBy() 在收到的 id 上 ->从数据库中检索与 id 对应的记录 ->对记录执行处理步骤 ->将处理后的事件推送到外部队列并更新数据库记录

需要更新数据库记录,因为另一个应用程序将查询数据。

在此管道实现后,可以进行其他优化。例如,可以将(更新的)记录缓存在托管状态,以便同一记录上的下一个事件不需要另一个数据库查询。但是,如果应用程序不知道特定记录,则需要从数据库中检索该记录。

在Apache Flink中用于这种场景的最佳方法是什么?


答案 1

您可以通过扩展一个丰富的函数来执行数据库查找,例如一个函数,在其方法中初始化一次数据库连接,然后处理该方法中的每个事件:RichFlatMapopen()flatMap()

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {

    // Declare DB coonection and query statements

    @Override
    public void open(Configuration parameters) throws Exception {
      // Initialize Database connection
      // Prepare Query statements
    }

    @Override
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
      // look up the Database, update record, enrich event
      out.collect(enrichedEvent);        
    }
})

然后你可以使用如下:DatabaseMapper

stream.keyby(id)
      .flatmap(new DatabaseMapper())
      .addSink(..);

您可以在此处找到使用 Redis 缓存数据的示例。


答案 2

推荐