How are MapReduce key-value pairs defined?



By default, the key-value pairs that MapReduce writes out (via TextOutputFormat) are separated by a tab character (\t), for example hello\t3.

Sometimes, however, we want a different separator in the output, such as a comma (","), so that a line looks like hello,3.

To do that, add the following two lines to the MapReduce driver's main method:


conf.set("mapred.textoutputformat.ignoreseparator","true")

conf.set("mapred.textoutputformat.separator",",")

The complete WordCount program is shown below:


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // Mapper: tokenize each input line and emit (word, 1) for every token.
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sum the counts for each word.
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set the separator between the output key and value to a comma.
        conf.set("mapred.textoutputformat.ignoreseparator", "true");
        conf.set("mapred.textoutputformat.separator", ",");

        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
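To run the job, assuming the class has been packaged into a jar named wordcount.jar (the jar name and the paths here are only placeholders), the invocation would look something like hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output, and the part-r-* files in the output directory will then contain lines such as hello,3 instead of the default tab-separated form.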

Today, while running a Map/Reduce job in Eclipse to analyze some data, I ran into a very strange error:

At first I noticed that running the program produced no errors in the console and the output directory was created, but the aggregated result file never appeared.

I added marker output at various points to check whether each class was being loaded and executed, and found that the Map ran fine while the Reduce never got executed.

Then I changed the settings so that log messages are shown at runtime and ran it again. (The way to enable this log output is shared at the end of this post.)

This time the following error showed up:

The error message in the log:

java.lang.Exception: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in localfetcher#1
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in localfetcher#1
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: E:/tmp/hadoop-Alan%20Yang/mapred/local/localRunner/Alan%20Yang/jobcache/job_local214639494_0001/attempt_local214639494_0001_m_000003_0/output/file.out.index
    at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:200)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
    at org.apache.hadoop.io.SecureIOUtils.openFSDataInputStream(SecureIOUtils.java:156)
    at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:70)
    at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:62)
    at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:57)
    at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.copyMapOutput(LocalFetcher.java:124)
    at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.doCopy(LocalFetcher.java:102)
    at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.run(LocalFetcher.java:85)
2017-07-12 16:19:02,616 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local214639494_0001 failed with state FAILED due to: NA
2017-07-12 16:19:02,663 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 22
    File System Counters
        FILE: Number of bytes read=62223
        FILE: Number of bytes written=176635984
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=898750946
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=140
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=10
    Map-Reduce Framework
        Map input records=2629660
        Map output records=2626091
        Map output bytes=26260910
        Map output materialized bytes=31513152
        Input split bytes=1210
        Combine input records=0
        Spilled Records=2626091
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=496
        Total committed heap usage (bytes)=7754743808
    File Input Format Counters
        Bytes Read=163038920

Following the usual debugging logic that later errors are often just consequences of earlier ones, I went straight to searching for solutions to "error in shuffle in localfetcher", and nearly every answer said it was a memory problem. I tried all of those suggestions without any success.

When I read further down, I noticed the log actually hinted at the real cause: a path problem. That pointed me in the right direction. Searching around, I found that some people had fixed it by changing the default local directory; I tried that, but it only made the same error appear under a different path. That was puzzling, so I looked at the error message again and noticed it contained my Windows username, with the space encoded as %20. What does a % in a path usually mean? Anyone who has configured environment variables knows. It suddenly dawned on me that the space in my Windows username was very likely the culprit. I renamed my Windows account to remove the space, and the problem was solved immediately.

How to fix the Eclipse console not showing MapReduce program logs:

With Hadoop 2.6.0, when debugging a MapReduce program under Eclipse, the console does not print the job's runtime logs and instead shows only the following:

log4j:WARN No appenders could be found for logger (org.apache.hadoop...).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

This means no log4j.properties file has been configured. It does not affect the program's execution, but not being able to see the logs is annoying. The fix: copy the log4j.properties file from the etc/hadoop/ directory under your Hadoop 2.6.0 installation into the src directory of your MapReduce project.
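If you cannot find that file, a minimal log4j.properties that simply routes everything to the console is enough for debugging. The sketch below is an assumption-based example for the log4j 1.x bundled with Hadoop 2.6.0; it only needs to end up on the project's classpath (for example in src):

# Minimal sketch: send all log output to the console so MapReduce job logs show up in Eclipse.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %p [%c] - %m%n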

If you call the APIs yourself to read and write HBase, I think the real question is whether the task can be accomplished at all. MapReduce does a great deal of work under the hood: communication between its own master, ZooKeeper, and HBase's master, plus scheduling the map and reduce phases of the computation; there are far too many details. Given the data volumes MapReduce typically handles, even if you ignored fault tolerance you could hardly coordinate all those tasks effectively, and how could you possibly ignore it? So implementing this yourself in Java might be a worthwhile learning exercise, but it will basically never produce anything usable.