如何将流数据与数据流/光束中的大型历史数据集相结合
2022-09-04 23:10:27
我正在调查通过Google Dataflow / Apache Beam处理来自Web用户会话的日志,并且需要将用户进入(流式传输)时的日志与上个月用户会话的历史记录相结合。
我研究了以下方法:
- 使用30天的固定窗口:最有可能的大窗口来适应内存,而我不需要更新用户的历史记录,只需参考即可
- 使用CoGroupByKey联接两个数据集,但两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的情况下是不正确的(24小时与30天)
- 使用侧输入检索给定 中的用户的会话历史记录
element
processElement(ProcessContext processContext)
我的理解是,通过加载的数据需要适合内存。我知道我可以将单个用户的所有会话历史记录放入内存中,但不是所有会话历史记录。.withSideInputs(pCollectionView)
我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?
我正在想象一个parDo函数,该函数将通过指定用户的ID从侧面输入加载用户的历史记录会话。但只有当前用户的历史记录会话才能放入内存中;通过侧面输入加载所有历史记录会话会太大。
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}