How to use Scala Spark RDD operators (repartitioning and key-value grouping)

Keywords: Spark operators, Spark RDD basic transformations, coalesce, repartition
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
This function repartitions the RDD; when a shuffle is involved it uses a HashPartitioner.
The first parameter is the target number of partitions; the second controls whether a shuffle is performed and defaults to false.
Consider the following example:
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[53] at textFile at <console>:21
scala> data.collect
res37: Array[String] = Array(hello world, hello spark, hello hive, hi spark)
scala> data.partitions.size
res38: Int = 2
// RDD data has two partitions by default
scala> var rdd1 = data.coalesce(1)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:23
scala> rdd1.partitions.size
res1: Int = 1
// rdd1 now has a single partition
scala> var rdd1 = data.coalesce(4)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:23
scala> rdd1.partitions.size
res2: Int = 2
// If the target number of partitions is larger than the current one, shuffle must be set to true;
// otherwise the partition count stays unchanged
scala> var rdd1 = data.coalesce(4,true)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:23
scala> rdd1.partitions.size
res3: Int = 4
repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
This function is simply coalesce with the shuffle parameter set to true.
scala> var rdd2 = data.repartition(1)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:23
scala> rdd2.partitions.size
res4: Int = 1
scala> var rdd2 = data.repartition(4)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at repartition at <console>:23
scala> rdd2.partitions.size
res5: Int = 4
Spark-Scala-RDD: a round-up of beginner questions
I have been getting to know Spark recently. As a beginner I am recording some of my notes here so I can look them up later.
Environment: Hadoop 2.7.2 / Spark 2.0.0 / Scala 2.11.8
IDE: IntelliJ IDEA
1: How to run a program on Spark
The program has to be packaged and run from a terminal. It cannot simply be run from inside the IDE, because then it is never submitted to the Spark cluster.
2: Setting up the development environment and importing a jar such as spark-assembly-0.8.1-incubating-hadoop2.2.0.jar
You can import a jar like spark-assembly-0.8.1-incubating-hadoop2.2.0.jar. In early Spark releases it shipped in the lib directory, but in the Spark 2.0.0 release I am using it is no longer there.
You therefore have to build that assembly yourself, or simply add all of the jars under the jars directory to the project.
Open Project Structure (shortcut Ctrl+Alt+Shift+S).
Configure packaging: first set up the packaging configuration under Artifacts.
Build: Build -> Artifacts… -> Build
Run: from the $SPARK_HOME/bin directory (or from anywhere, if it is on your PATH). If master and appName are not configured in the program, they can be supplied on the command line as shown below; see spark-submit --help for the details.
spark-submit --class com.qh.ScalaPractice SparkScala.jar
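For instance, assuming the master URL and application name used in the programs below, the equivalent command-line form would be roughly the following (a sketch; adjust the class, jar and master URL to your own setup):
spark-submit --class com.qh.ScalaPractice --master spark://master:7077 --name ScalaTest SparkScala.jar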
3: Scala Spark RDD
Many examples of writing Spark programs in Scala can be found online, but in my own experiments I found that quite a few of the RDD functions they use simply are not there.
Perhaps they were dropped in this version, or perhaps I was using them the wrong way. The example below is a word count I put together from online tutorials, and it works as tested.
package com.qh
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by hadoop on 8/17/16.
  * wordCount
  */
object wordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount")
      .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val file = sc.textFile("/DataBigZYS/file", 3)
    val result = file
      .flatMap(value => value.split("\\s+"))             // split each line into words
      .map(word => (word, 1))                            // pair each word with a count of 1
      .reduceByKey((value1, value2) => value1 + value2)  // sum the counts per word
    result.foreach(println)
    result.saveAsTextFile("/DataBigZYS/file1")
  }
}
4: Scala Spark RDD basic operations
package com.qh
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
  * Created by zys on 8/17/16.
  */
object ScalaPractice {
  private val path = "hdfs://master:9000/Spark/Practice"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ScalaTest")
      .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val raw = sc.parallelize(1 to 9, 3)
    // basic actions
    raw.top(3)
    raw.take(3)
    raw.takeOrdered(3)
    raw.first()
    raw.count()
    raw.collect()
    raw.reduce((x1, x2) => x1 + x2)
    raw.map(x => (x, x * x)).lookup(2)
    raw.map(x => (x, x * x)).countByKey()
    raw.foreach(x => x * 2)
    var Num = 0
    raw.foreachPartition(iter => {
      iter.foreach(x => Num += x) // note: Num is a driver-side var; executor-side updates are not visible on the driver
    })
    // sorting
    raw.sortBy(x => x)
    raw.sortBy(x => x, ascending = false)
    raw.map(x => (x, x * 2)).sortBy(x => x._2)
    // map-style transformations
    val map = raw.map(x => x * 2)
    map.saveAsTextFile(path + "/map")
    raw.mapPartitions(x => {
      var i = 0 // sum of the elements in this partition
      x.foreach(arg => i += arg)
      List[Int]().::(i).iterator
    }).saveAsTextFile(path + "/mapPartitions")
    raw.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x)))
      .saveAsTextFile(path + "/mapPartitionsWithIndex")
    raw.map(x => (x, x)).mapValues(value => value * value)
      .saveAsTextFile(path + "/mapValues")
    raw.map(x => (x, x)).flatMapValues(x => 1 to x)
      .saveAsTextFile(path + "/flatMapValues")
    raw.flatMap(x => 1 to x).saveAsTextFile(path + "/flatMap")
    raw.map(x => (x, x * x)).reduceByKey((x1, x2) => x1 + x2)
      .saveAsTextFile(path + "/reduceByKey")
    raw.map(x => x % 5).distinct().saveAsTextFile(path + "/distinct")
    // operations on two RDDs
    val other = sc.parallelize(6 to 14, 3)
    raw.union(other).saveAsTextFile(path + "/other")
    raw.intersection(other, 1).saveAsTextFile(path + "/intersection")
    raw.subtract(other, 1).saveAsTextFile(path + "/subtract")
    raw.zip(other).saveAsTextFile(path + "/zip")
    raw.zipPartitions(sc.parallelize(Seq('a', 'b', 'c'), 3)) {
      (rdd1Iter, rdd2Iter) => {
        val result = ListBuffer[String]()
        while (rdd1Iter.hasNext && rdd2Iter.hasNext) {
          result.append(rdd1Iter.next() + "_" + rdd2Iter.next())
        }
        result.iterator
      }
    }.saveAsTextFile(path + "/zipPartitions")
    raw.zipWithIndex().saveAsTextFile(path + "/zipWithIndex")
    raw.zipWithUniqueId().saveAsTextFile(path + "/zipWithUniqueId")
    // aggregations
    raw.aggregate(1)({ (x, y) => x + y }, { (a, b) => a + b })
    raw.fold(1)({ (x, y) => x + y })
    // saving through the Hadoop APIs
    raw.zipWithUniqueId().saveAsHadoopFile(path + "/saveAsHadoopFile",
      classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])
    val rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    jobConf.setOutputKeyClass(classOf[Text])
    jobConf.setOutputValueClass(classOf[IntWritable])
    jobConf.set("mapred.output.dir", path + "/saveAsHadoopDataset")
    rdd1.saveAsHadoopDataset(jobConf)
    jobConf.set("mapred.output.dir", path + "/saveAsNewAPIHadoopDataset")
    raw.zipWithUniqueId().saveAsNewAPIHadoopDataset(jobConf) // note: the new-API variant expects new-API (mapreduce) output settings
    // filtering and grouping
    raw.filter(_ > 5)
    raw.groupBy(_ > 5)
    raw.map((_, 1)).groupByKey(1)
  }
}
[Spark] RDD operations in detail, part 3: key-value Transformation operators
Key-Value Transformation operators fall roughly into three groups: one-to-one between input and output partitions, aggregation, and join operations.
One-to-one between input and output partitions
mapValues: applies a map function to the Value of each (Key, Value) pair and leaves the Key untouched.
In the figure, each box represents an RDD partition; a => a + 2 means only the 1 in (V1, 1) is incremented by 2, giving 3.
/**
 * Pass each value in the key-value pair RDD through a map function without changing the keys;
 * this also retains the original RDD's partitioning.
 */
def mapValues[U](f: V => U): RDD[(K, U)] = {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
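As a quick illustration of mapValues in the spark-shell (the sample pairs below are made up for this sketch):
val pairs = sc.parallelize(Seq(("V1", 1), ("V2", 2)))
pairs.mapValues(a => a + 2).collect
// expected: Array((V1,3), (V2,4)) -- keys untouched, values incremented by 2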
Aggregation over a single RDD or two RDDs
(1)combineByKey
combineByKey aggregates within a single RDD. It can, for example, turn an RDD of (Int, Int) elements into an RDD of (Int, Seq[Int]) elements. Its parameters are:
createCombiner: V => C — when no C exists yet for a key, create one from a V, e.g. build a one-element Seq C from V.
mergeValue: (C, V) => C — when a C already exists for the key, merge the value into it, e.g. append item V to Seq C, or add it into a running total.
mergeCombiners: (C, C) => C — merge two Cs into one.
partitioner: Partitioner — the partitioner whose strategy is used when shuffling.
mapSideCombine: Boolean = true — to reduce the amount of data shuffled, much of the combining can be done on the map side first; for example, values for the same key can be summed within each partition before the shuffle.
serializerClass: String = null — data in transit must be serialized; users can plug in a custom serializer class.
In the figure, each box represents an RDD partition. combineByKey merges (V1,2) and (V1,1) into (V1,Seq(2,1)).
/**
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
 * Note that V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
 * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 * - `mergeCombiners`, to combine two C's into a single one.
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

/**
 * Simplified version of combineByKey that hash-partitions the output RDD.
 */
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)] = {
  combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}
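As a hedged usage sketch (the sample data is made up), turning an RDD of (Int, Int) into an RDD of (Int, Seq[Int]) with combineByKey might look like this:
val nums = sc.parallelize(Seq((1, 2), (1, 1), (2, 3)))
val grouped = nums.combineByKey(
  (v: Int) => Seq(v),                        // createCombiner: the first value for a key starts a Seq
  (c: Seq[Int], v: Int) => c :+ v,           // mergeValue: append further values within a partition
  (c1: Seq[Int], c2: Seq[Int]) => c1 ++ c2)  // mergeCombiners: concatenate Seqs across partitions
grouped.collect
// expected (order may vary): Array((1,List(2, 1)), (2,List(3)))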
(2)reduceByKey
reduceByKey is the simpler special case in which two values are merged into a single value, so createCombiner is trivial — it just returns v — and mergeValue and mergeCombiners share the same logic.
In the figure, each box represents an RDD partition. Using the user-supplied function (A,B) => (A+B), the values of (V1,2) and (V1,1), which share the same key, are added to give (V1,3).
/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}
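A minimal reduceByKey sketch matching the (V1,2)/(V1,1) example above (sample data assumed):
val pairs = sc.parallelize(Seq(("V1", 2), ("V1", 1), ("V2", 5)))
pairs.reduceByKey((a, b) => a + b).collect
// expected (order may vary): Array((V1,3), (V2,5))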
(3)partitionBy
The partitionBy function repartitions an RDD. If the RDD's existing partitioner equals the given partitioner, nothing is repartitioned; otherwise the given partitioner is used to build a new ShuffledRDD.
In the figure, each box represents an RDD partition. Under the new partitioning strategy, the V1 and V2 records that previously sat in different partitions are brought together into one partition.
/** Return a copy of the RDD partitioned using the specified partitioner. */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
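A minimal partitionBy sketch (the partition counts here are assumptions for illustration):
import org.apache.spark.HashPartitioner
val pairs = sc.parallelize(Seq(("V1", 1), ("V2", 2), ("V1", 3)), 3)
val repartitioned = pairs.partitionBy(new HashPartitioner(2))
repartitioned.partitions.size
// 2 -- and all records with key "V1" now live in the same partition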
(4)cogroup
The cogroup function co-partitions two RDDs. For Key-Value elements of the two RDDs, the elements with the same Key in each RDD are gathered into a collection, and the result pairs each Key with the per-RDD collections of its values as (K, (Iterable[V], Iterable[W])): the Value part is a tuple of the two iterables of values for that Key, one from each RDD.
In the figure, the large boxes represent RDDs and the small boxes inside them represent partitions. The records (U1,1) and (U1,2) from RDD1 and (U1,2) from RDD2 are combined into (U1,((1,2),(2))).
/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]],
      w3s.asInstanceOf[Iterable[W3]])
  }
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]])
  }
}

/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
  cogroup(other, defaultPartitioner(self, other))
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
  cogroup(other, new HashPartitioner(numPartitions))
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    numPartitions: Int)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}
Join
(1)join
join applies cogroup to the two RDDs being joined. On the cogrouped RDD it takes the Cartesian product of the values under each key, flattens the result, and collects all tuples under the corresponding key, finally returning an RDD[(K,(V,W))]. In essence, join first co-partitions via the cogroup operator and then uses flatMapValues to flatten the merged data.
The figure illustrates the join of two RDDs: the large boxes are RDDs and the small boxes are their partitions. Elements sharing the same key, such as V1, are joined, producing (V1,(1,1)) and (V1,(1,2)).
/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
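A short usage sketch of join (sample data assumed; a fuller spark-shell walkthrough appears later in this post):
val left  = sc.parallelize(Seq(("V1", 1), ("V2", 1)))
val right = sc.parallelize(Seq(("V1", 1), ("V1", 2)))
left.join(right).collect
// expected: Array((V1,(1,1)), (V1,(1,2))) -- V2 has no match on the right and is dropped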
(2)leftOuterJoin and rightOuterJoin
leftOuterJoin (left outer join) and rightOuterJoin (right outer join) are join plus a check on one side: if the other RDD has no element for a key, that slot is filled with an empty value (None); if it does, the records are joined and the result returned.
/**
 * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
 * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
 * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
 * partition the output RDD.
 */
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}

/**
 * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
 * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
 * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
 * partition the output RDD.
 */
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}
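A hedged sketch of both outer joins (sample data assumed):
val left  = sc.parallelize(Seq(("A", 1), ("B", 2)))
val right = sc.parallelize(Seq(("A", "a"), ("C", "c")))
left.leftOuterJoin(right).collect
// expected: Array((A,(1,Some(a))), (B,(2,None)))
left.rightOuterJoin(right).collect
// expected: Array((A,(Some(1),a)), (C,(None,c)))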
Keywords: Spark operators, Spark RDD key-value transformations, cogroup, join
## One RDD as argument
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
## Two RDDs as arguments
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
## Three RDDs as arguments
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
cogroup behaves like a SQL full outer join: it returns records from both the left and the right RDD, with empty collections where a key has no match on one side.
The numPartitions parameter sets the number of partitions of the result.
The partitioner parameter sets the partitioning function.
## Example with one RDD as argument
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> var rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[12] at cogroup at <console>:25
scala> rdd3.partitions.size
res3: Int = 2
scala> rdd3.collect
res1: Array[(String, (Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer())),
(D,(CompactBuffer(),CompactBuffer(d))),
(A,(CompactBuffer(1),CompactBuffer(a))),
(C,(CompactBuffer(3),CompactBuffer(c))))
scala> var rdd4 = rdd1.cogroup(rdd2,3)
rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[14] at cogroup at <console>:25
scala> rdd4.partitions.size
res5: Int = 3
scala> rdd4.collect
res6: Array[(String, (Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer())),
(C,(CompactBuffer(3),CompactBuffer(c))),
(A,(CompactBuffer(1),CompactBuffer(a))),
(D,(CompactBuffer(),CompactBuffer(d))))
## Example with two RDDs as arguments
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
var rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)
scala> var rdd4 = rdd1.cogroup(rdd2,rdd3)
rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String], Iterable[String]))] =
MapPartitionsRDD[17] at cogroup at <console>:27
scala> rdd4.partitions.size
res7: Int = 2
scala> rdd4.collect
res9: Array[(String, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer(),CompactBuffer())),
(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())),
(A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))),
(C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer())),
(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))
## The example with three RDDs is omitted; it follows the same pattern as above.
join
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
join behaves like a SQL inner join: it only returns results for keys K that can be matched in both RDDs. join works on exactly two RDDs; to join more than two, chain several joins.
The numPartitions parameter sets the number of partitions of the result.
The partitioner parameter sets the partitioning function.
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.join(rdd2).collect
res10: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
For more about Spark operators, see the related articles.