博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark算子:RDD行动Action操作(3)–aggregate、fold、lookup
阅读量:6624 次
发布时间:2019-06-25

本文共 2222 字,大约阅读时间需要 7 分钟。

hot3.png

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

 

 
  1. var rdd1 = sc.makeRDD(1 to 10,2)
  2. rdd1.mapPartitionsWithIndex{
  3. (partIdx,iter) => {
  4. var part_map = scala.collection.mutable.Map[String,List[Int]]()
  5. while(iter.hasNext){
  6. var part_name = "part_" + partIdx;
  7. var elem = iter.next()
  8. if(part_map.contains(part_name)) {
  9. var elems = part_map(part_name)
  10. elems ::= elem
  11. part_map(part_name) = elems
  12. } else {
  13. part_map(part_name) = List[Int]{elem}
  14. }
  15. }
  16. part_map.iterator
  17.  
  18. }
  19. }.collect
  20. res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
  21.  

##第一个分区中包含5,4,3,2,1

##第二个分区中包含10,9,8,7,6

 
  1. scala> rdd1.aggregate(1)(
  2. | {(x : Int,y : Int) => x + y},
  3. | {(a : Int,b : Int) => a + b}
  4. | )
  5. res17: Int = 58
  6.  

结果为什么是58,看下面的计算过程:

##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1

##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1

##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

 
  1. scala> rdd1.aggregate(2)(
  2. | {(x : Int,y : Int) => x + y},
  3. | {(a : Int,b : Int) => a * b}
  4. | )
  5. res18: Int = 1428
  6.  

##这次zeroValue=2

##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428

因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

 

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

 
  1. scala> rdd1.fold(1)(
  2. | (x,y) => x + y
  3. | )
  4. res19: Int = 58
  5.  
  6. ##结果同上面使用aggregate的第一个例子一样,即:
  7. scala> rdd1.aggregate(1)(
  8. | {(x,y) => x + y},
  9. | {(a,b) => a + b}
  10. | )
  11. res20: Int = 58
  12.  

lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

 

 
  1. scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
  2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
  3.  
  4. scala> rdd1.lookup("A")
  5. res0: Seq[Int] = WrappedArray(0, 2)
  6.  
  7. scala> rdd1.lookup("B")
  8. res1: Seq[Int] = WrappedArray(1, 2)
  9.  

 

转载于:https://my.oschina.net/chensanti234/blog/809187

你可能感兴趣的文章
WIN2008R2下安装plsqldeveloper和toad
查看>>
jquery 通过点击事件获取id
查看>>
ELK学习笔记b
查看>>
Linux无人值守自动化安装详细配置流程!
查看>>
jquery实现radio按钮在分组状态下点击选中,再次点击取消选中
查看>>
【51CTO学院三周年】我和51CTO学院的点滴
查看>>
hadoop2.4.1+hbase0.98.3实现的分布式网盘系统初步
查看>>
ibatis批量新增-自增长序列
查看>>
linux系统管理之九:rpm安装包
查看>>
Linux系统中查看日志的常用命令
查看>>
java基础(二) 自增自减与贪心规则
查看>>
VMWare View的组件
查看>>
Oracle GoldenGate学习之--异构平台同步(Mysql到Oracle)
查看>>
Linux下date命令使用举例说明
查看>>
Centos6下SVN服务器(结合Apache)的搭建
查看>>
Reactor和Proactor模式
查看>>
实验:关于XPath中的13个轴
查看>>
品牌的网闸介绍
查看>>
手势滑动源码(适合新手)
查看>>
我的友情链接
查看>>