1.SparkSession与SparkContext概念
SparkSession-Spark的一个全新的切入点
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
2.如何创建和使用SparkSession
创建SparkSession
SparkSession的设计遵循了工厂设计模式(factory design pattern),下面代码片段介绍如何创建 SparkSession
2.1 scala代码详解
val sparkSession = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate()
上面代码类似于创建一个SparkContext,master设置为local,然后创建了一个SQLContext封装它。如果你想创建hiveContext,可以使用下面的方法来创建SparkSession,以使得它支持Hive:
val sparkSession = SparkSession.builder. master("local") .appName("spark session example") .enableHiveSupport() .getOrCreate()
enableHiveSupport 函数的调用使得SparkSession支持hive,类似于HiveContext。
使用SparkSession读取数据
创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据:
val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")
上面代码非常像使用SQLContext来读取数据,我们现在可以使用SparkSession来替代之前使用SQLContext编写的代码。下面是完整的代码片段:
val sparkSession = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate() val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv") df.show() sparkSession.close
Spark 2.0现在还支持SQLContext和HiveContext吗?
并没有,Spark的设计是向后兼容的,所有SQLContext和HiveContext相关的API在Spark 2.0还是可以使用的。不过既然有SparkSession了,所以大家还是尽量在Spark 2.0中使用它。
2.2java代码案例
SparkSession spark = SparkSession.builder() .master("yourmaster") .appName("UnderstandingSparkSession") .config("spark.some.config.option", "config-value") .getOrCreate(); //当然你也可以通过SparkSession来创建SparkContext。 JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());