How to Translate MapReduce to Spark

Author: Dong
How can an existing MapReduce program be reimplemented in Spark?
Accepted answer
MapReduce has been the workhorse computing paradigm of Apache Hadoop since its inception, and it is excellent at the jobs it was designed for: large-scale log processing, batch ETL, and so on.

As Hadoop's use has broadened, it has become clear that MapReduce is not the best framework for every computation. Hadoop 2 promoted the resource manager YARN to a top-level component, which opened the platform to other computing engines; non-MapReduce engines such as Impala, for example, gave the platform interactive SQL.

Today Apache Spark is another such alternative, and it is described as a general-purpose computing paradigm that goes beyond MapReduce. You may wonder: if MapReduce has been this useful all along, how could it suddenly be replaced? After all, there is still plenty of ETL-style work to run on Hadoop, even as the platform gains real-time capabilities.

Fortunately, it is entirely possible to reimplement MapReduce-like computations in Spark. They can be simpler to maintain and, in some cases, faster, thanks to the way Spark optimizes flushing data to disk. Reimplementing the MapReduce programming paradigm in Spark is really a return to its roots: Spark imitates Scala's functional programming style and API, and the idea behind MapReduce came from the functional programming language LISP.

Spark's main abstraction is the RDD (Resilient Distributed Dataset), which offers operations such as map and reduce, but these are not direct analogues of Hadoop's Mapper or Reducer APIs. This difference often becomes a stumbling block for developers trying to port Mapper and Reducer classes to Spark one for one.

Compared with the classic functional map and reduce found in Scala or Spark, the Mapper and Reducer APIs that Hadoop provides are both more flexible and more complex. These differences may not even be apparent to developers used to MapReduce, but the following behaviors belong to Hadoop's implementation rather than to the abstract idea of MapReduce:

· Mappers and Reducers always use key-value pairs as input and output.
· A Reducer reduces values per key.
· A Mapper or Reducer may emit 0, 1, or many key-value pairs for every input.
· Mappers and Reducers may emit arbitrary keys and values, not only subsets or transformations of their input.

A Mapper or Reducer object's lifecycle may span many map or reduce calls; they support setup and cleanup methods, which are invoked before and after a batch of records is processed.

This article briefly shows how to reproduce this behavior in Spark; you will find that a Mapper or Reducer does not need to be translated word for word.

Key-value pairs as tuples

Suppose we need to compute the length of every line in a large text input and report the number of lines of each length. In Hadoop MapReduce, this starts with a Mapper that emits key-value pairs in which the line length is the key and 1 is the value:

public class LineLengthMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}

Note that Mappers and Reducers operate only on key-value pairs. So the input that TextInputFormat provides to LineLengthMapper is really a pair whose key is the position within the file (rarely used, but something has to be the key) and whose value is the line of text.

The corresponding Spark code is:

lines.map(line => (line.length, 1))

In Spark, the input is just an RDD of Strings, not of key-value pairs. Spark represents a key-value pair as a Scala tuple, created with the (a, b) syntax. The result of the map operation above is an RDD of (Int, Int) tuples. When an RDD contains tuples it gains extra methods, such as reduceByKey, which will be essential for reproducing MapReduce behavior.
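For context, the lines value used throughout is simply an RDD of Strings. A minimal sketch of how it might be created with Spark's Scala API; the application name, local master, and input path are placeholders of this sketch, not details from the answer:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("LineLengths").setMaster("local[*]") // local master only for this sketch
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs:///user/example/input.txt") // hypothetical path; yields an RDD[String], one element per line
val lineLengthPairs = lines.map(line => (line.length, 1)) // RDD[(Int, Int)], the analogue of LineLengthMapper's output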
Reducer and reduce() versus reduceByKey()

To produce a count of lines by length, the Reducer takes each length as the key and sums the corresponding counts as the value:

public class LineLengthReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}

The equivalent of the Mapper and Reducer above is a one-liner in Spark:

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

Spark's RDD API does have a reduce method, but it reduces the entire set of key-value pairs to one single value. That is not what Hadoop MapReduce does; the Spark counterpart is reduceByKey.

Also, a Reducer's reduce method receives a stream of many values and produces 0, 1, or many results, whereas reduceByKey accepts a function that turns exactly two values into exactly one: here, a simple addition function that maps two numbers to their sum. The caller can use this associative function to reduce many values to one. Compared with the Reducer method, it is a simpler and narrower API for reducing values by key.
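To make the distinction concrete, a short sketch (assuming lines is the RDD[String] from above):

// reduceByKey: one (lineLength, numberOfLines) pair per distinct length
val perLength = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

// reduce: collapses the whole RDD to a single value; here, the total number of characters across all lines
val totalChars = lines.map(_.length).reduce(_ + _)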
Mapper and map() versus flatMap()

Now consider counting the occurrences of words that begin with an uppercase letter. For each line of input text, a Mapper may emit 0, 1, or many key-value pairs:

public class CountUppercaseMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    for (String word : line.toString().split(" ")) {
      if (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}

The Spark equivalent is:

lines.flatMap(
  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word, 1))
)

Spark's plain map function does not suffice here, because map produces exactly one output per input, while in this example one line may need to yield several outputs. So, compared with what the Mapper API supports, Spark's map function has simpler semantics and a narrower scope.

Spark's solution is to first map each line to a collection of output values, which may be empty or contain many elements, and then flatten those collections with flatMap. Inside the function, the array of words in the line is filtered and converted into tuples. In this example it is flatMap, not map, that really mimics a Mapper.
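The nesting difference shows up directly in the result types. A sketch, assuming the RDD import shown; the nonEmpty guard is an addition of this sketch, to avoid calling word(0) on the empty strings that split can produce:

import org.apache.spark.rdd.RDD

// map keeps exactly one element per line, so the result is an RDD of arrays
val nested: RDD[Array[(String, Int)]] =
  lines.map(_.split(" ").filter(w => w.nonEmpty && Character.isUpperCase(w(0))).map(w => (w, 1)))

// flatMap flattens those arrays into a single RDD of (word, 1) pairs
val flat: RDD[(String, Int)] =
  lines.flatMap(_.split(" ").filter(w => w.nonEmpty && Character.isUpperCase(w(0))).map(w => (w, 1)))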
groupByKey()

Writing a Reducer that then adds up the counts for each word is simple, and in Spark, reduceByKey can be used to sum the counts per word. But suppose that for some reason the output has to contain the word in all uppercase, together with its count. In MapReduce that looks like:

public class CountUppercaseReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}

But reduceByKey by itself does not do the job in Spark, because it preserves the original key. To emulate this, we need something more like the Reducer API. Recall that a Reducer's reduce method receives a key and an Iterable of values and emits some transformation of them. groupByKey followed by a map can achieve the same thing:

groupByKey().map { case (word, ones) => (word.toUpperCase, ones.sum) }

groupByKey merely collects all values for a key together and applies no reduce function. From there, any transformation can be applied to the key and its sequence of values. Here, the key is converted to uppercase and the values are summed directly.
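A caveat worth adding here, not part of the answer itself: groupByKey collects all values for a key into memory, so a key with very many values can exhaust a worker. When the per-key work is just a sum, it is generally better to let reduceByKey do the reduction and transform the key afterwards; a sketch:

val uppercaseCounts =
  lines.flatMap(_.split(" ").filter(w => w.nonEmpty && Character.isUpperCase(w(0))).map(w => (w, 1)))
    .reduceByKey(_ + _)                                       // sum per original word, with map-side combining
    .map { case (word, total) => (word.toUpperCase, total) }  // then transform the key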
setup() and cleanup()

In MapReduce, a Mapper or Reducer can declare a setup method, which runs before any input is processed and is typically used to allocate an expensive resource such as a database connection, and a cleanup method to release that resource:

public class SetupCleanupMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  private Connection dbConnection;
  @Override
  protected void setup(Context context) {
    dbConnection = ...;
  }
  ...
  @Override
  protected void cleanup(Context context) {
    dbConnection.close();
  }
}

Spark's map and flatMap methods operate on one input at a time and offer no way to run code before or after transforming a batch of values. It looks as though the setup and cleanup code could simply be placed before and after the call to Spark's map function:

val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!

This approach does not work, because:

· It puts the dbConnection object into the map function's closure, which requires it to be serializable (for example, by implementing java.io.Serializable). Objects such as database connections are generally not serializable.
· map is a transformation, not an action, and is lazily evaluated, so the connection cannot be closed at this point.
· Even if it could be, this would only close the connection on the driver, not release the connections allocated by the serialized copies on the executors.

In fact, neither map nor flatMap is the closest counterpart to a Mapper in Spark. That is the important mapPartitions() method, which maps not just one value to one other value but a whole group of values to another group of values, much like a bulk map. This means mapPartitions() can allocate resources locally when it starts and release them when it finishes mapping the batch.

Adding the setup code is simple; adding the cleanup code is harder, because it is still difficult to detect when the transformation is complete. For example, this works:
lines.mapPartitions { valueIterator =>
  val dbConnection = ... // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close() // Still wrong! May not have evaluated iterator
  transformedIterator
}

A more complete formulation looks roughly like this:

lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    Iterator[...]()
  } else {
    val dbConnection = ...
    valueIterator.map { item =>
      val transformedItem = ...
      if (!valueIterator.hasNext) {
        dbConnection.close()
      }
      transformedItem
    }
  }
}

Although this translation is decidedly less elegant than the earlier ones, it does get the job done.

There is no flatMapPartitions method, but the same effect can be achieved by calling mapPartitions followed by flatMap(a => a) to flatten the results.

The equivalent of a Reducer with setup and cleanup is just a groupByKey followed by a mapPartitions call like the one above.
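To make the pattern above concrete, here is a minimal sketch with the elided pieces filled in by placeholders of this sketch's own choosing: a hypothetical JDBC URL and a trivial "transformation" that only tags each line with the connection's catalog name. None of these specifics come from the answer above; the point is only the allocate-per-partition, close-after-last-element idiom:

import java.sql.{Connection, DriverManager}
import org.apache.spark.rdd.RDD

def enrich(lines: RDD[String]): RDD[String] =
  lines.mapPartitions { valueIterator =>
    if (valueIterator.isEmpty) {
      Iterator.empty
    } else {
      // One connection per partition, created on the executor rather than the driver.
      val dbConnection: Connection =
        DriverManager.getConnection("jdbc:postgresql://db.example.com/mydb") // hypothetical URL
      valueIterator.map { item =>
        val transformedItem = item + "," + dbConnection.getCatalog // placeholder transformation
        // Close the connection only after producing the last element of this partition.
        if (!valueIterator.hasNext) {
          dbConnection.close()
        }
        transformedItem
      }
    }
  }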
But wait, there's more

Experienced MapReduce developers will point out that there are still more APIs that have not been mentioned:

· MapReduce supports a special type of Reducer, called a Combiner, that reduces the amount of data shuffled out of a Mapper.
· It supports custom partitioning through a Partitioner and custom grouping through a grouping Comparator.
· The Context object grants access to the Counter API for accumulating statistics.
· A Reducer sees the keys in sorted order throughout its lifecycle.
· MapReduce has its own Writable serialization scheme.
· Mappers and Reducers can emit multiple outputs at once.
· MapReduce has dozens of tuning parameters.

All of these can be implemented in Spark as well, using APIs such as Accumulators, methods such as groupBy and the variants of these methods that take a partitioner argument, Java or Kryo serialization, caching, and more. Space does not permit covering them in this article.
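As one small illustration of the Counter and Partitioner points, a sketch using Spark's Accumulator API and the reduceByKey overload that takes a Partitioner (assuming the sc and lines values from the earlier sketches; the accumulator API shown is the one introduced in Spark 2.x, and the names are this sketch's own):

import org.apache.spark.HashPartitioner

val emptyLines = sc.longAccumulator("emptyLines")               // counter-style statistic
lines.foreach(line => if (line.trim.isEmpty) emptyLines.add(1)) // updated on executors inside an action
println(emptyLines.value)                                       // read back on the driver

// reduceByKey combines map-side (playing the role of a Combiner) and can take a custom Partitioner
val partitionedCounts =
  lines.map(line => (line.length, 1)).reduceByKey(new HashPartitioner(12), _ + _)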
It is worth pointing out that the concepts behind MapReduce are still useful; there is now simply a more powerful implementation of them, one that takes advantage of a functional language and better matches their functional nature. Understanding the differences between Spark's RDD API and the original Mapper and Reducer APIs helps developers better understand how all of these operations work and how to use Spark to its best advantage.

资深电脑人
