本文主要介紹在Hadoop 0.19.0就開始提供的Chaining Maps(Hadoop-3702)功能,在開始介紹之前先假想下述情況~ 是否曾在一個Mapper中處理許多步驟的事項?或者是否曾在Reducer結束之後還要針對每筆資料個別處理的情況?簡單來說~ 如果在你的MapReduce程式中有需要preprocessing或postprocessing的情況,那就適合用Chaining Maps來完成。
這裡我們假設將MapReduce的執行順序用「[MR]+」符號來表示,那麼透過Chaining Maps所執行的工作就可以用「M+RM*」來表示,代表可以執行一個以上的Mapper(preprocessing),接著在Reducer之後再由零或多個Mapper做後續的處理(postprocessing),這樣的作法可以帶來一些額外的好處,譬如:方便除錯、測試、易維護以及MapReduce程式的可重用性,下述是一個簡單的展示範例,主要藉由「ChainMapper」和「ChainReducer」兩個類別來完成,此範例執行的順序為:「WordSegmentationMap | StopWordMap | Reduce | FilterMap」。
Chaining Maps for Hadoop 0.21.x
import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class ChainExample { public static class WordSegmentationMap extends Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException { String s[] = value.toString().split(" "); for(String w : s) context.write(new Text(w), new IntWritable(1)); } } public static class StopWordMap extends Mapper<Text, IntWritable, Text, IntWritable> { private Set<String> stopwords = new HashSet<String>(); public static final String[] ENGLISH_STOP_WORDS = { "a", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "s", "such", "t", "that", "the", "their", "then", "there", "these", "they", "this", "to", "was", "will", "with" }; protected void setup(Context context) throws IOException, InterruptedException { for(int i = 0 ; i < ENGLISH_STOP_WORDS.length ; i++) stopwords.add(ENGLISH_STOP_WORDS[i]); } @Override public void map(Text key, IntWritable value , Context context) throws IOException, InterruptedException { if(!stopwords.contains(key.toString())) context.write(key, value); } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable i : values) sum++; context.write(key, new IntWritable(sum)); } } public static class FilterMap extends Mapper<Text, IntWritable, IntWritable,Text> { @Override public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { if(value.get()>100) context.write(value, key); } } public static void main(String[] args)throws Exception { String output = "/chainExample_out/"; Configuration conf = new Configuration(); Cluster cluster = new Cluster(conf); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(output), true); Job job = Job.getInstance(cluster); job.setJarByClass(ChainExample.class); job.setJobName("ChainExample"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.setInputPaths(job, new Path("/chainExample/*")); TextOutputFormat.setOutputPath(job, new Path(output)); ChainMapper.addMapper(job, WordSegmentationMap.class, LongWritable.class, Text.class, Text.class, IntWritable.class, new Configuration(false)); ChainMapper.addMapper(job, StopWordMap.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false)); ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false)); ChainReducer.addMapper(job, FilterMap.class, Text.class, IntWritable.class, IntWritable.class, Text.class, new Configuration(false)); job.waitForCompletion(true); } }
Hello
我想請問, 這個中間的中間值, 要怎麼去取得呢?
看起來, FilterMap 才是最後輸出context的地方.
2011-08-28 17:59:49
何謂中間值?舉個例子說明
2011-08-29 10:01:18
Q1: 例如:
WordSegmentationMap -> StopWordMap 過程中, 如果想知道StopWordMap 輸入的內容是否正確, 那什麼方法比較好呢?
Q2: 另外, 我用另一種寫法要做ChainMap 一直失敗, 我查了StackOverflow 問題網站,
有一個相似問題, 但一樣在 Hadoop r.0.20.2 上沒有結果
Configuration conf = new Configuration();
.
.
.
JobConf mapAConf = new JobConf(false); // <- deprecated WHY!
ChainMapper.addMapper( conf, HadoopMapper.class, LongWritable.class, Text.class, PairOfStrings.class, String2IntOpenHashMapWritable.class, true, mapAConf );
addMapper會錯, 這一行有問題. (有點長, 不知是我第1個conf有錯還是最後1個conf有錯)
<K1,V1,K2,V2>addMapper(org.apache.hadoop.mapred.JobConf,java.lang.Class<? extends org.apache.hadoop.mapred.Mapper<K1,V1,K2,V2>>,java.lang.Class<? extends K1>
,java.lang.Class<? extends V1>,java.lang.Class<? extends K2>,java.lang.Class<? extends V2>,boolean,org.apache.hadoop.mapred.JobConf)
in org.apache.hadoop.mapred.lib.ChainMapper cannot be applied to (org.apache.hadoop.conf.Configuration,java.lang.Class<dfs.HadoopMapper>,java.lang.Class<org.apache.hadoop.io.LongWritable>,java.lang.Class<org.apache.hadoop.io.Text>
,java.lang.Class<edu.umd.cloud9.io.pair.PairOfStrings>,java.lang.Class<edu.umd.cloud9.io.fastuil.String2IntOpenHashMapWritable>,boolean,org.apache.hadoop.mapred.JobConf)
2011-08-30 12:40:02
>>Q1: 例如:WordSegmentationMap -> StopWordMap 過程中, 如果想知道StopWordMap 輸入的內容是否正確, 那什麼方法比較好呢?
可以將StopWordMap獨立採用MRUnit來寫Test Case,請參考:http://blog.ring.idv.tw/comment.ser?i=362
>>Q2: JobConf mapAConf = new JobConf(false); // <- deprecated WHY!
代表該API已經不建議使用,請使用新的API
2011-08-30 16:26:27
我發現我都卡在0.20.2 , 0.21.0 寫法上的問題
他們不建議使用JobConf 但, 在0.20, 0.20.203 的ChainMapper.addMapper 又使用JobConf , 弄得很混亂啊...
特別是0.20 版本的chainMapper 參數比版主的多1個, 而且第1個和最後1個又是JobConf
那我已經宣告 Job conf = new Job(...); 之下就一直出錯!
他一直講我的addMapper 使用有錯! 又不講錯在那一個.
2011-08-31 17:43:56
如果你的Hadoop是0.20.x 就採用舊API的寫法,如果是用0.21.x的Hadoop就用我上述的寫法即可,應該沒那麼難才是~
2011-08-31 18:14:46
我大概找到問題了, 我猜是因為我的Mapper class 是用
import org.apache.hadoop.mapreduce.Mapper; 這裡的, 而不是用
import org.apache.hadoop.mapred.Mapper
所以我在0.20 的chain mapper 才會指令有問題吧, 我DE了幾天.
等我確定再上來repo , 為什麼會有2組Mapper寫法呢????
2011-08-31 21:20:01
請參考:http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api
基本上就是簡化一些API的設計,讓開發者更方便使用
2011-09-05 09:21:08
您好
Q1:
我的問題其實跟CJ大大的問題是一樣的
在hadoop 0.20.2的API JobConf 已經deprecated,已改成Job的方式(Shen大大提供的參考網址有去看過了)
但是0.20.2中ChainMapper 的addMapper方法是
addMapper( JobConf job ...., JobConf mapperConf)
這邊是有想過是否可以用 Override的方式??
或者是我api沒有看熟
還請不吝指教
Q2:
另外一個部分則是
原本在M->R之間會有sort的動作
那今天在M->M之間 或者是R->M之間 仍然會有sort的動作嗎?
還請不吝指教,謝謝
如有冒犯之處 還請見諒
2011-10-08 11:03:30
Dear wei,
Q1: hadoop 0.20.2 的ChainMapper只允許JobConf,雖然JobConf已經Deprecated,但你還是可以在0.20.2使用它的,只是未來還是建議改用0.21.x or 更新的API
Q2: sort都是在reduce階段做的
2011-10-08 12:18:53
Shen大大您好:
謝謝之前的回覆~已經可以執行!
但對Q2的部分還是有一些不清楚的地方:
原本的M->R 這邊 ,相同的key(map output)會送到同一reducer task(不知是不是shuffle && sort 的動作)
假設今天我要在R->M,看官方的文件說明這應該屬於同一個task
所以若是以下這種情況(MID表示reducer的output):
R -> M
R -> M
MID
那這邊的MID是否就不會再有根據同一個key送到同一個mapper的情況
其實自己有跑例子測試,相同的key會在兩個 M (不同node)上出現
想問看看是否能保有相同key送到同一個node的情況??
還請不吝指教 謝謝
2011-10-16 11:33:39
Dear wei,
>>相同的key(map output)會送到同一reducer task(不知是不是shuffle && sort 的動作)
是的!
>>想問看看是否能保有相同key送到同一個node的情況??
可以,但是要透過跑第二次的MR讓它透過reduce階段再跑一次,所以你可以去思考看看為何key會跑到同一個node?其實那是因為它會透過HashPartitioner將你的key做各別的分配,而你所提到的R->M的情況,它中間會再透過Partitioner分配嗎?當然不會~ 有機會的話去看一下Hadoop MapReduce的原始碼,你會收獲更多的! good luck!
2011-10-16 14:13:14
好的,我會再去看看原始碼
謝謝您熱心地回答:)
2011-10-16 14:34:56