
Hadoop Study Note 31: "XXX.jar is not a valid DFS filename" when integrating HBase with MapReduce on Win7
HBase In Action and HBase: The Definitive Guide both ship with plenty of introductory sample code, so you can check out whatever interests you.
Running the code from the HBase-and-MapReduce integration chapters on Win7, however, fails with an error. For example, this code throws:
Exception in thread "main" java.lang.IllegalArgumentException: Pathname /D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar from hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar is not a valid DFS filename.
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:184)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:264)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:300)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:387)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
at com.jyz.study.hadoop.hbase.mapreduce.AnalyzeData.main(AnalyzeData.java:249)
3. Tracing the code
The relevant code is in org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil:
public static void addHBaseDependencyJars(Configuration conf) throws IOException {
  addDependencyJars(conf,
    // explicitly pull a class from each module
    org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
    org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
    org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
    org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
    // pull necessary dependencies
    org.apache.zookeeper.ZooKeeper.class,
    org.jboss.netty.channel.ChannelFactory.class,
    com.google.protobuf.Message.class,
    com.google.common.collect.Lists.class,
    org.cloudera.htrace.Trace.class);
}
public static void addDependencyJars(Configuration conf,
    Class<?>... classes) throws IOException {
  ...
  Path path = findOrCreateJar(clazz, localFs, packagedClasses);
  ...
  conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
}
At this point tmpjars contains, for example:
file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-server-0.96.1.1-hadoop2.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/htrace-core-2.01.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-common-0.96.1.1-hadoop2.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/guava-12.0.1.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hadoop-common-2.2.0.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-protocol-0.96.1.1-hadoop2.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-hadoop-compat-0.96.1.1-hadoop2.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/netty-3.6.6.Final.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/protobuf-java-2.5.0.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hadoop-mapreduce-client-core-2.2.0.jar,file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/zookeeper-3.4.5.jar
In JobSubmitter's copyAndConfigureFiles method:
String libjars = conf.get("tmpjars");
if (libjars != null) {
  FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
  String[] libjarsArr = libjars.split(",");
  for (String tmpjars : libjarsArr) {
    Path tmp = new Path(tmpjars);
    Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
    DistributedCache.addFileToClassPath(
        new Path(newPath.toUri().getPath()), conf);
  }
}
copyRemoteFiles copies these jars to the jobtracker filesystem and returns the path each one was copied to.
When the job runs on the cluster, it returns:
[hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hbase-client-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hbase-server-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/htrace-core-2.01.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hbase-common-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/guava-12.0.1.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hadoop-common-2.2.0.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hbase-protocol-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hbase-hadoop-compat-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/netty-3.6.6.Final.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/protobuf-java-2.5.0.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/hadoop-mapreduce-client-core-2.2.0.jar, hdfs://192.168.1.200:9000/tmp/hadoop-yarn/staging/root/.staging/job_2_0035/libjars/zookeeper-3.4.5.jar]
When run locally, it returns instead:
[hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-server-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/htrace-core-2.01.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-common-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/guava-12.0.1.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hadoop-common-2.2.0.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-protocol-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-hadoop-compat-0.96.1.1-hadoop2.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/netty-3.6.6.Final.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/protobuf-java-2.5.0.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hadoop-mapreduce-client-core-2.2.0.jar, hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/zookeeper-3.4.5.jar]
Both batches of URLs are later checked through the Hadoop FileSystem. That is exactly where the problem lies: the check does not distinguish between the local Windows file system and the cluster's Hadoop file system, although it should. Submitting the job to the cluster therefore works fine, while running it locally produces the exception above. I will find some time to create an issue on the Hadoop JIRA.
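To make the mismatch concrete, here is a minimal, hypothetical sketch of what a scheme-aware check could look like (this is not the actual Hadoop code or an official patch, and the jar path is a placeholder): each tmpjars entry is resolved against the FileSystem its own URI scheme points to, instead of being forced through the cluster's default HDFS.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpJarsCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical local jar entry, as it appears in tmpjars when submitting from Windows
    String entry = "file:/D:/some/local/lib/example.jar";
    Path jar = new Path(entry);
    // FileSystem.get(URI, conf) returns a LocalFileSystem for file: URIs and a
    // DistributedFileSystem for hdfs: URIs, so the status check is made against
    // the file system the entry actually lives on.
    FileSystem fs = FileSystem.get(new URI(entry), conf);
    System.out.println(fs.getUri() + " exists=" + fs.exists(jar));
  }
}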
4. A workaround to keep the code running
TableMapReduceUtil provides many overloads of initTableMapperJob and initTableReducerJob, some of which expose the parameter:
* @param addDependencyJars upload HBase jars and jars for any of the configured
job classes via the distributed cache (tmpjars).
It is precisely because addDependencyJars defaults to true that the error above is triggered:
if (addDependencyJars) {
  addDependencyJars(job);
}
So we can simply set it to false. Change the code to:
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, // co ParseJson-3-SetMap Setup map phase details using the utility method.
ImmutableBytesWritable.class, Put.class, job, false);
TableMapReduceUtil.initTableReducerJob(output, // co ParseJson-4-SetReduce Configure an identity reducer to store the parsed data.
IdentityTableReducer.class, job, null, null, null, null, false);
The job now runs normally. Checking the results, the data in testtable data:json has been split into testtable data:column1, data:column2, ... as expected.
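Another way around the problem, sketched below purely as an illustration (it assumes the dependency jars have already been copied to an HDFS directory by hand; the directory and jar list are placeholders, not taken from this post), is to skip addDependencyJars and point tmpjars at jars that already live on HDFS, so every entry the JobSubmitter checks is a valid DFS filename:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ManualTmpJars {
  public static Configuration withHdfsLibJars() {
    Configuration conf = HBaseConfiguration.create();
    // assumption: these jars were uploaded to this HDFS directory beforehand
    String libDir = "hdfs://192.168.1.200:9000/user/root/lib/";
    String[] jars = { "hbase-client-0.96.1.1-hadoop2.jar",
                      "hbase-common-0.96.1.1-hadoop2.jar",
                      "hbase-server-0.96.1.1-hadoop2.jar" };
    StringBuilder tmpjars = new StringBuilder();
    for (String jar : jars) {
      if (tmpjars.length() > 0) {
        tmpjars.append(',');
      }
      tmpjars.append(libDir).append(jar);
    }
    // every entry is an hdfs: path, so the JobSubmitter check passes;
    // set this before the Job object is created so the job picks it up
    conf.set("tmpjars", tmpjars.toString());
    return conf;
  }
}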
Hadoop Study Note 39: Integrating HBase with MapReduce
When integrating HBase with MapReduce, there are three scenarios:
HBase as the data sink.
HBase as the data source.
HBase as both data source and data sink.
Before reading this post, it is best to first understand the material covered earlier.
2. HBase as the data sink
With HBase as the data sink, e.g. when importing data from HDFS into HBase, you have the following options (a minimal sketch of the TableOutputFormat variant follows this list):
Call the HBase API directly in the map function to insert the data. In this case set job.setNumReduceTasks(0) and job.setOutputFormatClass(NullOutputFormat.class).
Use TableOutputFormat; its RecordWriter writes directly into HBase.
You can write during the map phase; then set job.setNumReduceTasks(0).
Or let a reducer do the writing, e.g. IdentityTableReducer.
Use bulk loading: HFileOutputFormat.configureIncrementalLoad(job, htable) calls job.setOutputFormatClass(HFileOutputFormat.class) behind the scenes.
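As a concrete illustration of the TableOutputFormat option, here is a minimal sketch of a map-only job that loads text lines from HDFS into HBase. It assumes the HBase 0.96 / Hadoop 2 APIs used throughout this post; the table name "target", the column family "data" and the LineMapper class are hypothetical names chosen for the example.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HdfsToHBase {

  static class LineMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // use the byte offset of the line as a (simplistic) row key
      byte[] row = Bytes.toBytes(key.get());
      Put put = new Put(row);
      put.add(Bytes.toBytes("data"), Bytes.toBytes("line"), Bytes.toBytes(value.toString()));
      // TableOutputFormat's RecordWriter sends the Put straight to HBase
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "target");   // the existing target table
    Job job = Job.getInstance(conf, "hdfs-to-hbase");
    job.setJarByClass(HdfsToHBase.class);
    job.setMapperClass(LineMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);
    job.setNumReduceTasks(0);                              // map-only, no reduce phase
    FileInputFormat.addInputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}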
3. HBase as the data source
With HBase as the data source, e.g. when analyzing data stored in HBase:
Write a custom mapper that extends TableMapper; the input values are Result objects, and the map and reduce phases simply follow your business logic.
4. HBase as both data source and data sink
With HBase as both source and sink, e.g. when splitting one HBase table into two, the mapper extends TableMapper, and after calling TableMapReduceUtil.initTableMapperJob in main you can proceed in at least three ways (a wiring sketch follows below):
Process the data in the map phase and insert it into the new HBase table via the HBase API. Set job.setNumReduceTasks(0) and job.setOutputFormatClass(NullOutputFormat.class).
Process the data in the map phase and let TableOutputFormat write it to HBase.
You can write during the map phase; then set job.setNumReduceTasks(0).
Or let a reducer do the writing, e.g. IdentityTableReducer; in that case you also need TableMapReduceUtil.initTableReducerJob.
The Java examples in the source directory illustrate each of these cases nicely.
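Here is a minimal wiring sketch for the last variant, copying every row of one table into another. The table names "source" and "target", the scan settings and the CopyMapper class are hypothetical; the mapper simply forwards each row as a Put, and IdentityTableReducer then writes the Puts to the target table.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class CopyTableJob {

  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (Cell cell : value.rawCells()) {
        // copy every cell of the source row into the Put for the target table
        put.add(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
                CellUtil.cloneValue(cell));
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "copy-table");
    job.setJarByClass(CopyTableJob.class);
    Scan scan = new Scan();
    scan.setCaching(500);         // larger scanner caching suits MapReduce scans
    scan.setCacheBlocks(false);   // don't fill the block cache from a full-table scan
    TableMapReduceUtil.initTableMapperJob("source", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
    // IdentityTableReducer simply writes the Puts it receives to the target table
    TableMapReduceUtil.initTableReducerJob("target", IdentityTableReducer.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}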
Tip: if you develop with Eclipse, you need to add all of Hadoop's jars plus three HBase-related jars (hbase, zookeeper, protobuf-java).
This post shows how to work with HBase from MapReduce, mainly how to read the data stored in HBase.
First, let's look at how to load data. We start from the familiar wordcount example, except that the goal is to store the wordcount results in HBase rather than in HDFS.
Here is the code:
package test1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

public class WordCountHBase {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // the map function is unchanged
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
The map function is unchanged from the standard wordcount.
  // Reduce class: writes the key/value pairs into the HBase table
  public static class IntSumReducer
      extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      Put put = new Put(key.getBytes());   // one Put per word, the word is the row key
      // column family "content", qualifier "count", value = the count
      put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
              Bytes.toBytes(String.valueOf(sum)));
      context.write(new ImmutableBytesWritable(key.getBytes()), put);
    }
  }
As the code shows, IntSumReducer extends TableReducer, which in Hadoop extends the Reducer class. Its signature is TableReducer<KeyIn, Values, KeyOut>; the key written to HBase is an ImmutableBytesWritable, i.e. an immutable byte wrapper, because HBase stores all data as raw byte arrays.
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws Exception {
    String tablename = "wordcount";
    // instantiate the Configuration; note that new HBaseConfiguration() is deprecated
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (admin.tableExists(tablename)) {
      System.out.println("table exists! recreating ...");
      admin.disableTable(tablename);
      admin.deleteTable(tablename);
    }
    HTableDescriptor htd = new HTableDescriptor(tablename);
    HColumnDescriptor hcd = new HColumnDescriptor("content");
    htd.addFamily(hcd);       // add the column family
    admin.createTable(htd);   // create the table

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 1) {
      System.err.println("Usage: wordcount <in> " + otherArgs.length);
      System.exit(2);
    }

    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCountHBase.class);
    job.setMapperClass(TokenizerMapper.class);
    //job.setCombinerClass(IntSumReducer.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    // note: use the TableMapReduceUtil from the hadoop.hbase.mapreduce package,
    // not the one from the hadoop.hbase.mapred package
    TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job);
    // set the output key/value types after initTableReducerJob, otherwise an error is thrown
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
The job configuration does not call job.setReducerClass(); instead "TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job)" wires up the reduce class.
Note that the TableMapReduceUtil used here comes from the hadoop.hbase.mapreduce package, not from the hadoop.hbase.mapred package; using the wrong one leads to errors.
Next, reading the data back out. Reading is simpler: write a Mapper that reads the <key, value> pairs, and have the Reducer output the results directly.
package test1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadHBase {

  public static class TokenizerMapper
      extends TableMapper<Text, Text> {

    public void map(ImmutableBytesWritable row, Result values, Context context
                    ) throws IOException, InterruptedException {
      StringBuffer sb = new StringBuffer("");
      for (java.util.Map.Entry<byte[], byte[]> value : values.getFamilyMap(
          "content".getBytes()).entrySet()) {
        // converting the value byte[] into a String requires new String()
        String str = new String(value.getValue());
        if (str != null) {
          sb.append(new String(value.getKey()));
          sb.append(":");
          sb.append(str);
        }
      }
      context.write(new Text(row.get()), new Text(new String(sb)));
    }
  }
The mapper extends TableMapper and reads the query results from the Result object.
  public static class IntSumReducer
      extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      for (Text val : values) {
        result.set(val);
        context.write(key, result);
      }
    }
  }
The reduce function is unchanged; it simply writes its output to the file.
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws Exception {
    String tablename = "wordcount";
    // instantiate the Configuration; note that new HBaseConfiguration() is deprecated
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out> " + otherArgs.length);
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(ReadHBase.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    job.setReducerClass(IntSumReducer.class);
    // use the TableMapReduceUtil from the hadoop.hbase.mapreduce package, not hadoop.hbase.mapred
    Scan scan = new Scan(args[0].getBytes());
    TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class,
        Text.class, Text.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
The two arguments I pass are "aa output": the row to start the scan from (here the scan starts at row "aa") and the path where the output file is stored (here the output folder under the HDFS user directory).
Note that the job configuration must call initTableMapperJob. Similar to the first example, you do not call job.setMapperClass(); instead TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job) wires up the mapper class. The Scan instance determines the row where the scan starts.
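As a side note, the Scan handed to initTableMapperJob can carry more than just the start row. Here is a minimal sketch, assuming the same "content" family and the illustrative start row "aa" (the caching values are placeholders I chose, not part of the original post):
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanSetup {
  public static Scan buildScan() {
    Scan scan = new Scan(Bytes.toBytes("aa"));   // start scanning at row "aa"
    scan.addFamily(Bytes.toBytes("content"));    // only fetch the family the mapper reads
    scan.setCaching(500);                        // fetch more rows per RPC, useful for MR scans
    scan.setCacheBlocks(false);                  // don't fill the region server block cache
    return scan;
  }
}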
Writing an Hadoop MapReduce Program in Python - Michael G. Noll
In this tutorial I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.
Motivation
Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be
developed in other languages like Python or C++ (the latter since version 0.14.1).
However, Hadoop's documentation and the most prominent Python example on the Hadoop website could make you think that you
must translate your Python code using Jython into a Java jar file.
Obviously, this is not
very convenient and can even be problematic if you depend on Python features not provided by Jython.
Another issue of
the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop –
just have a look at the example in $HADOOP_HOME/src/examples/python/WordCount.py and you see what I mean.
That said, the ground is now prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more
Pythonic way, i.e. in a way you should be familiar with.
What we want to do
We will write a simple MapReduce program for Hadoop in Python but without using
Jython to translate our code to Java jar files.
Our program will mimic the classic WordCount example, i.e. it reads text files and
counts how often words occur.
The input is text files and the output is text files, each line of which contains a
word and the count of how often it occurred, separated by a tab.
Note: You can also use programming languages other than Python such as Perl or Ruby with the “technique” described in this tutorial.
Prerequisites
You should have an Hadoop cluster up and running because we will get our hands dirty.
If you don’t have a cluster
yet, my following tutorials might help you to build one.
The tutorials are tailored to Ubuntu Linux but the information
does also apply to other Linux/Unix variants.
– How to set up a pseudo-distributed, single-node Hadoop cluster backed by the Hadoop Distributed File System
– How to set up a distributed, multi-node Hadoop cluster backed by the Hadoop Distributed File System
Python MapReduce Code
The “trick” behind the following Python code is that we will use the Hadoop Streaming API to help us pass data between our Map and Reduce
code via STDIN (standard input) and STDOUT (standard output).
We will simply use Python’s sys.stdin to
read input data and print our own output to sys.stdout.
That’s all we need to do because Hadoop Streaming will
take care of everything else!
Map step: mapper.py
Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN, split it into words
and output a list of lines mapping words to their (intermediate) counts to STDOUT.
The Map script will not
compute an (intermediate) sum of a word's occurrences though. Instead, it will output <word> 1 tuples immediately
– even though a specific word might occur multiple times in the input.
In our case we let the subsequent Reduce
step do the final sum count.
Of course, you can change this behavior in your own scripts as you please, but we will
keep it like that in this tutorial because of didactic reasons. :-)
Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run
into problems.
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
Reduce step: reducer.py
Save the following code in the file /home/hduser/reducer.py.
It will read the results of mapper.py from
STDIN (so the output format of mapper.py and the expected input format of reducer.py must match) and sum the
occurrences of each word to a final count, and then output its results to STDOUT.
Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run
into problems.
reducer.py
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
Test your code (cat data | map | sort | reduce)
I recommend to test your mapper.py and reducer.py scripts locally before using them in a MapReduce job.
Otherwise your jobs might successfully complete but there will be no job result data at all or not the results
you would have expected. If that happens, most likely it was you (or me) who screwed up.
Here are some ideas on how to test the functionality of the Map and Reduce scripts.
Test mapper.py and reducer.py locally first
# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
(you get the idea)
Running the Python Code on Hadoop
Download example input data
We will use three ebooks from Project Gutenberg for this example:
Download each ebook as text files in Plain Text UTF-8 encoding and store the files in a local temporary directory of
choice, for example /tmp/gutenberg.
hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$
Copy local example data to HDFS
Before we run the actual MapReduce job, we must first copy the example data from our local file system to Hadoop's HDFS.
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup   1-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup   1-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup   1-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$
Run the MapReduce job
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster.
As I said above, we leverage the Hadoop Streaming API to help us pass data between our Map and Reduce code via STDIN and STDOUT.
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file /home/hduser/mapper.py    -mapper /home/hduser/mapper.py \
-file /home/hduser/reducer.py   -reducer /home/hduser/reducer.py \
-input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
If you want to modify some Hadoop settings on the fly like increasing the number of Reduce tasks, you can use the
-D option:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...
Note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user-specified mapred.reduce.tasks and doesn't manipulate that. You cannot force mapred.map.tasks but you can specify mapred.reduce.tasks.
The job will read all the files in the HDFS directory /user/hduser/gutenberg, process it, and store the results in
the HDFS directory /user/hduser/gutenberg-output.
In general Hadoop will create one output file per reducer; in our case however it will only create a single file because the input files are very small.
Example output of the previous command in the console:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job__0021
[...] INFO streaming.StreamJob:
[...] INFO streaming.StreamJob:
[...] INFO streaming.StreamJob:
[...] INFO streaming.StreamJob:
[...] INFO streaming.StreamJob:
reduce 33%
[...] INFO streaming.StreamJob:
reduce 70%
[...] INFO streaming.StreamJob:
reduce 77%
[...] INFO streaming.StreamJob:
reduce 100%
[...] INFO streaming.StreamJob: Job complete: job__0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$
As you can see in the output above, Hadoop also provides a basic web interface for statistics and information.
While the Hadoop cluster is running, open the JobTracker web interface in a browser and have a look at it.
Here’s a screenshot of the Hadoop web interface for the job we just ran.
Figure 1: A screenshot of Hadoop’s JobTracker web interface, showing the details of the MapReduce job we just ran
Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000
07-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$
You can then inspect the contents of the file with the dfs -cat command:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
hduser@ubuntu:/usr/local/hadoop$
Note that in this specific output above the quote signs (") enclosing the words have not been inserted by Hadoop.
They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the
ebook texts.
Just inspect the part-00000 file further to see it for yourself.
Improved Mapper and Reducer code: using Python iterators and generators
The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application.
The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language.
In a real-world application however, you might want to optimize your code by using
Generally speaking, iterators and generators (functions that create iterators, for example with Python’s yield
statement) have the advantage that an element of a sequence is not produced until you actually need it.
This can help
a lot in terms of computational expensiveness or memory consumption depending on the task at hand.
Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.
Precisely, we compute the sum of a word’s occurrences, e.g. ("foo", 4), only if by chance the same word (foo)
appears multiple times in succession.
In the majority of cases, however, we let the Hadoop group the (key, value) pairs
between the Map and the Reduce step because Hadoop is more efficient in this regard than our simple Python scripts.
mapper.py (improved)
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
reducer.py (improved)
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
