Spark Streaming:为什么内部处理成本如此之高,无法处理几 MB 的用户状态?
根据我们的实验,我们看到,当状态变得超过一百万个对象时,有状态的 Spark 流内部处理成本会花费大量时间。因此,延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔)。
它与我们应用程序的细节无关,因为它可以通过下面的代码复制。
究竟有哪些 Spark 内部处理/基础结构成本需要花费大量时间来处理用户状态?除了简单地增加批次间隔之外,是否有任何选项可以减少处理时间?
我们计划广泛使用状态:在几个节点中的每个节点上至少使用100MB左右,以将所有数据保存在内存中,并且每小时仅转储一次。
增加批处理间隔会有所帮助,但我们希望将批处理间隔保持在最小值。
原因可能不是状态占用的空间,而是大的对象图,因为当我们将list更改为大型基元数组时,问题就消失了。
只是一个猜测:它可能与Spark内部使用有关,因为它不时在分析时显示。org.apache.spark.util.SizeEstimator
这是在现代iCore7上重现上图的简单演示:
- 状态小于 15 MB
- 根本没有流输入
- 最快的可能(虚拟)'updateStateByKey'函数
- 批处理间隔 1 秒
- 检查点(Spark 要求,必须具有)到本地磁盘
- 在本地和 YARN 上进行测试
法典:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf()
// Tried KryoSerializer, but it does not seem to help much
//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
.setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)
List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);
JavaPairDStream<String, State> stream = javaStreamingContext
.textFileStream(".") // fake: effectively, no input at all
.mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
.updateStateByKey(
(inputs, maybeState) -> maybeState, // simplest possible dummy function
new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
initialRdd); // set generated state
stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
System.out.println("Is empty: " + rdd.isEmpty());
return null;
});
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
private static List<Tuple2<String, State>> prepareInitialRddData() {
// 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
int stateCount = 1000;
int dataListSize = 200;
int elementDataSize = 10;
List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
List<String> stateData = new ArrayList<>(dataListSize);
for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
}
initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
}
return initialRddInput;
}
}