3. Common Spark RDD Operators

  • Start spark-shell to test the examples:

  • spark-shell --master spark://node-1:7077

  • Exercise 1: map, filter

//create an RDD by parallelizing a local collection
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//multiply each element of rdd1 by 2, then sort in ascending order
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//keep only the elements greater than or equal to 5
val rdd3 = rdd2.filter(_ >= 5)
//collect the elements back to the driver as an array
rdd3.collect
Result:
Array[Int] = Array(6, 8, 10, 12, 14, 16, 18, 20)
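As a variation, sortBy takes a second Boolean argument for sort direction; passing false sorts in descending order. A minimal sketch reusing rdd1 from above (rddDesc is just an illustrative name):

//same pipeline, but sorted in descending order
val rddDesc = rdd1.map(_ * 2).sortBy(x => x, false)
rddDesc.collect
//expected: Array(20, 18, 16, 14, 12, 10, 8, 6, 4, 2)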
  • Exercise 2: flatMap

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//split each element of rdd1 on spaces, then flatten the results
val rdd2 = rdd1.flatMap(_.split(" "))
rdd2.collect
Result:
Array[String] = Array(a, b, c, d, e, f, h, i, j)
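To see why flatMap (rather than plain map) is needed here, compare the two in the same shell; map does the splitting but keeps the nesting (rddNested is just an illustrative name):

//map alone keeps each split result as a nested array
val rddNested = rdd1.map(_.split(" "))
rddNested.collect
//expected: Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))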
  • Exercise 3: intersection, union

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//take the union of rdd1 and rdd2
val rdd3 = rdd1.union(rdd2)
//take the intersection of rdd1 and rdd2
val rdd4 = rdd1.intersection(rdd2)
//deduplicate the union before collecting
rdd3.distinct.collect
Array[Int] = Array(1, 2, 3, 4, 5, 6)
rdd4.collect
Array[Int] = Array(4, 3)
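Alongside union and intersection, subtract computes the set difference. A minimal sketch reusing rdd1 and rdd2 from above (rdd5 is just an illustrative name):

//elements of rdd1 that do not appear in rdd2
val rdd5 = rdd1.subtract(rdd2)
rdd5.collect
//expected (order may vary across partitions): Array(5, 6)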
  • Exercise 4: join, groupByKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//inner join on key
val rdd3 = rdd1.join(rdd2)
rdd3.collect
Array[(String, (Int, Int))] = Array((tom,(1,1)), (jerry,(3,2)))
//take the union of the two RDDs
val rdd4 = rdd1 union rdd2
rdd4.collect
Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
//group the values by key
val rdd5=rdd4.groupByKey
rdd5.collect
Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)), (jerry,CompactBuffer(2, 3)))
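join keeps only the keys present in both RDDs, which is why kitty and shuke are dropped from rdd3 above. To keep every key from one side, leftOuterJoin wraps the possibly-missing right-hand value in an Option. A minimal sketch reusing rdd1 and rdd2 (rdd6 is just an illustrative name):

//keep every key of rdd1; keys missing from rdd2 become None
val rdd6 = rdd1.leftOuterJoin(rdd2)
rdd6.collect
//expected (order may vary): Array((tom,(1,Some(1))), (jerry,(3,Some(2))), (kitty,(2,None)))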
  • Exercise 5: cogroup

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
//cogroup the two RDDs by key
val rdd3 = rdd1.cogroup(rdd2)
//note how cogroup differs from groupByKey (see the contrast sketch below)
rdd3.collect
Array[(String, (Iterable[Int], Iterable[Int]))] = Array((jim,(CompactBuffer(),CompactBuffer(2))), (tom,(CompactBuffer(2, 1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
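The contrast with groupByKey: cogroup keeps one Iterable per source RDD for each key, while unioning first and then grouping merges everything into a single Iterable, losing track of which RDD each value came from. A quick check in the same shell:

//union first, then group: one flat CompactBuffer per key instead of a pair of them
(rdd1 union rdd2).groupByKey.collect
//expected (order may vary): Array((jim,CompactBuffer(2)), (tom,CompactBuffer(1, 2, 1)), (jerry,CompactBuffer(3, 2)), (kitty,CompactBuffer(2)))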
  • Exercise 6: reduce

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce is an action: it aggregates all elements into a single value
val rdd2 = rdd1.reduce(_ + _)
//rdd2 is a plain Int (15), not an RDD, so there is nothing to collect
rdd2
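reduce accepts any associative, commutative function, not just addition. A minimal sketch reusing rdd1 from above (maxValue is just an illustrative name):

//take the maximum element instead of the sum
val maxValue = rdd1.reduce((a, b) => math.max(a, b))
//maxValue: Int = 5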
  • Exercise 7: reduceByKey, sortByKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//aggregate the values of each key
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
Array[(String, Int)] = Array((tom,4), (shuke,3), (kitty,7), (jerry,5))
//sort by value in descending order: swap to (value, key), sortByKey, then swap back
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
Array[(String, Int)] = Array((kitty,7), (jerry,5), (tom,4), (shuke,3))
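The same value-descending sort can be written more directly with sortBy, which takes an arbitrary key function and avoids the swap-sort-swap dance:

//sort the (word, count) pairs by the count field, descending
rdd4.sortBy(_._2, false).collect
//expected: Array((kitty,7), (jerry,5), (tom,4), (shuke,3))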
  • Exercise 8: repartition, coalesce

val rdd1 = sc.parallelize(1 to 10, 3)
//use repartition to change the number of partitions of rdd1
//decrease the partition count to 2
rdd1.repartition(2).partitions.size
//increase the partition count to 4
rdd1.repartition(4).partitions.size
//use coalesce to change the number of partitions of rdd1
//decrease the partition count to 2
rdd1.coalesce(2).partitions.size

Note: repartition can both increase and decrease the number of partitions of an RDD, while coalesce (with its default shuffle = false) can only decrease it; a request to increase the partition count is silently ignored.
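To see the note in action, a quick check in the same shell: coalesce also takes a shuffle parameter, and setting it to true makes the increase take effect (at the cost of a shuffle), which is exactly what repartition does under the hood:

//with the default shuffle = false, the requested increase is ignored
rdd1.coalesce(4).partitions.size
//res: Int = 3
//with shuffle = true, coalesce behaves like repartition and the increase works
rdd1.coalesce(4, shuffle = true).partitions.size
//res: Int = 4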