spark Map-Side join详解

spark | 2019-09-16 09:15:46

join两个或多个数据集是对数据进行最广泛使用的操作之一,但在分布式系统中,这可能是一个令人头痛的问题。通常,由于数据分布在许多节点中,因此必须在连接之前对它们进行混洗(shuffled ),这会导致明显的网络I / O和性能降低。

幸运的是,如果您需要使用相对较小的表(dimensions 表)join大型表(fact表),即执行星型模式连接,您可以避免通过网络发送大型表的所有数据。这种类型的连接在Hadoop社区中称为映射端连接(map-side)。在其他分布式系统中,它通常称为复制或广播连接(broadcast join)。

让我们使用以下示例数据(一个fact和dimensions 表)

// Fact table
val flights = sc.parallelize(List(
  ("SEA", "JFK", "DL", "418",  "7:00"),
  ("SFO", "LAX", "AA", "1250", "7:05"),
  ("SFO", "JFK", "VX", "12",   "7:05"),
  ("JFK", "LAX", "DL", "424",  "7:10"),
  ("LAX", "SEA", "DL", "5737", "7:10")))  
   
// Dimension table
val airports = sc.parallelize(List(
  ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
  ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
  ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
  ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
   
// Dimension table
val airlines = sc.parallelize(List(
  ("AA", "American Airlines"), 
  ("DL", "Delta Airlines"), 
  ("VX", "Virgin America")))   

 

我们需要连接fact和dimensions 表以获得以下结果:

Seattle           New York       Delta Airlines       418   7:00
San Francisco     Los Angeles    American Airlines    1250  7:05
San Francisco     New York       Virgin America       12    7:05
New York          Los Angeles    Delta Airlines       424   7:10
Los Angeles       Seattle        Delta Airlines       5737  7:10

 

fact表非常大,而dimensions 表通常很小。让我们将dimensions 表下载到Spark driver端,创建map并将它们广播到每个工作节点:

val airportsMap = sc.broadcast(airports.map{case(a, b, c, d) => (a, c)}.collectAsMap)
val airlinesMap = sc.broadcast(airlines.collectAsMap)

 

现在你可以进行map-side join

flights.map{case(a, b, c, d, e) => 
   (airportsMap.value.get(a).get, 
    airportsMap.value.get(b).get, 
    airlinesMap.value.get(c).get, d, e)}.collect

 

执行结果(格式化):

res: Array[(String, String, String, String, String)] = Array(
  (Seattle, New York, Delta Airlines, 418, 7:00), 
  (San Francisco, Los Angeles, American Airlines, 1250, 7:05), 
  (San Francisco, New York, Virgin America, 12, 7:05), 
  (New York, Los Angeles, Delta Airlines, 424, 7:10), 
  (Los Angeles, Seattle, Delta Airlines, 5737, 7:10))

 

运行原理:

首先,我们为每个表创建了一个RDD。airports和airlines是我们将进行map-side join的维度表,因此我们将它们转换为map并广播到每个执行节点executor。请注意,我们只从airports表中提取了2列。

然后,我们只是对flights表使用map函数,从airportsMap和airlinesMap表检索到维度值。如果flights表非常大,map函数则将在airportsMap和airlinesMap每个分区同时执行。

这种方法不对fact表进行shuffle操作,并获得相当好的join性能。

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号