spark broadcast广播变量详解

spark | 2020-07-27 09:24:35
  1. 为什么需要broadcast类型变量 ,它有哪些优点?
  2. spark中怎样创建和使用broadcast类型变量 ?
  3. spark中的具体实现

1. 为什么需要broadcast类型变量 ?

各个slave端都需要同一个数据,并且只有读取操作
例如: 一个object对象,一个map或者bloomFilter

broadcast类型变量和传输一个可序列化的变量的区别 ?

  1. broadcast类型变量可以保证只在executor的内存中存在一份
  2. 将要传输的变量不需要实现Serializable接口
  3. 可以高效地传输较大的数据集

以上3点可以在下面的实现中看到

2. spark中怎样创建和使用broadcast ?

具体的示例在Broadcast.scala的comment中,其中需要注意的是,一旦一个broadcast初始化好了,今后对它的值的访问只能通过broadcast间接访问,这也是一个wrapper的使用模式,例如:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

3. spark中的具体实现

1. broadcast类型具体的方法

1. 创建

sc.broadcast()

2. 读取

value

3. 销毁

  1. 可重建的销毁: unpersist
  2. 不可重建的销毁: destroy

2. 实现机制

目前spark中只有一种实现 TorrentBroadcast.scala

具体机制如下:
driver端:

将序列化过的对象分成小块,存放在driver端的BlockManager

executor端:

  • executor首先从自己的BlockManager去拿,如果有就直接用,如果没有执行2步
  • 从driver/其它executor端拉取对象的小块放入自己的BlockManager 供自己和其它的executor使用/拉取

这样的好处是避免了driver端因为发送数据给每个executor而造成热点问题

TorrentBroadcast 实现摘要

TorrentBroadcast#value调用顺序:

  1. getValue 外部接口
  2. readBroadcastBlock 将读入的blocks拼成obj放入blockManager
  3. readBlocks 将块读取入blockManager

broadcast变量的块的形成:
只会在driver端形成,调用顺序

  1. private val numBlocks: Int = writeBlocks(obj)
  2. writeBlocks 将对象分成块

executor端也会有取numBlocks这个变量,但由于不是transient的,就不会再调用writeBlocks

ps:
writeBlocks: 将对象切分为块,返回块的个数
每块的id为: broadcast_1_piece2 (其中1表示broadcast的id,2表示第几个块),由于数据是在driver端收集的,然后其他executor通过byte流来拉取数据,这样就不需要broadcast的变量实现Serializable接口

 

 

4.broadcast相关配置

观察spark的日志输出经常会看到 auto braocast,braocast确实是自动的,你也可以手动,也可以通过配置来指定。

广播表的最大大小被这个参数所控制 默认10M

spark.sql.autoBroadcastJoinThreshold

当小于该配置是,小表会被自动的广播出去。(参考的2.4.1版本)
当大于改配置时,需要显示的广播下。

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号