Spark RDD Actions: Simple Examples (Part 2)
Views: 6582
Published: 2019-06-24

This article is about 10,202 characters long; estimated reading time is 34 minutes.

foreach(f: T => Unit)

Applies the function f to every element of the RDD; f returns nothing.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit
scala> val rdd = sc.parallelize(1 to 9, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.foreach(x => println(x))
1
2
3
4
5
6
7
8
9
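
Because foreach runs on the executors, the side effect (println here) happens on each executor and may not appear in the driver console on a real cluster. As a minimal sketch (assuming Spark 2.x, where sc.longAccumulator is available), an accumulator is the usual way to feed a side-effect result back to the driver:

val acc = sc.longAccumulator("element sum")   // accumulator updated on the executors
rdd.foreach(x => acc.add(x))                  // side effect per element
println(acc.value)                            // 45, read back on the driver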
 

foreachPartition(f: Iterator[T] => Unit)

Applies the function f once to each partition of the RDD.

/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit
 
scala> val rdd = sc.parallelize(1 to 9, 2)

scala> rdd.foreachPartition(x => {
     |   while (x.hasNext) {
     |     println(x.next)
     |   }
     |   println("===========")
     | })
1
2
3
4
===========
5
6
7
8
9
===========
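
A typical reason to prefer foreachPartition over foreach is to set up one expensive resource per partition rather than one per element. A sketch of the pattern, where openConnection, send and close are hypothetical placeholders, not a real API:

rdd.foreachPartition { iter =>
  val conn = openConnection()        // hypothetical: open the resource once per partition
  iter.foreach(x => conn.send(x))    // hypothetical: reuse it for every element
  conn.close()                       // hypothetical: release it when the partition is done
}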
 

getCheckpointFile

Gets the directory to which this RDD was checkpointed; it is not defined if the RDD was checkpointed locally.

/**
 * Gets the name of the directory to which this RDD was checkpointed.
 * This is not defined if the RDD is checkpointed locally.
 */
def getCheckpointFile: Option[String]
 
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.checkpoint

/* Querying right after the checkpoint call still returns None, which shows that checkpoint is lazy */
scala> rdd.getCheckpointFile
res6: Option[String] = None

scala> rdd.count
res7: Long = 9

scala> rdd.getCheckpointFile
res8: Option[String] = Some(file:/home/check/ca771099-b1bf-46c8-9404-68b4ace7feeb/rdd-1)
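
checkpoint only works after a checkpoint directory has been set on the SparkContext, which is why the file above lands under /home/check. A minimal sketch of the full sequence (the directory is taken from the example above):

sc.setCheckpointDir("/home/check")   // must be called before rdd.checkpoint
val data = sc.parallelize(1 to 9, 2)
data.checkpoint()
data.count()                         // an action materializes the checkpoint
data.getCheckpointFile               // now Some(file:/home/check/.../rdd-N)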
 

getNumPartitions

Returns the number of partitions of this RDD.

/**
 * Returns the number of partitions of this RDD.
 */
@Since("1.6.0")
final def getNumPartitions: Int = partitions.length
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getNumPartitions
res9: Int = 2
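
The value reflects any repartitioning that has been applied; a short sketch:

val wider = rdd.repartition(4)       // shuffle into 4 partitions
wider.getNumPartitions               // 4
val narrow = rdd.coalesce(1)         // merge down to a single partition without a shuffle
narrow.getNumPartitions              // 1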

getStorageLevel

Gets the RDD's current storage level, or StorageLevel.NONE if none is set.

/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel: StorageLevel = storageLevel
 
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> rdd.cache
res11: rdd.type = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res12: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
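
cache is shorthand for persist(StorageLevel.MEMORY_ONLY), which is why the level above reports memory and deserialized. A sketch of requesting a different level explicitly (the printed format can vary slightly between Spark versions):

import org.apache.spark.storage.StorageLevel

val persisted = sc.parallelize(1 to 9, 2)
persisted.persist(StorageLevel.MEMORY_AND_DISK_SER)   // spill to disk when memory is short, store serialized
persisted.getStorageLevel                             // e.g. StorageLevel(disk, memory, 1 replicas)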
 

isCheckpointed

Returns whether this RDD has been checkpointed and materialized, either reliably or locally.

/**
 * Return whether this RDD is checkpointed and materialized, either reliably or locally.
 */
def isCheckpointed: Boolean
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.isCheckpointed
res13: Boolean = false

scala> rdd.checkpoint

scala> rdd.isCheckpointed
res15: Boolean = false

scala> rdd.count
res16: Long = 9

scala> rdd.isCheckpointed
res17: Boolean = true

isEmpty()

Returns true if and only if the RDD contains no elements at all (an RDD can be empty even when it has at least one partition). Because of complications in the internal implementation, the method raises an exception when called on an RDD of Nothing or Null, such as the result of parallelize(Seq()); prefer parallelize(Seq[T]()).

/**
 * @note due to complications in the internal implementation, this method will raise an
 * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
 * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
 * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
 * @return true if and only if the RDD contains no elements at all. Note that an RDD
 *         may be empty even when it has at least 1 partition.
 */
def isEmpty(): Boolean
 
scala> val rdd = sc.parallelize(Seq())
rdd: org.apache.spark.rdd.RDD[Nothing] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.isEmpty
org.apache.spark.SparkDriverExecutionException: Execution error
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1187)
  ...
  at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
  at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1412)
  ... 48 elided
Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
  ...

scala> val rdd = sc.parallelize(Seq(1 to 9))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.isEmpty
res19: Boolean = false
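
To avoid the RDD[Nothing] problem, give the element type explicitly or use sc.emptyRDD; a minimal sketch:

val e1 = sc.parallelize(Seq[Int]())  // empty, but typed as RDD[Int]
e1.isEmpty                           // true
val e2 = sc.emptyRDD[Int]            // dedicated constructor for an empty RDD
e2.isEmpty                           // true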
 

 

max()

Returns the maximum element of the RDD, as defined by the implicit Ordering[T].

/**
 * Returns the max of this RDD as defined by the implicit Ordering[T].
 * @return the maximum element of the RDD
 */
def max()(implicit ord: Ordering[T]): T

min()

Returns the minimum element of the RDD, as defined by the implicit Ordering[T].

/**
 * Returns the min of this RDD as defined by the implicit Ordering[T].
 * @return the minimum element of the RDD
 */
def min()(implicit ord: Ordering[T]): T
 
scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.max
res21: Int = 9

scala> rdd.min
res22: Int = 1
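
Both methods take an implicit Ordering[T], so a custom ordering can also be passed explicitly; a small sketch comparing elements by absolute value:

val nums = sc.parallelize(List(-10, 3, 7, -2))
nums.max()(Ordering.by((x: Int) => math.abs(x)))   // -10, the largest absolute value
nums.min()(Ordering.by((x: Int) => math.abs(x)))   // -2, the smallest absolute value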
 

reduce(f: (T, T) => T)

Aggregates all elements of the RDD with the given binary operator, which must be commutative and associative.

/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T
 
scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> def func(x: Int, y: Int): Int = {
     |   if (x >= y) {
     |     x
     |   } else {
     |     y
     |   }
     | }
func: (x: Int, y: Int)Int

scala> rdd.reduce(func(_, _))
res23: Int = 9

scala> rdd.reduce((x, y) => {
     |   if (x >= y) {
     |     x
     |   } else {
     |     y
     |   }
     | })
res24: Int = 9
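
The operator must be commutative and associative because each partition is reduced independently before the partial results are combined. A shorter equivalent of the examples above:

rdd.reduce(_ + _)                             // 45, the sum of 1..9
rdd.reduce((x, y) => if (x >= y) x else y)    // 9, same result as func above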
 

saveAsObjectFile(path: String)

Saves the RDD to the given directory as a SequenceFile of serialized objects.

/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String): Unit
 
scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.saveAsObjectFile("/home/check/object")

[root@localhost ~]# ls /home/check/object/
part-00000  _SUCCESS
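
The resulting SequenceFile can be loaded back into an RDD with sc.objectFile; a minimal sketch reusing the same path:

val restored = sc.objectFile[Int]("/home/check/object")
restored.collect()                   // Array(1, ..., 9); ordering across partitions is not guaranteed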
 

saveAsTextFile(path: String)

Saves the RDD to the given directory as a text file, using the string representation of each element.

/**
 * Save this RDD as a text file, using string representations of elements.
 */
def saveAsTextFile(path: String): Unit
 
scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.saveAsTextFile("/home/check/text")

[root@localhost ~]# ls /home/check/text/part-00000
/home/check/text/part-00000
[root@localhost ~]# more /home/check/text/part-00000
1
2
3
4
5
6
7
8
9
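
Because the elements are written out as strings, reading the data back requires parsing each line; a minimal sketch:

val lines = sc.textFile("/home/check/text")
lines.map(_.toInt).collect()         // Array(1, ..., 9) after converting each line back to Int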
 

take(num: Int)

Returns the first num elements of the RDD. It works by first scanning one partition and using the results from that partition to estimate how many additional partitions are needed to satisfy the limit.

/**
 * Take the first num elements of the RDD. It works by first scanning one partition, and use the
 * results from that partition to estimate the number of additional partitions needed to satisfy
 * the limit.
 *
 * @note this method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @note due to complications in the internal implementation, this method will raise
 * an exception if called on an RDD of `Nothing` or `Null`.
 */
def take(num: Int): Array[T]
 
scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> rdd.take(5)
res28: Array[Int] = Array(1, 2, 3, 4, 5)
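
Related: first() is equivalent to take(1).head and throws an exception on an empty RDD; a one-line sketch:

rdd.first()                          // 1, the first element of the RDD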
 
takeOrdered(num: Int)

Returns the first num elements of the RDD in ascending order, as defined by the implicit Ordering[T].
scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd.takeOrdered(3)
res30: Array[Int] = Array(1, 2, 3)
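
takeOrdered also accepts an implicit Ordering, so a reversed ordering returns the largest elements instead, matching top; a one-line sketch:

rdd.takeOrdered(3)(Ordering[Int].reverse)   // Array(9, 6, 5)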

 

takeSample(withReplacement: Boolean, num: Int, seed: Long)

Returns a random sample of exactly num elements, with or without replacement; the optional seed makes the sample reproducible.

def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T]
 
scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.takeSample(true, 6, 8)
res34: Array[Int] = Array(5, 2, 2, 5, 3, 2)

scala> rdd.takeSample(false, 6, 8)
res35: Array[Int] = Array(9, 3, 2, 6, 1, 5)
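
Unlike the sample transformation, takeSample is an action that returns an Array of exactly num elements to the driver, so num should stay small. A sketch contrasting the two (the 0.5 fraction is just an example value):

rdd.takeSample(withReplacement = false, num = 3)      // Array of exactly 3 elements on the driver
rdd.sample(withReplacement = false, fraction = 0.5)   // an RDD of roughly half of the elements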
 

top(num: Int)

Returns the top num elements in descending order, as defined by the implicit Ordering[T].

/*
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
 
scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.top(3)
res37: Array[Int] = Array(9, 6, 5)
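
With a reversed ordering, top returns the smallest elements instead, mirroring takeOrdered; a one-line sketch:

rdd.top(3)(Ordering[Int].reverse)    // Array(1, 2, 3)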
 

 

 

Reposted from: https://www.cnblogs.com/alianbog/p/5839680.html
