Spark Streaming:为什么内部处理成本如此之高,无法处理几 MB 的用户状态?

根据我们的实验,我们看到,当状态变得超过一百万个对象时,有状态的 Spark 流内部处理成本会花费大量时间。因此,延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔)。

它与我们应用程序的细节无关,因为它可以通过下面的代码复制。

究竟有哪些 Spark 内部处理/基础结构成本需要花费大量时间来处理用户状态?除了简单地增加批次间隔之外,是否有任何选项可以减少处理时间?

我们计划广泛使用状态:在几个节点中的每个节点上至少使用100MB左右,以将所有数据保存在内存中,并且每小时仅转储一次。

增加批处理间隔会有所帮助,但我们希望将批处理间隔保持在最小值。

原因可能不是状态占用的空间,而是大的对象图,因为当我们将list更改为大型基元数组时,问题就消失了。

只是一个猜测:它可能与Spark内部使用有关,因为它不时在分析时显示。org.apache.spark.util.SizeEstimator

enter image description here

这是在现代iCore7上重现上图的简单演示:

  • 状态小于 15 MB
  • 根本没有流输入
  • 最快的可能(虚拟)'updateStateByKey'函数
  • 批处理间隔 1 秒
  • 检查点(Spark 要求,必须具有)到本地磁盘
  • 在本地和 YARN 上进行测试

法典:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) //  fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }

}

答案 1

在 spark 1.6 中改进了状态管理。
请参阅 SPARK-2629 改进的火花流状态管理;

在详细设计规范中:
改进了 Spark Streaming 中的状态管理

一个性能缺陷如下:

需要更优化的状态管理,不会扫描每个密钥 Current updateStateByKey 在每个批处理间隔内扫描每个密钥,即使该密钥没有数据也是如此。虽然此语义在某些工作负载中很有用,但大多数工作负载只需要“”扫描和更新有新数据的状态。在每个批次间隔内,只需要触及所有状态的一小部分。The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed. enter image description here


答案 2

推荐