spark dataframe mapmap函数 =>是什么意思

Spark API 详解/大白话解释 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、fla - 一元钱计划 - ITeye技术网站
博客分类:
map(function) map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
val a = sc.parallelize(1 to 9, 3)
val b = a.map(x =& x*2)//x =& x*2是一个函数,x是传入参数即RDD的每个元素,x*2是返回值
//结果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
//结果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
当然map也可以把Key变成Key-Value对
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x =& (x, 1))
b.collect.foreach(println(_))
(panther,1)
( eagle,1)
mapPartitions(function) map()的输入函数是应用于RDD中每个元素,而mapPartitions()的输入函数是应用于每个分区
package test
import scala.Iterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TestRdd {
def sumOfEveryPartition(input: Iterator[Int]): Int = {
var total = 0
input.foreach { elem =&
total += elem
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Rdd Test")
val spark = new SparkContext(conf)
val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6个元素,分成2个partition
val result = input.mapPartitions(
partition =& Iterator(sumOfEveryPartition(partition)))//partition是传入的参数,是个list,要求返回也是list,即Iterator(sumOfEveryPartition(partition))
result.collect().foreach {
println(_)//6 15
spark.stop()
mapValues(function) 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x =& (x.length, x))
b.mapValues("x" + _ + "x").collect
//"x" + _ + "x"等同于everyInput =&"x" + everyInput + "x" //结果 Array( (3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex) )
mapWith和flatMapWith 感觉用得不多,参考
flatMap(function) 与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x =& 1 to x)//每个元素扩展
Array[Int] = Array( 1,
1, 2, 3, 4)
flatMapValues(function)
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=&1 to x)
b.collect.foreach(println(_))
浏览: 181460 次
来自: 北京
遇到这个问题我用的sed -i 's/\r$//' /mnt/ ...
学习了,支持
这是你直接翻译过来的???spark map flatMap flatMapToPair mapPartitions 的区别和用途
spark map flatMap flatMapToPair mapPartitions 的区别和用途
[摘要:import akka.japi.Function2;
import org.apache.spark.HashP
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaPairRDD;
import org.]
import akka.japi.Function2;
import org.apache.spark.HashP
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.api.java.function.FlatMapF
import org.apache.spark.api.java.function.F
import org.apache.spark.api.java.function.PairFlatMapF
import org.apache.spark.storage.StorageL
import scala.Tuple2;
import java.io.F
import java.io.S
import java.util.ArrayL
import java.util.A
import java.util.I
import java.util.L
* map flatMap flatMapToPair mapPartitions 的区别和用途
* 例如数据是:name:gaoyue age:28
* 方法一:map,我们可以看到数据的每一行在map之后产生了一个数组,那么rdd存储的是一个数组的集合
* rdd存储的状态是Array[Array[String]] = Array(Array(name, gaoyue), Array(age, 28))
*Array[String] = Array(name, gaoyue, age, 28)
JavaRDD&String[]& mapresult=lines.map(new Function&String, String[]&() {
public String[] call(String s) throws Exception {
return s.split(&:&);
* 方法二:flatMap
* 操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
* 操作2:最后将所有对象合并为一个对象
JavaRDD&String& objectJavaRDD = lines.flatMap(new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) throws Exception {
return Arrays.asList(s.split(& &));
* 方法三:
* mappartition
*rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。两者的主要区别是调用的粒度不一样:
* map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。也就是把每个分区中的内容作为整体来处理的。
lines2.mapPartitions(new FlatMapFunction&Iterator&String&, String&() {
ArrayList&String& results = new ArrayList&String&();
public Iterable&String& call(Iterator&String& s) throws Exception {
while (s.hasNext()) {
results.addAll(Arrays.asList(s.next().split(&:&)));
}).saveAsTextFile(&/Users/luoluowushengmimi/Documents/result&);
* flatMapToPair
* 操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个key-value对象
* 操作2:最后将所有key-value对象合并为一个对象 Iterable&Tuple2&String, String&&
JavaPairRDD&String,String& pair=lines.flatMapToPair(new PairFlatMapFunction&String, String, String&() {
public Iterable&Tuple2&String, String&& call(String s) throws Exception {
String[] temp=s.split(&:&);
ArrayList&Tuple2&String,String&& list=new ArrayList&Tuple2&String,String&&();
list.add(new Tuple2&String,String&(temp[0],temp[1]));
感谢关注 Ithao123精品文库频道,是专门为互联网人打造的学习交流平台,全面满足互联网人工作与学习需求,更多互联网资讯尽在 IThao123!
Laravel是一套简洁、优雅的PHP Web开发框架(PHP Web Framework)。它可以让你从面条一样杂乱的代码中解脱出来;它可以帮你构建一个完美的网络APP,而且每行代码都可以简洁、富于表达力。
产品设计是互联网产品经理的核心能力,一个好的产品经理一定在产品设计方面有扎实的功底,本专题将从互联网产品设计的几个方面谈谈产品设计
随着国内互联网的发展,产品经理岗位需求大幅增加,在国内,从事产品工作的大部分岗位为产品经理,其实现实中,很多从事产品工作的岗位是不能称为产品经理,主要原因是对产品经理的职责不明确,那产品经理的职责有哪些,本专题将详细介绍产品经理的主要职责
Swift是Apple在WWDC2014所发布的一门编程语言,用来撰写OS X和iOS应用程序[1]。在设计Swift时.就有意和Objective-C共存,Objective-C是Apple操作系统在导入Swift前使用的编程语言
Swift是供iOS和OS X应用编程的新编程语言,基于C和Objective-C,而却没有C的一些兼容约束。Swift采用了安全的编程模式和添加现代的功能来使得编程更加简单、灵活和有趣。界面则基于广受人民群众爱戴的Cocoa和Cocoa Touch框架,展示了软件开发的新方向。
IThao123周刊spark 常用函数介绍(python)
spark 常用函数介绍(python)
发布时间: 20:16:37
编辑:www.fx114.net
本篇文章主要介绍了"spark 常用函数介绍(python) ",主要涉及到spark 常用函数介绍(python) 方面的内容,对于spark 常用函数介绍(python) 感兴趣的同学可以参考一下。
以下是个人理解,一切以官网文档为准。
在开始之前,我先介绍一下,RDD是什么?
&&&&& RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。
  创建RDD:
&&& sc.parallelize([1,2,3,4,5], 3)
#意思是将数组中的元素转换为RDD,并且存储在3个分区上[1]、[2,3]、[4,5]。如果是4个分区:[1]、[2]、[3]、[4,5]
&  上面这种是数组创建,也可以从文件系统或者HDFS中的文件创建出来,后面会讲到。
只要搞懂了spark的函数们,你就成功了一大半。
spark的函数主要分两类,Transformations和Actions。Transformations为一些数据转换类函数,actions为一些行动类函数:
转换:转换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。
行动:行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。
下面介绍spark常用的Transformations, Actions函数:
Transformations
map(func [, preservesPartitioning=False])& --- 返回一个新的分布式数据集,这个数据集中的每个元素都是经过func函数处理过的。
&&& data = [1,2,3,4,5]
&&& distData = sc.parallelize(data).map(lambda x: x+1).collect()
#结果:[2,3,4,5,6]
filter(func)& --- 返回一个新的数据集,这个数据集中的元素是通过func函数筛选后返回为true的元素(简单的说就是,对数据集中的每个元素进行筛选,如果符合条件则返回true,不符合返回false,最后将返回为true的元素组成新的数据集返回)。
&&& rdd = sc.parallelize(data).filter(lambda x:x%2==0).collect()
#结果:[2, 4]
flatMap(func [, preservesPartitioning=False])& --- 类似于map(func), 但是不同的是map对每个元素处理完后返回与原数据集相同元素数量的数据集,而flatMap返回的元素数不一定和原数据集相同。each input item can be mapped to 0 or more output items (so&funcshould return a Seq rather than a single item)
#### for flatMap()
&&& rdd = sc.parallelize([2,3,4])
&&& sorted(rdd.flatMap(lambda x: range(1,x)).collect())
#结果:[1, 1, 1, 2, 2, 3]
&&& sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect())
#结果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
#### for map()
&&& rdd = sc.parallelize([2,3,4])
&&& sorted(rdd.flatMap(lambda x: range(1,x)).collect())
#结果:[[1], [1, 2], [1, 2, 3]]
&&& sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect())
#结果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
mapPartitions(func [, preservesPartitioning=False])& ---mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
&&& rdd = sc.parallelize([1,2,3,4,5], 3)
&&& def f(iterator): yield sum(iterator)
&&& rdd.mapPartitions(f).collect()
#结果:[1,5,9]
mapPartitionsWithIndex(func [, preservesPartitioning=False])& ---Similar to&mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
&&& rdd = sc.parallelize([1,2,3,4,5], 3)
&&& def f(splitIndex, iterator): yield splitIndex
&&& rdd.mapPartitionsWithIndex(f).collect()
#结果:[0,1,2]
#三个分区的索引
reduceByKey(func [,&numPartitions=None,&partitionFunc=&function portable_hash at 0x7fa664f3cb90&])& --- reduceByKey就是对元素为kv对的RDD中Key相同的元素的value进行reduce,因此,key相同的多个元素的值被reduce为一个值,然后与原RDD中的key组成一个新的kv对。
&&& from operator import add
&&& rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
&&& sorted(rdd.reduceByKey(add).collect())
&&& #或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect())
#结果:[('a', 2), ('b', 1)]
aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None])& ---
sortByKey([ascending=True, numPartitions=None, keyfunc=&function &lambda& at 0x7fa&])& --- 返回排序后的数据集。该函数就是队kv对的RDD数据进行排序,keyfunc是对key进行处理的函数,如非需要,不用管。
&&& tmp = [('a', 1), ('b', 2), ('1', 3), ('D', 4)]
&&& sc.parallelize(tmp).sortByKey(True, 1).collect()
#结果: [('1', 3), ('D', 4), ('a', 1), ('b', 2)]
&&& sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect()
#结果:[('1', 3), ('a', 1), ('b', 2), ('D', 4)]
#注意,比较两个结果可看出,keyfunc对键的处理只是在数据处理的过程中起作用,不能真正的去改变键名
join(otherDataset [, numPartitions=None])& --- join就是对元素为kv对的RDD中key相同的value收集到一起组成(v1,v2),然后与原RDD中的key组合成一个新的kv对,返回。
&&& x = sc.parallelize([("a", 1), ("b", 4)])
&&& y = sc.parallelize([("a", 2), ("a", 3)])
&&& sorted(x.join(y).collect())
#结果:[('a', (1, 2)), ('a', (1, 3))]
cartesian(otherDataset)& --- 返回一个笛卡尔积的数据集,这个数据集是通过计算两个RDDs得到的。
&&& x = sc.parallelize([1,2,3])
&&& y = sc.parallelize([4,5])
&&& x.cartesian(y).collect()
#结果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
Action (这里只讲支持python的,java和scala的后面用到了在做详解,当然支持python就一定支持java和scala)
reduce(func)& --- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
&&& from operator import add
&&& sc.parallelize([1,2,3,4,5]).reduce(add)
# 结果:15
collect()& --- 返回RDD中的数据,以list形式。
&&& sc.parallelize([1,2,3,4,5]).collect()
#结果:[1,2,3,4,5]
count()& --- 返回RDD中的元素个数。
&&& sc.parallelize([1,2,3,4,5]).count
#结果:5
first()& --- 返回RDD中的第一个元素。
&&& sc.parallelize([1,2,3,4,5]).first()
#结果:1
take(n)& --- 返回RDD中前n个元素。
&&& sc.parallelize([1,2,3,4,5]).take(2)
#结果:[1,2]
takeOrdered(n [, key=None])& --- 返回RDD中前n个元素,但是是升序(默认)排列后的前n个元素,或者是通过key函数指定后的RDD(这个key我也没理解透,后面在做详解)
&&& sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)
#结果:[2,3,4]
&&& sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x:-x)
#结果:[9,7,6]
saveAsTextFile(path [, compressionCodecClass=None])& --- 该函数将RDD保存到文件系统里面,并且将其转换为文本行的文件中的每个元素调用 tostring 方法。
parameters:& path - 保存于文件系统的路径
       compressionCodecClass - (None by default) string i.e. &org.apache.press.GzipCodec&
&&& tempFile = NamedTemporaryFile(delete=True)
&&& tempFile.close()
&&& sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
&&& from fileinput import input
&&& from glob import glob
&&& ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
&Empty lines are tolerated when saving to text files:
&&& tempFile2 = NamedTemporaryFile(delete=True)
&&& tempFile2.close()
&&& sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
&&& ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
'\n\n\nbar\nfoo\n'
&Using compressionCodecClass:
&&& tempFile3 = NamedTemporaryFile(delete=True)
&&& tempFile3.close()
&&& codec = "org.apache.press.GzipCodec"
&&& sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
&&& from fileinput import input, hook_compressed
&&& result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
&&& b''.join(result).decode('utf-8')
u'bar\nfoo\n'
countByKey()& --- 返回一个字典(key,count),该函数操作数据集为kv形式的数据,用于统计RDD中拥有相同key的元素个数。
&&& defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey()
&&& defdict
#结果:defaultdict(&type 'int'&, {'a': 2, 'b': 1})
&&& defdict.items()
#结果:[('a', 2), ('b', 1)]
countByValue()& --- 返回一个字典(value,count),该函数操作一个list数据集,用于统计RDD中拥有相同value的元素个数。
&&& sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()
#结果:[(1, 2), (2, 4), (3, 3), (5, 1)]
foreach(func)& --- 运行函数func来处理RDD中的每个元素,这个函数常被用来updating an Accumulator或者与外部存储系统的交互。
&&& def f(x): print(x)
&&& sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
#note: 打印是随机的,并不是一定按1,2,3,4,5的顺序打印
本文标题:
本页链接:以前总是分不清楚spark中flatmap和map的区别,现在弄明白了,总结分享给大家,先看看flatmap和map的定义。map()是将函数用于RDD中的每个元素,将返回值构成新的RDD。flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD有些拗口,看看例子就明白了。val&rdd&=&sc.parallelize(List("coffee&panda","happy&panda","happiest&panda&party"))输入rdd.map(x=&x).collect结果res9:&Array[String]&=&Array(coffee&panda,&happy&panda,&happiest&panda&party)输入rdd.flatMap(x=&x.split("&")).collect结果res8:&Array[String]&=&Array(coffee,&panda,&happy,&panda,&happiest,&panda,&party)flatMap说明白就是先map然后再flat,再来看个例子val&rdd1&=&sc.parallelize(List(1,2,3,3))scala&&rdd1.map(x=&x+1).collect
res10:&Array[Int]&=&Array(2,&3,&4,&4)scala&&rdd1.flatMap(x=&x.to(3)).collect
res11:&Array[Int]&=&Array(1,&2,&3,&2,&3,&3,&3)这下应该完全明白了吧,不懂给我留言,欢迎指正。
以上就介绍了spark快速大数据分析之读书笔记-flatmap与map的区别,包括了方面的内容,希望对其他编程教程有兴趣的朋友有所帮助。
本文网址链接:/article/detail_334333.html
上一篇: 下一篇:影响到Spark输出RDD分区的操作函数 – 过往记忆
欢迎关注Hadoop、Spark、FlinkHive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop。
文章总数:733
浏览总数:8,584,012
评论:4610
分类目录:85 个
注册用户数:2221
最后更新:日
欢迎关注微信公共帐号:iteblog_hadoop
IT技术前沿:geek_toutiao
  下面的操作会影响到输出RDD分区(partitioner)的:
  cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort, mapValues (如果父RDD存在partitioner), flatMapValues(如果父RDD存在partitioner), 和 filter (如果父RDD存在partitioner)。其他的transform操作不会影响到输出RDD的partitioner,一般来说是None,也就是没有partitioner。下面举个例子进行说明:
scala& val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] =
ParallelCollectionRDD[4] at parallelize at &console&:12
scala& val a = sc.parallelize(List(2,51,2,7,3))
a: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[5] at parallelize at &console&:12
scala& val a = sc.parallelize(List(2,51,2))
a: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[6] at parallelize at &console&:12
scala& val b = sc.parallelize(List(3,1,4))
b: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[7] at parallelize at &console&:12
scala& val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] =
ZippedPartitionsRDD2[8] at zip at &console&:16
scala& val result = pairs.join(c)
result: org.apache.spark.rdd.RDD[(Int, (Int, Int))] =
FlatMappedValuesRDD[11] at join at &console&:20
scala& result.partitioner
res6: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
  大家可以看到输出来的RDD result分区变成了HashPartitioner,因为join中的两个分区都没有设置分区,所以默认用到了HashPartitioner,可以看join的实现:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
join(other, defaultPartitioner(self, other))
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r &- bySize if r.partitioner.isDefined) {
return r.partitioner.get
if (rdd.context.conf.contains(&spark.default.parallelism&)) {
new HashPartitioner(rdd.context.defaultParallelism)
new HashPartitioner(bySize.head.partitions.size)
  defaultPartitioner函数就确定了结果RDD的分区。从上面的实现可以看到,
  1、join的两个RDD如果都没有partitioner,那么join结果RDD将使用HashPartitioner;
  2、如果两个RDD中其中有一个有partitioner,那么join结果RDD将使用那个父RDD的partitioner;
  3、如果两个RDD都有partitioner,那么join结果RDD就使用调用join的那个RDD的partitioner。
本博客文章除特别声明,全部都是原创!
禁止个人和公司转载本文、谢谢理解:
下面文章您可能感兴趣}

我要回帖

更多关于 spark map 自定义函数 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信