blog.Ring.idv.tw

Hadoop - Chaining Maps

Hadoop - Chaining Maps


本文主要介紹在Hadoop 0.19.0就開始提供的Chaining Maps(Hadoop-3702)功能,在開始介紹之前先假想下述情況~ 是否曾在一個Mapper中處理許多步驟的事項?或者是否曾在Reducer結束之後還要針對每筆資料個別處理的情況?簡單來說~ 如果在你的MapReduce程式中有需要preprocessing或postprocessing的情況,那就適合用Chaining Maps來完成。

這裡我們假設將MapReduce的執行順序用「[MR]+」符號來表示,那麼透過Chaining Maps所執行的工作就可以用「M+RM*」來表示,代表可以執行一個以上的Mapper(preprocessing),接著在Reducer之後再由零或多個Mapper做後續的處理(postprocessing),這樣的作法可以帶來一些額外的好處,譬如:方便除錯、測試、易維護以及MapReduce程式的可重用性,下述是一個簡單的展示範例,主要藉由「ChainMapper」和「ChainReducer」兩個類別來完成,此範例執行的順序為:「WordSegmentationMap | StopWordMap | Reduce | FilterMap」。

Chaining Maps for Hadoop 0.21.x

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class ChainExample
{
	public static class WordSegmentationMap extends Mapper<LongWritable, Text, Text, IntWritable>
	{
		@Override
		public void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException
		{
			String s[] = value.toString().split(" ");
			for(String w : s)
				context.write(new Text(w), new IntWritable(1));

		}
	}
	public static class StopWordMap extends Mapper<Text, IntWritable, Text, IntWritable>
	{
		private Set<String> stopwords = new HashSet<String>();
		public static final String[] ENGLISH_STOP_WORDS = {
		    "a", "and", "are", "as", "at", "be", "but", "by",
		    "for", "if", "in", "into", "is", "it",
		    "no", "not", "of", "on", "or", "s", "such",
		    "t", "that", "the", "their", "then", "there", "these",
		    "they", "this", "to", "was", "will", "with"
		  };
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			for(int i = 0 ; i < ENGLISH_STOP_WORDS.length ; i++)
				stopwords.add(ENGLISH_STOP_WORDS[i]);
		}
		@Override
		public void map(Text key, IntWritable value , Context context) throws IOException, InterruptedException
		{
			if(!stopwords.contains(key.toString()))
				context.write(key, value);
		}
	}
	
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for(IntWritable i : values)
				sum++;	

			context.write(key, new IntWritable(sum));
		}
	}
	public static class FilterMap extends Mapper<Text, IntWritable, IntWritable,Text>
	{
		@Override
		public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException
		{
			if(value.get()>100)
				context.write(value, key);
		}
	}

	public static void main(String[] args)throws Exception
	{
		String output = "/chainExample_out/";
		Configuration conf = new Configuration();
		Cluster cluster = new Cluster(conf);
		
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);
		
		Job job = Job.getInstance(cluster);
		job.setJarByClass(ChainExample.class);
		job.setJobName("ChainExample");
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.setInputPaths(job, new Path("/chainExample/*"));
		TextOutputFormat.setOutputPath(job, new Path(output));
		
		ChainMapper.addMapper(job, WordSegmentationMap.class, LongWritable.class, Text.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainMapper.addMapper(job, StopWordMap.class, Text.class, IntWritable.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
		ChainReducer.addMapper(job, FilterMap.class, Text.class, IntWritable.class, IntWritable.class, Text.class, new Configuration(false));
		job.waitForCompletion(true);
	}
}

2010-12-01 10:17:15

