Spark实现 join 操作 的方式

spark | 2019-09-16 08:45:58

        如何选择算法和实现思路,还要看具体情况,小表和小表join就不用说了,如果是大表和小表join,就可以选择把小表先加载到内存,提高效率。如果是大表和大表join,可以用key来匹配,这就要求两个大表都是key-value存储。

        上面两种情况就是spark实现大数据join操作的两个算法,map-side join和reduce-side join。

 

1.Map-Side Join

  • Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
  • 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
  • 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。

适用于一个数据集小,另一个数据集大的情况

package spark.examples.join  
  
import org.apache.spark.{SparkContext, SparkConf}  
import org.apache.spark.SparkContext._  
  
object SparkMapsideJoin {  
  def main(args: Array[String]) {  
    val conf = new SparkConf()  
    conf.setAppName("SparkMapsideJoin")  
    conf.setMaster("local[3]")  
    conf.set("spark.shuffle.manager", "sort");  
    val sc = new SparkContext(conf)  
  
    //val table1 = sc.textFile(args(1))  
    //val table2 = sc.textFile(args(2))  
  
    val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))  
    val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))  
    // table1 is smaller, so broadcast it as a map<String, String>  
    val pairs = table1.map { x =>  
      val pos = x.indexOf(',')  
      (x.substring(0, pos), x.substring(pos + 1))  
    }.collectAsMap  
    val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it  
  
    // table2 join table1 in map side  
    val result = table2.map { x =>  
      val pos = x.indexOf(',')  
      (x.substring(0, pos), x.substring(pos + 1))  
    }.mapPartitions({ iter =>  
      val m = broadCastMap.value  
      for {  
        (key, value) <- iter  
        if (m.contains(key))  
      } yield (key, (value, m.get(key).getOrElse("")))  
    })  
  
    val output = "d:/wordcount-" + System.currentTimeMillis() ;  
    result.saveAsTextFile(output) //save result to local file or HDFS  
  }  
}  

2.Reduce Side Join

  • 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
  • Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],

 

适用于两个join表数据量都很大的情况

package spark.examples.join  
  
import org.apache.spark.{SparkContext, SparkConf}  
import org.apache.spark.SparkContext._  
  
object SparkReducesideJoin {  
  def main(args: Array[String]) {  
    val conf = new SparkConf()  
    conf.setAppName("SparkMapsideJoin")  
    conf.setMaster("local[3]")  
    conf.set("spark.shuffle.manager", "sort");  
    val sc = new SparkContext(conf)  
  
    val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))  
    val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))  
    //table1 and table 2 are both very large  
    val pairs1 = table1.map { x =>  
      val pos = x.indexOf(',')  
      (x.substring(0, pos), x.substring(pos + 1))  
    }  
  
    val pairs2 = table2.map { x =>  
      val pos = x.indexOf(',')  
      (x.substring(0, pos), x.substring(pos + 1))  
    }  
    val result = pairs1.join(pairs2)  
    val output = "d:/wordcount-" + System.currentTimeMillis();  
    result.saveAsTextFile(output) //save result to local file or HDFS  
  }  
}  

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号