Spark三大数据结构之RDD

Spark三大数据结构之RDD

基本概念

RDD:弹性分布式数据集。是Spark中最基本的数据处理模型。

所谓弹性是指:
存储弹性:内存与磁盘自动切换
容错弹性:数据丢失可以自动恢复
计算弹性:计算出错重试机制
分片弹性:可以根据需要重新分片

所谓数据集是指:RDD封装了计算逻辑,并不保存数据。

注意:RDD是不可变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。

五大核心属性

  • 分区列表:RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
  • 分区计算函数:Spark在计算时,使用分区函数对每一个分区进行计算。
  • 依赖关系:RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系。
  • 分区器:当数据为KV类型时,可以通过分区器来自定义数据分区。
  • 首选位置:也叫优先位置。计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算。

RDD源码:

1
2
3
4
5
6
protected def getPartitions: Array[Partition]
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getDependencies: Seq[Dependency[_]] = deps
@transient val partitioner: Option[Partitioner] = None
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

自定义分区器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark._

object Test {
def main(args: Array[String]) = {
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountDemo")
val sc = new SparkContext(sparkConf)
// 加载数据源
val rdd = sc.makeRDD(List(("flink", "1.12"), ("flink", "1.13"), ("spark", "2.4"), ("spark", "3.0"), ("hadoop", "3.0")))
// 调用自定义分区器
val myPartitionRDD = rdd.partitionBy(new MyPartitioner)
// 输出
myPartitionRDD.saveAsTextFile("output")
// 关闭Spark连接
sc.stop()
}

/**
* 自定义分区器
* 1.继承Partitioner
* 2.重写方法
*/
class MyPartitioner extends Partitioner {
// 自定义三个分区
override def numPartitions: Int = 3

// 根据key值设置分区索引
override def getPartition(key: Any): Int = {
key match {
case "flink" => 0
case "spark" => 1
case _ => 2
}
}
}

}

创建方式

1
2
3
4
5
6
7
8
9
10
11
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkDemo")
val sc = new SparkContext(sparkConf)
// 1.从集合中创建
val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
// 2.从数据文件中创建
val rdd2: RDD[String] = sc.textFile("...")
// 3.从其他RDD中创建(转换)
val rdd3: RDD[String] = rdd2.map(_ + 1)
// 关闭Spark连接
sc.stop()

transformations操作

  • map:将数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
  • mapPartition:将数据以分区为单位发送到计算节点进行处理,这里的处理指任意处理,包括数据过滤。
  • flatMap:将数据进行扁平化后再进行映射处理。
  • groupBy:将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合。这样的操作称之为Shuffle。极端情况下,数据可能会被分到同一个分区。
  • filter:将数据根据指定的规则进行筛选过滤,符合规则的数据保留。当数据进行筛选过滤之后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
  • distinct:将数据去重。
  • coalesce:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
  • repartition:该操作内部其实执行的就是coalesce操作,参数Shuffle的默认值为true。
  • sortBy:将数据进行排序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// map算子。将所有元素乘以2
val mapRDD: RDD[Int] = rdd.map(_ * 2)
mapRDD.foreach(println)

// mapPartition算子。以分区为单位。将所有元素加1
val mapPartitionsRDD = rdd.mapPartitions(datas => {
datas.map(_ + 1)
})
mapPartitionsRDD.foreach(println)

// groupBy算子。以奇偶数进行分组。
val groupByRDD = rdd.groupBy(_ % 2 == 0)
groupByRDD.foreach(println)

// filter算子。过滤掉奇数。
val filterRDD = rdd.filter(_ % 2 == 0)
filterRDD.foreach(println)

// distinct算子
val distinctRDD = rdd.distinct()
distinctRDD.foreach(println)

// coalesce算子
val coalesceRDD = rdd.coalesce(1)
// 这里以文件夹的形式保存,里面只有1个part文件
coalesceRDD.saveAsTextFile("./coalesceDir")

// repartition算子
val repartitionRDD = rdd.repartition(4)
// 这里以文件夹的形式保存,里面有4个part文件
repartitionRDD.saveAsTextFile("./repartitionDir")

// sortBy算子,默认升序。
val sortByRDD = rdd.sortBy(x => x, false, 2)
sortByRDD.foreach(println)

