Spark SQL核心编程

Spark SQL核心编程

SparkSession

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

引入相关pom依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--使用SparkSession需要引入spark-sql相关依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>

初始化SparkSession

1
2
3
4
5
6
7
8
9
10
// 初始化SparkSession
val sparkConf = new SparkConf()
val sparkSession: SparkSession = SparkSession
.builder()
.config(sparkConf)
.master("local[*]")
.appName("SparkDemo")
.getOrCreate()
// 获取SparkContext对象
val sc = sparkSession.sparkContext

DataFrame、DataSet和RDD

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于DataFrame带有Schema元信息。
DataSet是DataFrame API的一个扩展,是SparkSQL最新的数据抽象。
DataSet是强类型的。比如DataSet[Person]。DataFrame是DataSet的特例,DataFrame = DataSet[Row]。这里的Row是一个类型,所有的表结构信息都要用Row来表示,获取数据时需要指定顺序。
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会得到相同的结果。不同的是他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能逐步取代RDD和DataFrame成为唯一的API接口。

创建DataFrame

1
2
3
4
// 创建DataFrame
val studentDataFrame: DataFrame = sparkSession.read.json("datas/student.json")
// 查看数据。注意:df.show()只显示前20行记录。
studentDataFrame.show()

SQL语法

1
2
3
4
5
6
// 创建DataFrame
val studentDataFrame: DataFrame = sparkSession.read.json("datas/student.json")
// 1.创建视图
studentDataFrame.createGlobalTempView("student")
// 2.编写SQL查看数据。注意:这里需要带上数据库名
sparkSession.sql("select username,age from global_temp.student").show()

DSL语法

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。使用 DSL 语法风格不必创建临时视图。

1
2
3
4
5
6
// 创建DataFrame
val studentDataFrame: DataFrame = sparkSession.read.json("datas/student.json")
// 1.使用DSL语言。
studentDataFrame.select("username", "age").show()
// 查看Schema信息
studentDataFrame.printSchema()

RDD转DataFrame

1
2
3
4
5
6
7
8
9
10
11
  // 创建RDD
val rdd: RDD[(String, Int)] = sparkSession.sparkContext.makeRDD(List(("zhangsan",20),("lisi",21)))
// 注意:这里的 SparkSession是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。
import sparkSession.implicits._
// 1.RDD转DataFrame。通过case class实现(实际开发更常用)。
rdd.map(student => Student(student._1,student._2)).toDF().show()
// 2.也可以直接指定字段名
val dataFrame1 = rdd.toDF("username", "age")

// Student样例类
case class Student(name:String,age:Int)

DataFrame转RDD

DataFrame其实就是对RDD的封装,所以可以直接获取内部RDD

1
2
3
4
5
// 创建DataFrame
val dataFrame = sparkSession.read.json("datas/Student.json")
// DataFrame转RDD
val rdd = dataFrame.rdd
rdd.foreach(println)

创建DataSet

可以使用样例类序列创建或者使用基本类型的序列创建DataSet。
实际开发过程中,更多的是通过RDD来得到DataSet。

1
2
3
4
5
6
7
import sparkSession.implicits._
// 1.使用样例类序列创建DataSet
val dataSet = Seq(Student("zhangsan", 18), Student("lisi", 17)).toDS()
dataSet.printSchema()
// 2.使用基本类型序列创建DataSet
val dataSet1 = Seq(1, 2, 3, 4, 5).toDS()
dataSet1.printSchema()

DataSet转RDD

DataSet其实也是对RDD的封装,所以也可以直接获取内部的RDD

1
2
3
// DataSet转RDD
val rdd = dataSet.rdd
rdd.collect().foreach(println)

RDD转DataSet

SparkSQL能够自动将包含case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。case类可以包含Array或者Seq等复杂结构。

1
2
3
4
5
6
import sparkSession.implicits._
// 创建RDD
val rdd = sparkSession.sparkContext.makeRDD(List(("zhangsan", 15), ("lisi", 16)))
// 通过样例类将RDD转为DataSet
val dataSet = rdd.map(student => Student(student._1, student._2)).toDS()
dataSet.show()

DataSet转DataFrame

DataFrame实际上就是DataSet的特例,可以直接转换。

1
2
3
4
5
6
import sparkSession.implicits._
// 创建DataSet
val dataSet = Seq(Student("zhangsan",18), Student("lisi",10)).toDS()
// dataSet转DataFrame
val dataFrame = dataSet.toDF()
dataFrame.show()

DataFrame转DataSet

通过样例类进行转换。

1
2
3
4
5
6
7
8
9
10
11
12
import sparkSession.implicits._
// 创建DataFrame
val dataFrame = sparkSession.read.json("datas/Student.json")
// 通过样例类进行转换。注意:遇到数字类型,系统会自动地将其作为bigint来处理。后续如果把这些变量装进int型,则会抛出异常。
val dataSet1 = dataFrame.as[Student]
dataSet1.show()
// 或者直接指定字段名实现转换
val dataSet2 = dataFrame.as("username,age")
dataSet2.show()

// 这里将age由Int类型变为Long类型,防止DataFrame转DataSet时系统报错
case class Student(username: String, age: Long)
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信