13 comments on "Hadoop - Chaining Maps"

  1. 1. CJ 說:

    Hello
    我想請問, 這個中間的中間值, 要怎麼去取得呢?
    看起來, FilterMap 才是最後輸出context的地方.

    2011-08-28 17:59:49

  2. 2. Shen 說:

    何謂中間值?舉個例子說明

    2011-08-29 10:01:18

  3. 3. CJ 說:

    Q1: 例如:
    WordSegmentationMap -> StopWordMap 過程中, 如果想知道StopWordMap 輸入的內容是否正確, 那什麼方法比較好呢?

    Q2: 另外, 我用另一種寫法要做ChainMap 一直失敗, 我查了StackOverflow 問題網站,
    有一個相似問題, 但一樣在 Hadoop r.0.20.2 上沒有結果
    Configuration conf = new Configuration();
    .
    .
    .
    JobConf mapAConf = new JobConf(false); // <- deprecated WHY!
    ChainMapper.addMapper( conf, HadoopMapper.class, LongWritable.class, Text.class, PairOfStrings.class, String2IntOpenHashMapWritable.class, true, mapAConf );

    addMapper會錯, 這一行有問題. (有點長, 不知是我第1個conf有錯還是最後1個conf有錯)

    <K1,V1,K2,V2>addMapper(org.apache.hadoop.mapred.JobConf,java.lang.Class<? extends org.apache.hadoop.mapred.Mapper<K1,V1,K2,V2>>,java.lang.Class<? extends K1>
    ,java.lang.Class<? extends V1>,java.lang.Class<? extends K2>,java.lang.Class<? extends V2>,boolean,org.apache.hadoop.mapred.JobConf)
    in org.apache.hadoop.mapred.lib.ChainMapper cannot be applied to (org.apache.hadoop.conf.Configuration,java.lang.Class<dfs.HadoopMapper>,java.lang.Class<org.apache.hadoop.io.LongWritable>,java.lang.Class<org.apache.hadoop.io.Text>
    ,java.lang.Class<edu.umd.cloud9.io.pair.PairOfStrings>,java.lang.Class<edu.umd.cloud9.io.fastuil.String2IntOpenHashMapWritable>,boolean,org.apache.hadoop.mapred.JobConf)

    2011-08-30 12:40:02

  4. 4. Shen 說:

    >>Q1: 例如:WordSegmentationMap -> StopWordMap 過程中, 如果想知道StopWordMap 輸入的內容是否正確, 那什麼方法比較好呢?
    可以將StopWordMap獨立採用MRUnit來寫Test Case,請參考:http://blog.ring.idv.tw/comment.ser?i=362

    >>Q2: JobConf mapAConf = new JobConf(false); // <- deprecated WHY!
    代表該API已經不建議使用,請使用新的API

    2011-08-30 16:26:27

  5. 5. CJ 說:

    我發現我都卡在0.20.2 , 0.21.0 寫法上的問題

    他們不建議使用JobConf 但, 在0.20, 0.20.203 的ChainMapper.addMapper 又使用JobConf , 弄得很混亂啊...

    特別是0.20 版本的chainMapper 參數比版主的多1個, 而且第1個和最後1個又是JobConf

    那我已經宣告 Job conf = new Job(...); 之下就一直出錯!
    他一直講我的addMapper 使用有錯! 又不講錯在那一個.

    2011-08-31 17:43:56

  6. 6. shen 說:

    如果你的Hadoop是0.20.x 就採用舊API的寫法,如果是用0.21.x的Hadoop就用我上述的寫法即可,應該沒那麼難才是~

    2011-08-31 18:14:46

  7. 7. CJ 說:

    我大概找到問題了, 我猜是因為我的Mapper class 是用
    import org.apache.hadoop.mapreduce.Mapper; 這裡的, 而不是用
    import org.apache.hadoop.mapred.Mapper

    所以我在0.20 的chain mapper 才會指令有問題吧, 我DE了幾天.
    等我確定再上來repo , 為什麼會有2組Mapper寫法呢????

    2011-08-31 21:20:01

  8. 8. Shen 說:

    請參考:http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api
    基本上就是簡化一些API的設計,讓開發者更方便使用

    2011-09-05 09:21:08

  9. 9. wei 說:

    您好
    Q1:
    我的問題其實跟CJ大大的問題是一樣的
    在hadoop 0.20.2的API JobConf 已經deprecated,已改成Job的方式(Shen大大提供的參考網址有去看過了)
    但是0.20.2中ChainMapper 的addMapper方法是
    addMapper( JobConf job ...., JobConf mapperConf)
    這邊是有想過是否可以用 Override的方式??
    或者是我api沒有看熟
    還請不吝指教
    Q2:
    另外一個部分則是
    原本在M->R之間會有sort的動作
    那今天在M->M之間 或者是R->M之間 仍然會有sort的動作嗎?

    還請不吝指教,謝謝

    如有冒犯之處 還請見諒

    2011-10-08 11:03:30

  10. 10. Shen 說:

    Dear wei,

    Q1: hadoop 0.20.2 的ChainMapper只允許JobConf,雖然JobConf已經Deprecated,但你還是可以在0.20.2使用它的,只是未來還是建議改用0.21.x or 更新的API

    Q2: sort都是在reduce階段做的

    2011-10-08 12:18:53

  11. 11. wei 說:

    Shen大大您好:
    謝謝之前的回覆~已經可以執行!

    但對Q2的部分還是有一些不清楚的地方:

    原本的M->R 這邊 ,相同的key(map output)會送到同一reducer task(不知是不是shuffle && sort 的動作)
    假設今天我要在R->M,看官方的文件說明這應該屬於同一個task
    所以若是以下這種情況(MID表示reducer的output):

    R -> M
    R -> M
    MID
    那這邊的MID是否就不會再有根據同一個key送到同一個mapper的情況
    其實自己有跑例子測試,相同的key會在兩個 M (不同node)上出現

    想問看看是否能保有相同key送到同一個node的情況??

    還請不吝指教 謝謝

    2011-10-16 11:33:39

  12. 12. Shen 說:

    Dear wei,

    >>相同的key(map output)會送到同一reducer task(不知是不是shuffle && sort 的動作)
    是的!

    >>想問看看是否能保有相同key送到同一個node的情況??
    可以,但是要透過跑第二次的MR讓它透過reduce階段再跑一次,所以你可以去思考看看為何key會跑到同一個node?其實那是因為它會透過HashPartitioner將你的key做各別的分配,而你所提到的R->M的情況,它中間會再透過Partitioner分配嗎?當然不會~ 有機會的話去看一下Hadoop MapReduce的原始碼,你會收獲更多的! good luck!

    2011-10-16 14:13:14

  13. 13. wei 說:

    好的,我會再去看看原始碼

    謝謝您熱心地回答:)

    2011-10-16 14:34:56

Leave a Comment

Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: Ads :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment