如何选择算法和实现思路,还要看具体情况,小表和小表join就不用说了,如果是大表和小表join,就可以选择把小表先加载到内存,提高效率。如果是大表和大表join,可以用key来匹配,这就要求两个大表都是key-value存储。
上面两种情况就是spark实现大数据join操作的两个算法,map-side join和reduce-side join。
1.Map-Side Join
- Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
- 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
- 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。
适用于一个数据集小,另一个数据集大的情况
package spark.examples.join
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkMapsideJoin {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkMapsideJoin")
conf.setMaster("local[3]")
conf.set("spark.shuffle.manager", "sort");
val sc = new SparkContext(conf)
//val table1 = sc.textFile(args(1))
//val table2 = sc.textFile(args(2))
val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
// table1 is smaller, so broadcast it as a map<String, String>
val pairs = table1.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}.collectAsMap
val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it
// table2 join table1 in map side
val result = table2.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}.mapPartitions({ iter =>
val m = broadCastMap.value
for {
(key, value) <- iter
if (m.contains(key))
} yield (key, (value, m.get(key).getOrElse("")))
})
val output = "d:/wordcount-" + System.currentTimeMillis() ;
result.saveAsTextFile(output) //save result to local file or HDFS
}
}
2.Reduce Side Join
- 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
- Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],
适用于两个join表数据量都很大的情况
package spark.examples.join
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkReducesideJoin {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkMapsideJoin")
conf.setMaster("local[3]")
conf.set("spark.shuffle.manager", "sort");
val sc = new SparkContext(conf)
val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
//table1 and table 2 are both very large
val pairs1 = table1.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}
val pairs2 = table2.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}
val result = pairs1.join(pairs2)
val output = "d:/wordcount-" + System.currentTimeMillis();
result.saveAsTextFile(output) //save result to local file or HDFS
}
}