action操作

  • reduce:聚合RDD中的所有元素,先聚合分区内的数据,再聚合分区间的数据
  • collect:以数组的形式返回数据集的所有元素
  • count:返回RDD中元素的个数
  • first:返回RDD中第一个元素
  • take:返回RDD中排序后的前n个元素组成的数组
  • aggregate:分区内的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
  • fold:折叠操作。aggregate的简化版操作
  • countByKey:统计每种key的个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountDemo")
val sc = new SparkContext(sparkConf)
// 从集合中创建
val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
// reduce算子
val res = rdd.reduce(_ + _)
println(res)
// collect算子
val array = rdd.collect()
array.foreach(println)
// count算子
val count = rdd.count()
println(count)
// first算子
val first = rdd.first()
println(first)
// take算子
val takeArr = rdd.take(3)
takeArr.foreach(println)
// aggregate算子,它是一个柯里化函数,存在两个参数列表
// 第一个参数列表表示初始值
// 第二个参数列表鼠标计算规则。分别指向分区间和分区内:第一个分区内元素相加然后和初始值聚合,第二个分区间相乘,然后和初始值聚合。
val repRDD = rdd.repartition(5)
// 假设分成5个分区,每个分区都只有一个元素,
// 第一次分区内:元素变成((1+2),(2+2),(3+2),(4+2),(5+2)),
// 第二次分区间:元素变成(3+4+5+6+7) + 2 = 27
val aggregateRes1 = repRDD.aggregate(0)(_ + _, _ + _)
val aggregateRes2 = repRDD.aggregate(2)(_ + _, _ + _)
println(aggregateRes1)
println(aggregateRes2)
// 分别指向分区间和分区内:第一次表示分区内元素相加然后和初始值聚合,第二次表示分区间相乘,然后和初始值聚合。
// 第一次分区内:(1+1) + (1+2) + (1+3) + (1+4) + (1+5) = 2+3+4+5+6
// 第二次分区间:(2+3+4+5+6) + 1 = 21
val foldRes = repRDD.fold(1)(_+_)
println(foldRes)
//countByKey算子
val rdd1 = sc.parallelize(List((1,"a"),(1,"b"),(2,"c"),(3,"d")))
val countByKeyRes = rdd1.countByKey()
countByKeyRes.foreach(println)
// 关闭Spark连接
sc.stop()

如何分辨

分辨一个操作是transformation操作还是action操作,可以根据返回值类型来直观判断,即transformation操作返回值皆为RDD,action则是表示计算结果的Int、String、Array、List类型返回值。
当然也存在例外,例如reduceByKey,虽然是action操作,但是返回的仍然是RDD。

依赖关系

窄依赖:每一个父RDD的Partition最多被子RDD的一个Partition使用。
宽依赖:同一个父RDD的Partition被多个子RDD的Partition使用。

任务划分

可以划分为四个部分:
Application:初始化一个SparkContext即生成一个Application
Job:一个action算子就会生成一个Job
Stage:Stage等于宽依赖的个数+1
Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

RDD缓存

Cache缓存

1
2
3
4
5
6
7
8
9
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountDemo")
val sc = new SparkContext(sparkConf)
// 从集合中创建
val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

val cache: rdd.type = rdd.cache()
// 关闭Spark连接
sc.stop()

Persist

1
2
3
4
5
6
7
8
9
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountDemo")
val sc = new SparkContext(sparkConf)
// 从集合中创建
val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

val persist: rdd.type = rdd.persist(StorageLevel.MEMORY_ONLY)
// 关闭Spark连接
sc.stop()

Checkpoint

1
2
3
4
5
6
7
8
9
// SparkContext初始化
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountDemo")
val sc = new SparkContext(sparkConf)
// 从集合中创建
val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

val check: Unit = rdd.checkpoint()
// 关闭Spark连接
sc.stop()

三者区别:
Cache是将数据缓存起来,临时存储在内存中进行数据重用。Persist可以将数据临时存储在内存或磁盘文件中,如果作业执行完毕,临时保存的数据文件会丢失。Checkpoint会将数据永久的保存在磁盘文件中,不会出现数据丢失。

打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信