
[Spark Tutorial] (5) Resilient Distributed Datasets (RDDs)

What Is a Resilient Distributed Dataset (RDD)?


  • Concept: an RDD (Resilient Distributed Dataset) is, simply put, a collection of elements in Spark; an array, a collection, a text file and so on can all become RDDs. What sets it apart from an ordinary dataset is that it is distributed: every RDD is divided into multiple partitions, and those partitions live on different nodes of the cluster (the partition count usually comes from spark.default.parallelism, but it can also be specified manually, as shown in the short sketch after this list and in the examples below).
  • Advantage: once you understand the benefits of distributed computing, the benefits of RDDs follow naturally. Spark automatically distributes an RDD's data across the cluster and parallelizes the operations on it. A very large dataset (petabyte scale) cannot be handled by a single machine (short of a supercomputer, which is extremely expensive), so spreading the work over many machines is both faster and far cheaper.
    (If this is still unclear, don't worry; several examples are demonstrated below. Typing the code yourself is the best way to get a feel for it.)
  • Everything you do with data in Spark boils down to three kinds of operations: RDD creation, RDD transformations, and RDD actions.
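The first bullet mentions setting the partition count by hand; here is a minimal sketch of how to inspect and control it in the spark-shell (my own example, not part of the original transcript; it assumes the usual SparkContext is available as sc):

// parallelize() takes an optional second argument: the number of partitions (slices)
val rdd = sc.parallelize(1 to 100, 4)

// getNumPartitions reports how many partitions the RDD actually has
rdd.getNumPartitions            // expected: 4

// textFile() accepts a minimum partition count in the same way, e.g. sc.textFile(path, 2)

// repartition() returns a new RDD with a different partition count (it shuffles the data)
val rdd8 = rdd.repartition(8)
rdd8.getNumPartitions           // expected: 8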

Creating an RDD


There are two ways to create an RDD:

  • (1) By referencing a dataset in an external storage system (a shared file system, HDFS, HBase, etc.), using the textFile() method
// Read a dataset from the local file system; the number of partitions can also be specified, e.g. textFile(".......", 2)
scala> val rdd =sc.textFile("file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md")
rdd: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md MapPartitionsRDD[1] at textFile at <console>:24
// Read a dataset from HDFS
scala> val rdd1 =sc.textFile("hdfs://master:8020/user/test.rdd")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/user/test.rdd MapPartitionsRDD[5] at textFile at <console>:24

// Print the RDD. Note: collect() gathers all elements onto a single node first, which can cause an out-of-memory error; if you only need to print a few elements, prefer take()
scala> rdd1.collect().foreach(println)
hello
spark
love
you

scala> rdd1.take(2)
res5: Array[String] = Array(hello, spark)
  • (2) From an existing collection in the driver program (a List, an Array, etc.), using the parallelize() method
// Converting a Range to an RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd1.collect().foreach(println)
1
2
3
4
5

// Converting a List to an RDD
scala> val list = List(1,"a",'b')
list: List[Any] = List(1, a, b)

scala> sc.parallelize(list)
res12: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> res12.collect().foreach(println)
1
a
b

// Converting an Array to an RDD
scala> val array = Array("a","b")
array: Array[String] = Array(a, b)

scala> sc.parallelize(array)
res8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> res8.collect().foreach(println)
a
b

RDD Transformation Operations

A transformation produces a new RDD from an existing one. For example, filter selects the elements of an existing RDD that satisfy a condition and creates a new RDD from them.
As always, rather than dwelling on the concepts, the most intuitive way to learn is to look at actual operations!

Operations on a Single RDD

  • map(func) applies the given function to every element of the RDD to produce a new RDD
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.map(x=>(x+2))
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:27

scala> res14.collect()
res15: Array[Int] = Array(3, 4, 5, 6, 7)

scala> rdd.map(x=>(x,1))
res18: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[14] at map at <console>:27

scala> res18.collect()
res19: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
  • flatMap(func) is similar to map, but it "flattens" the result: each input item can be mapped to zero or more output items. It is commonly used for word counting. Comparing it with map side by side makes the difference easier to see:
scala> val rdd = sc.textFile("hdfs://master/user/spark.hello")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/spark.hello MapPartitionsRDD[16] at textFile at <console>:24

scala> rdd.map(line => line.split(" "))
res20: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at map at <console>:27

scala> res20.collect()
res22: Array[Array[String]] = Array(Array(hello, spark, it, is, perfect), Array(i, want, to, learn))

scala> rdd.flatMap(line => line.split(" "))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27

scala> res0.collect()
res1: Array[String] = Array(hello, spark, it, is, perfect, i, want, to, learn)

In other words, map produces an Array[Array[String]] while flatMap produces an Array[String]; flatMap is effectively map followed by flattening.

  • reduceByKey(func, [numTasks]) merges the values for each key according to the given function, much like the Reduce step in MapReduce. For example, if the map stage produces (spark,1) and (spark,1) and the function is (x,y)=>(x+y), the values are added (1+1), so after reduceByKey the pair becomes (spark,2). The file below is the input for the word-count example:
[root@master hadoop-2.6.0-cdh5.11.1]# hadoop fs -cat /user/wordcount.test
spark i love you
spark i want learn you
The word-count example:

scala> val rdd = sc.textFile("hdfs://master/user/wordcount.test")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/wordcount.test MapPartitionsRDD[4] at textFile at <console>:24

scala> val wordmap = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1))
wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26

scala> wordmap.collect()
res2: Array[(String, Int)] = Array((spark,1), (i,1), (love,1), (you,1), (spark,1), (i,1), (want,1), (learn,1), (you,1))

scala> val wordreduce = wordmap.reduceByKey((x,y)=>(x+y))
wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:28

scala> wordreduce.collect()
res3: Array[(String, Int)] = Array((learn,1), (spark,2), (you,2), (love,1), (i,2), (want,1))
  • filter(func) is easy to understand: it keeps only the elements that satisfy the predicate
scala> val rdd = sc.parallelize(Array(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.filter( x => (x==2)).collect()
res8: Array[Int] = Array(2)

scala> val rdd = sc.parallelize(List("sp","sa","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.filter(x=>x.contains('s')).collect()
res10: Array[String] = Array(sp, sa)
  • groupByKey([numTasks]) works just like GROUP BY in SQL: it groups the values by key. numTasks is optional and sets the number of tasks.
scala> val rdd = sc.parallelize(List((1,"sp"),(1,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd.groupByKey().collect()
res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(sp, sa)), (2,CompactBuffer(vv)))
  • distinct([numTasks]) removes duplicate elements. Note that it is expensive: all of the data has to be shuffled across the network.
scala> val rdd = sc.parallelize(List("sp","sp","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> rdd.distinct().collect()
res13: Array[String] = Array(sp, vv)
  • sortByKey([ascending], [numTasks]) sorts by key. The ascending flag defaults to true (ascending order) and can be set to false; a custom ordering is also possible (see the sketch after the example below).
scala> val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.sortByKey(false).collect()
res14: Array[(Int, String)] = Array((3,sa), (2,vv), (1,sp))
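The bullet above also mentions custom orderings. A minimal sketch using sortBy instead of sortByKey (my own example, not from the original session): sortBy takes a key-extraction function, so here the pairs are ordered by their String value rather than by key.

val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))

// sort by the value (the second element of each pair), ascending by default
rdd.sortBy(pair => pair._2).collect()                      // expected: Array((3,sa), (1,sp), (2,vv))

// the second argument flips the order, just like sortByKey(false)
rdd.sortBy(pair => pair._2, ascending = false).collect()   // expected: Array((2,vv), (1,sp), (3,sa))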

There are many, many more functions; see the Spark API documentation (use the version that matches yours; this article uses 2.2.0).

The official documentation is actually very detailed, but readers without a Scala background may find it a bit hard to follow, so here is one example explained in full; the others are much the same. For instance:

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) ⇒ V): RDD[(K, V)]
Merge the values for each key using an associative function and a neutral “zero value” which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).

zeroValue is a given initial value, and V is a type parameter (a generic type); it can be Int or anything else, but it must match the value type V of the RDD.
In the example below, zeroValue is 3 (an Int) and the values of the RDD's (K, V) pairs are also Int; the values for each key are then combined according to the rule defined by func, and the result is an RDD[(K, V)].
3 (the initial value) + 1 + 2 = 6, giving (1,6); 3 + 5 + 6 = 14, giving (2,14). Tip: ((x,y) => x+y) can be shortened to (_+_). Scala really is an elegant language!

scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,6),(2,5)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> rdd.foldByKey(3)(_+_).collect()
res15: Array[(Int, Int)] = Array((1,6), (2,14))

Operations on Multiple RDDs

  • Union, intersection, and difference
scala> val list1 = sc.parallelize(List("spark","spark","hello"))
list1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List("spark","love","you"))
list2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:24

// Union (does not remove duplicates)

scala> list1.union(list2).collect()
res16: Array[String] = Array(spark, spark, hello, spark, love, you)

// Intersection (removes duplicates)

scala> list1.intersection(list2).collect()
res17: Array[String] = Array(spark)

// Difference (elements present in the first dataset but not in the second); does not remove duplicates

scala> list1.subtract(list2).collect()
res18: Array[String] = Array(hello)
  • Join
scala> val list1 = sc.parallelize(List((1,"spark"),(2,"spark"),(3,"hello")))
list1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List((1,"spark"),(3,"you"),(4,"good")))
list2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24

// Inner join

scala> list1.join(list2).collect()
res20: Array[(Int, (String, String))] = Array((1,(spark,spark)), (3,(hello,you)))

// Left outer join: every key from the left RDD appears; keys with no match on the right get None

scala> list1.leftOuterJoin(list2).collect()
res21: Array[(Int, (String, Option[String]))] = Array((1,(spark,Some(spark))), (3,(hello,Some(you))), (2,(spark,None)))

// Right outer join: every key from the right RDD appears; keys with no match on the left get None

scala> list1.rightOuterJoin(list2).collect()
res22: Array[(Int, (Option[String], String))] = Array((4,(None,good)), (1,(Some(spark),spark)), (3,(Some(hello),you)))

One last reminder: there are far too many functions to cover, and this article lists only the common ones; the best approach is to go straight to the official API documentation!

RDD Action Operations

Differences Between Action and Transformation Operations

  • Lazy evaluation: an action triggers actual computation, whereas a transformation does not; transformations are lazily evaluated (covered in the next post, and sketched briefly after this list).
  • Return type: a transformation turns one RDD into a new RDD, i.e. it returns an RDD, whereas an action returns some other data type.
  • Output: an action produces an actual result, either returning it to the driver program or writing it to an external system; a transformation produces no actual output.
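A minimal sketch of the lazy-evaluation point above (my own example; the next post covers this in depth). The map call only records the transformation in the RDD's lineage and returns immediately; nothing is computed until an action such as count() runs:

val rdd = sc.parallelize(1 to 1000000)

// a transformation: no work happens yet, Spark just remembers the lineage
val plusOne = rdd.map(x => x + 1)

// an action: this is what actually triggers the job
plusOne.count()                 // expected: 1000000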

Common Action Functions

  • reduce(func) aggregates the elements of the dataset according to the given function
  • count() returns the number of elements
  • first() returns the first element
  • collect() returns all elements of the dataset; beware of memory overflow and use it only when the entire dataset fits in the memory of a single machine
  • top(n) returns the first n elements under the default or a specified ordering, descending by default (see the sketch after the transcript below)
  • take(n) returns the first n elements
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at parallelize at <console>:24

scala> rdd.reduce(_+_)
res25: Int = 55

scala> rdd.count()
res26: Long = 10

scala> rdd.first()
res27: Int = 1

scala> rdd.collect()
res28: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd.take(5)
res29: Array[Int] = Array(1, 2, 3, 4, 5)
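top(n) from the list above does not appear in the transcript; here is a quick sketch on the same rdd of 1 to 10 (my own addition, not from the original session):

// top(n) returns the n largest elements, in descending order by default
rdd.top(3)                      // expected: Array(10, 9, 8)

// takeOrdered(n) is the ascending counterpart, returning the n smallest elements
rdd.takeOrdered(3)              // expected: Array(1, 2, 3)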
  • saveAsTextFile(path) saves the final result to a file system

scala> rdd.map(x=>(x,1)).saveAsTextFile("hdfs://master/user/out1")

  • countByKey() counts the number of occurrences of each key
scala> val rdd = sc.parallelize(List(("spark",3),("spark",2),("hello",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[56] at parallelize at <console>:24

scala> rdd.countByKey()
res40: scala.collection.Map[String,Long] = Map(spark -> 2, hello -> 1)
  • aggregate, a general aggregation function

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

This means that the elements within each partition are combined with seqOp: (U, T) ⇒ U, the per-partition results are then merged with combOp: (U, U) ⇒ U, and zeroValue: U is the initial value. The example makes it clearer:

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24

scala> rdd.aggregate(1)((x,y)=>x+y,(a,b)=>(a*b))
res23: Int = 656

The numbers 1,2,3,4,5,6,7,8,9,10 are split into two partitions.

Partition one: 1,2,3,4,5 combined with (x,y)=>x+y. Note the initial value 1, so 1+1+2+3+4+5 = 16.

Partition two: 6,7,8,9,10 combined with (x,y)=>x+y. With the initial value 1, 1+6+7+8+9+10 = 41.

The two partition results, 16 and 41, are then combined with (a,b)=>a*b, i.e. 16*41 = 656. (Strictly speaking, zeroValue is also used as the initial value of the combine step, so the computation is 1*16*41; since 1 is the multiplicative identity here, the result is unchanged.)

Original article from BDStar, published by Liuyanling. Please credit the source when reposting: http://bigdata-star.com/archives/895
