Joining two or more datasets is one of the most common operations on data, but in a distributed system it can be a headache. Because the data is spread across many nodes, it usually has to be shuffled before the join, which causes significant network I/O and hurts performance.
Fortunately, if you need to join a large table (the fact table) with relatively small tables (the dimension tables), that is, perform a star-schema join, you can avoid sending all of the large table's data over the network. This kind of join is known as a map-side join in the Hadoop community; in other distributed systems it is often called a replicated or broadcast join.
Let's use the following example data (one fact table and two dimension tables):
// Fact table
val flights = sc.parallelize(List(
("SEA", "JFK", "DL", "418", "7:00"),
("SFO", "LAX", "AA", "1250", "7:05"),
("SFO", "JFK", "VX", "12", "7:05"),
("JFK", "LAX", "DL", "424", "7:10"),
("LAX", "SEA", "DL", "5737", "7:10")))
// Dimension table
val airports = sc.parallelize(List(
("JFK", "John F. Kennedy International Airport", "New York", "NY"),
("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
("SFO", "San Francisco International Airport", "San Francisco", "CA")))
// Dimension table
val airlines = sc.parallelize(List(
("AA", "American Airlines"),
("DL", "Delta Airlines"),
("VX", "Virgin America")))
We need to join the fact table with the dimension tables to get the following result:
Seattle New York Delta Airlines 418 7:00
San Francisco Los Angeles American Airlines 1250 7:05
San Francisco New York Virgin America 12 7:05
New York Los Angeles Delta Airlines 424 7:10
Los Angeles Seattle Delta Airlines 5737 7:10
The fact table is very large, while the dimension tables are typically small. Let's collect the dimension tables to the Spark driver, build maps from them, and broadcast those maps to every worker node:
val airportsMap = sc.broadcast(airports.map{case(a, b, c, d) => (a, c)}.collectAsMap) // airport code -> city
val airlinesMap = sc.broadcast(airlines.collectAsMap) // airline code -> airline name
Now you can perform the map-side join:
flights.map{case(a, b, c, d, e) =>
(airportsMap.value.get(a).get,
airportsMap.value.get(b).get,
airlinesMap.value.get(c).get, d, e)}.collect
The result (formatted):
res: Array[(String, String, String, String, String)] = Array(
(Seattle, New York, Delta Airlines, 418, 7:00),
(San Francisco, Los Angeles, American Airlines, 1250, 7:05),
(San Francisco, New York, Virgin America, 12, 7:05),
(New York, Los Angeles, Delta Airlines, 424, 7:10),
(Los Angeles, Seattle, Delta Airlines, 5737, 7:10))
How it works:
First, we created an RDD for each table. airports and airlines are the dimension tables we will use in the map-side join, so we converted them to maps and broadcast them to every executor node. Note that we extracted only two columns from the airports table: the airport code and the city.
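As a rough illustration (the key ordering of a collected map is not guaranteed), you can inspect the broadcast values on the driver:
println(airportsMap.value) // Map(SEA -> Seattle, JFK -> New York, LAX -> Los Angeles, SFO -> San Francisco)
println(airlinesMap.value) // Map(AA -> American Airlines, DL -> Delta Airlines, VX -> Virgin America)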
Then we simply applied a map function to the flights table, looking up the corresponding values in airportsMap and airlinesMap. If the flights table is very large, this map function runs on each of its partitions in parallel.
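Note that airportsMap.value.get(a).get throws a NoSuchElementException when a key is missing from the dimension map. If your fact table may contain codes that are not present in the dimension tables, a more defensive sketch keeps the same output shape by falling back to the raw code:
flights.map{case(a, b, c, d, e) =>
  (airportsMap.value.getOrElse(a, a), // unknown airport code: keep the raw code
   airportsMap.value.getOrElse(b, b),
   airlinesMap.value.getOrElse(c, c), d, e)}.collect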
This approach never shuffles the fact table and delivers quite good join performance.
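For reference, the DataFrame API expresses the same idea through the broadcast hint in org.apache.spark.sql.functions. A minimal sketch, assuming the same data loaded as DataFrames named flightsDF, airportsDF, and airlinesDF with hypothetical column names origin, carrier, and code:
import org.apache.spark.sql.functions.broadcast
// Hypothetical DataFrames and column names, for illustration only; the broadcast
// hint tells Spark to replicate the small dimension tables to every executor
// instead of shuffling the fact table
val joined = flightsDF
  .join(broadcast(airportsDF), flightsDF("origin") === airportsDF("code"))
  .join(broadcast(airlinesDF), flightsDF("carrier") === airlinesDF("code"))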