1. 首页
  2. 大数据
  3. Spark教程

【Spark教程】(七)Spark共享变量: 广播变量和累加器

Spark是集群部署的,具有很多节点,节点之间的运算是相互独立的,Spark会自动把闭包中所有引用到的变量发送到每个工作节点上。

虽然很方便,但有时也很低效,比如你可能会在多个并行操作中使用同一个变量,而Spark每次都要把它分别发送给每个节点。所以共享变量的存在是很有必要的。

另外,分发到每个节点也可能导致某些需求出现问题,比如累加操作,读者可以先去试一下,会发现是有问题的,因此我们需要累加器的存在。

累加器


讲概念之前先演示一个案例:该案例需求是累加count,对于每个X都进行一次count=count+1,代码毫无疑问是正确的,但是却没有得到正确的结果,为什么呢?

scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> var count = 0
count: Int = 0

scala> rdd.map(x=> { count=count+1;println("x: "+x+" count: "+count) }).collect()
x: 1 count: 1
x: 2 count: 2
x: 3 count: 3
x: 4 count: 1
x: 5 count: 2
x: 6 count: 3
x: 7 count: 1
x: 8 count: 2
x: 9 count: 3
x: 10 count: 4
res19: Array[Unit] = Array((), (), (), (), (), (), (), (), (), ())

原因:大数据操作几乎都是并行的,分节点的,分区的,在此例中我们分了三个区。但是集群中每个节点的运算时独立的,每个运行的任务都会得到该变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量

所以我们需要一个共享变量:累加器。累加器的作用就是多个节点之间共享一个变量。它将工作节点的值聚合到驱动器程序。

使用方法:

scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val accum = sc.longAccumulator("Count Add")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Count Add), value: 0)

scala> rdd.foreach(x => accum.add(x))

scala> accum.value
res3: Long = 55
  • 在驱动器中调用 SparkContext中的Accumulato相关方法创建累加器,并给它定义name,方便在Web UI中查看。

def doubleAccumulator(name: String): DoubleAccumulator
Create and register a double accumulator, which starts with 0 and accumulates inputs by add.

def longAccumulator(name: String): LongAccumulator
Create and register a long accumulator, which starts with 0 and accumulates inputs by add

  • Spark闭包里的执行器代码可以使用累加器的add方法增加累加器的值。

def add(v: Long): Unit
Adds v to the accumulator, i.e.

  • 驱动器程序可以调用累加器的 value 属性来访问累加器的值。

def value: Long
Defines the current value of this accumulator

  • WEB UI中可以查看累加进度,跟踪UI中的累加器对于理解运行阶段的进度很有用
    WEB UI

注意:本案例基于2.2.0版本。2.0以下版本的使用方法不同,直接用sc.accumulator(0),并使用+=累加,请自行参考API。

广播变量


广播变量就是要解决我们前言中提到的,假如我们多个并行操作会用到同一个变量,而Spark每次都将这个变量自动分发到每个节点,如果变量很大,那么会很低效。我们可以引入一个广播变量,它可以让程序高效的给所有工作节点发送一个较大的可读值,而不是每个任务保存一份拷贝。这样该变量不会多次发送到各节点,提高了效率。

使用方法:使用sparkContext的broadcast()创建广播变量。使用value属性访问广播值。使用unpersist()清除广播变量。

def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]
Broadcast a read-only variable to the cluster, returning org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
value:value to broadcast to the Spark nodes
returns:Broadcast object, a read-only variable cached on each machine

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(3)

scala> broadcastVar.value
res6: Array[Int] = Array(1, 2, 3)

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/915

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code