由于spark streaming中使用线程池来解决spark job串行导致的并发问题,但createOrReplaceTempView由会引起冲突,那么就只有来探究一下createOrReplaceTempView和createGlobalTempView.
在讲解 createOrReplaceTempView 和createGlobalTempView的区别前,先了解下Spark Application 和 Spark Session区别 。
1.Spark Application
Spark Application 使用:
1.1 针对单个批处理作业
1.2 多个job通过session交互式
1.3 不断满足请求的,长期存在的server
1.4 一个Spark job 可以包含多个map和reduce
1.5 Spark Application 可以包含多个session实例
2.Spark Session
SparkSession与Spark应用程序相关联:
2.1 session 是两个或更多实体之间的交互媒介
2.2 在Spark 2.0中,你可以使用SparkSession创建
2.3 可以在不创建SparkConf,SparkContext或SQLContext的情况下创建SparkSession(它们封装在SparkSession中)
3. createOrReplaceTempView使用
3.1 创建临时视图
createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联。
createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。
df.createOrReplaceTempView("tempViewName") df.createGlobalTempView("tempViewName")
createOrReplaceTempView(): 创建或替换本地临时视图。
3.2 删除临时视图
此视图的生命周期依赖于SparkSession类,如果想drop此视图可采用dropTempView删除
spark.catalog.dropTempView("tempViewName")
或者 stop() 来停掉 session,因为他是和session生命周期同步的。
self.ss = SparkSession(sc) ... self.ss.stop()
4.createGlobalTempView使用
4.1 创建及作用
createGlobalTempView():创建全局临时视图。可以在不同sessions 之间共享数据
4.2 删除视图
这种视图的生命周期取决于spark application本身。如果想drop此视图可采用dropGlobalTempView删除
spark.catalog.dropGlobalTempView("tempViewName")
或者stop() 将停止
ss = SparkContext(conf=conf, ......) ... ss.stop()
4.3使用方式详解
注:Spark 2.1.0版本中引入了Global temporary views 。
当您希望在不同sessions 之间共享数据并保持活动直到application结束时,此功能非常有用。
为了说明createTempView和createGlobalTempView的用法,展现实例如下:
object NewSessionApp { def main(args: Array[String]): Unit = { val logFile = "data/README.md" // Should be some file on your system val spark = SparkSession. builder. appName("Simple Application"). master("local"). getOrCreate() val logData = spark.read.textFile(logFile).cache() logData.createGlobalTempView("logdata") spark.range(1).createTempView("foo") // within the same session the foo table exists println("""spark.catalog.tableExists("foo") = """ + spark.catalog.tableExists("foo")) //spark.catalog.tableExists("foo") = true // for a new session the foo table does not exists val newSpark = spark.newSession println("""newSpark.catalog.tableExists("foo") = """ + newSpark.catalog.tableExists("foo")) //newSpark.catalog.tableExists("foo") = false //both session can access the logdata table spark.sql("SELECT * FROM global_temp.logdata").show() newSpark.sql("SELECT * FROM global_temp.logdata").show() spark.stop() } }