How to update a broadcast variable in Spark Streaming?

I believe I have a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data.

Initially, I thought this would be a very simple thing to achieve with a broadcast variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, the reference data changes periodically, though infrequently.

I was under the impression that I could modify and re-broadcast my variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.

What alternatives do I have? The three solutions I can think of are:

  1. Move the reference data lookup into a forEachPartition or forEachRdd so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer/counter to stop the remote API being hit for every element in the stream (a sketch of this approach follows the list).

  2. Restart the Spark Context every time the reference data changes, with a new broadcast variable.

  3. Convert the reference data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.
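
For the first option, a minimal sketch of the worker-side cache (assuming a hypothetical fetchRefDataFromRest() helper and the ReferenceData type from above) could look like this: each executor JVM holds the reference data in a static field and only re-fetches it from the REST API once a TTL has expired:

// Per-executor cache for the REST-backed reference data. The static field
// lives in each worker JVM, so no broadcast is needed; the TTL check plays
// the role of the timer/counter that throttles calls to the remote API.
public class RefDataCache {

    private static final long TTL_MS = 60_000; // assumed refresh interval

    private static volatile ReferenceData cached;
    private static volatile long lastFetchedAt;

    public static ReferenceData get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - lastFetchedAt > TTL_MS) {
            synchronized (RefDataCache.class) {
                if (cached == null || now - lastFetchedAt > TTL_MS) {
                    cached = fetchRefDataFromRest(); // hypothetical REST call
                    lastFetchedAt = now;
                }
            }
        }
        return cached;
    }

    private static ReferenceData fetchRefDataFromRest() {
        // placeholder for the actual REST client call
        throw new UnsupportedOperationException("wire up your REST client here");
    }
}

Whether you call RefDataCache.get() inside a filter or inside foreachPartition, the lookup happens on the executors, so at most one REST call per TTL is made per worker JVM.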


Answer 1

Extending the answer by @Rohan Aletty: below is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL.

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would then look like this:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast =
            BroadcastWrapper.getInstance().updateAndGet(stream.context());

        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on a multi-node cluster as well. Hope this helps.


Answer 2

I recently ran into this issue myself. Thought it might be helpful for Scala users.

The Scala way of writing a BroadcastWrapper is shown in the example below.

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* The wrapper lets us update broadcast variables within DStreams' foreachRDD
   without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
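
Note the design: all three fields are @transient, so task closures never try to serialize the StreamingContext or the raw value, while the custom writeObject/readObject ship only the Broadcast handle itself, which is all the workers need.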

Every time you need the broadcast variable to pick up new data, call the update function before reading value.
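
As a minimal usage sketch (assuming objectStream is a DStream[MyObject] and getRefData() fetches the latest reference data, as in the question):

// Driver-side setup; ssc is the StreamingContext and getRefData() is
// assumed to fetch the latest reference data from wherever it lives.
val refData = BroadcastWrapper(ssc, getRefData())

objectStream.foreachRDD { rdd =>
  refData.update(getRefData()) // runs on the driver and re-broadcasts
  rdd.filter(_.getField == refData.value.getField)
     .foreach { obj =>
       // Final processing of filtered objects
     }
}

Calling update on every batch re-broadcasts on every batch; in practice you would guard it with a TTL check, as in the Java answer above.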

