- 为什么需要broadcast类型变量 ,它有哪些优点?
- spark中怎样创建和使用broadcast类型变量 ?
- spark中的具体实现
1. 为什么需要broadcast类型变量 ?
各个slave端都需要同一个数据,并且只有读取操作
例如: 一个object
对象,一个map
或者bloomFilter
等
broadcast类型变量和传输一个可序列化的变量的区别 ?
- broadcast类型变量可以保证只在executor的内存中存在一份
- 将要传输的变量不需要实现
Serializable
接口 - 可以高效地传输较大的数据集
以上3点可以在下面的实现中看到
2. spark中怎样创建和使用broadcast ?
具体的示例在Broadcast.scala的comment中,其中需要注意的是,一旦一个broadcast
初始化好了,今后对它的值的访问只能通过broadcast
间接访问,这也是一个wrapper
的使用模式,例如:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
3. spark中的具体实现
1. broadcast类型具体的方法
1. 创建
sc.broadcast()
2. 读取
value
3. 销毁
- 可重建的销毁:
unpersist
- 不可重建的销毁:
destroy
2. 实现机制
目前spark中只有一种实现 TorrentBroadcast.scala
具体机制如下:
driver端:
将序列化过的对象分成小块,存放在driver端的
BlockManager
里
executor端:
- executor首先从自己的
BlockManager
去拿,如果有就直接用,如果没有执行2步- 从driver/其它executor端拉取对象的小块放入自己的
BlockManager
供自己和其它的executor使用/拉取
这样的好处是避免了driver端因为发送数据给每个executor而造成热点问题
TorrentBroadcast 实现摘要
TorrentBroadcast#value
调用顺序:
getValue
外部接口readBroadcastBlock
将读入的blocks拼成obj放入blockManager
readBlocks
将块读取入blockManager
broadcast变量的块的形成:
只会在driver端形成,调用顺序
private val numBlocks: Int = writeBlocks(obj)
writeBlocks
将对象分成块
executor端也会有取numBlocks
这个变量,但由于不是transient
的,就不会再调用writeBlocks
ps:
writeBlocks
: 将对象切分为块,返回块的个数
每块的id为: broadcast_1_piece2 (其中1表示broadcast的id,2表示第几个块),由于数据是在driver端收集的,然后其他executor通过byte流来拉取数据,这样就不需要broadcast的变量实现Serializable
接口
4.broadcast相关配置
观察spark的日志输出经常会看到 auto braocast,braocast确实是自动的,你也可以手动,也可以通过配置来指定。
广播表的最大大小被这个参数所控制 默认10M
spark.sql.autoBroadcastJoinThreshold
当小于该配置是,小表会被自动的广播出去。(参考的2.4.1版本)
当大于改配置时,需要显示的广播下。