blog.Ring.idv.tw

Hadoop

MapReduce - Mark/Reset of Values Iterator

不曉得是否有人和我有同樣的疑問,就是關於能否將reduce function中的「Iterator」改為「ResettableIterator」,雖然這可以在reduce中花額外的功夫來達成,不過總覺得不是一種有效率的方式,最近看了一下Hadoop相關文件,顯然不是只有我有這樣的需求與想法,因為在Hadoop 0.21.0已經將此功能補上了(Values Iterator should support "mark" and "reset"),用法如下:

WordCount - reudce function

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
	int sum = 0;
	Iterator<IntWritable> i = values.iterator();
	while(i.hasNext())
	{
		sum += i.next().get();
	}
	result.set(sum);
	context.write(key, result);
}

上述程式是一般WordCount程式中的reduce function,假設現在要為「每個單字多累加一次出現次數」,修改後的reduce程式如下:

修改後的WordCount

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
	int sum = 0;
	MarkableIterator<IntWritable> mitr = new MarkableIterator<IntWritable>(values.iterator());
	mitr.mark();
	while (mitr.hasNext()) 
	{
		sum += mitr.next().get();
	}
	mitr.reset();
	while (mitr.hasNext()) 
	{
		sum += mitr.next().get();
	} 
	result.set(sum);
	context.write(key, result);
}

從上述程式可以知道它採用「MarkableIterator」來取代先前所使用的「Iterator」,而關於MarkableIterator詳細的實作細節可以從Hadoop 0.21.0 原始碼中的「org.apache.hadoop.mapreduce.task.ReduceContextImpl」程式來查看,另外可以發現原本出現在Hadoop 0.20.x的ValueIterator class(in ReduceContext.java),這在Hadoop 0.21版也都被改寫過了,原先該ValueIterator class只是單純的實作Iteraotr,而這在Hadoop 0.21.0 已將該ValueIterator class改為實作「ReduceContext.ValueIterator」,而ReduceContext.ValueIterator繼承「MarkableIteratorInterface」。

2011-01-19 11:48:54 | Add Comment

Hadoop - Mapper如何處理Split?

在開始進入主題之前,先回顧一下HDFS的儲存方式,在HDFS中預設每個Block Size是64MB(dfs.blocksize:67108864),意指為如果寫一個大於64MB的檔案到HDFS之中,那麼它會自動地將檔案以64MB為基礎來進行切分的動作,這裡我們實驗一個例子:

筆者從「Peter Norvig: How to Write a Spelling Corrector」下載了一個大約包含一佰萬個單詞的純文字文件「big.txt」來實驗,不過由於該檔案約只佔了6MB(6488666),所以透過下述指令將它擴展成大於64MB:

P.S. 請執行12次 = =" (謎之音:有更快的方式嗎?..Orz)

cat big.txt >> test.txt

現在的「test.txt」檔案大約有77MB了(77863992),接著將它寫到HDFS並透過hadoop fsck指令來觀察一下:

bin/hadoop dfs -mkdir /testfile
bin/hadoop dfs -put test.txt /testfile/
bin/hadoop fsck /testfile/test.txt -blocks -files -locations

結果:

0. blk_-4603164807368241811_6252 len=67108864 repl=1 [127.0.0.1:50010]
1. blk_1896285744196882269_6252 len=10755128 repl=1 [127.0.0.1:50010]

從上述的結果來看,「test.txt」的確被切分成兩個Block單位了,分別佔了「67108864」和「10755128」bytes的檔案大小,而從這兩個Block所包含的內容來看可以發現,介於兩個Block之間的「mucous」單詞硬是被拆散成了兩半(「m」和「ucous」)?

blk_-4603164807368241811 - tail

Definition.--Virus.--ACQUIRED SYPHILIS--Primary period:
    _Incubation, primary chancre, glandular enlargement_;
    _Extra-genital chancres_--Treatment--Secondary period: _General
    symptoms, skin affections, m

blk_1896285744196882269 - head

ucous patches, affections of bones,
    joints, eyes_, etc.--Treatment: _Salvarsan_--_Methods of
    administering mercury_--Syphilis and marriage--Intermediate
    stage--_Reminders_--Tertiary period: _General symptoms_,

而這樣的情況在執行MapReduce又是如何處理的?由於在Map階段執行時是一個Mapper對應一個Split,重點就在於該Block最後的「mucous」單字硬是被拆成兩半,而這在Map階段又是如何處理的?總不可能跑個「WordCount」都會有問題吧?而這就是本文的主軸,既然上述「mucous」單詞被拆散成兩半,那麼就應該有對應的處理方式,透過「TextInputFormat」原始碼可以得知它採用「LineRecordReader」來讀取Block的資料內容,如果看過「LineRecordReader」原始碼的話其實已經發現答案了!不過筆者為求實驗正確仍改了一下原始碼去確認,結果證實:

「一個Mapper在讀取一個Block的時候,如果該Block不是檔案(e.g. test.txt)的最後一個Block時,那麼它會多讀取下一個Block的第一行資料」,也就是說處理第一個Block的Mapper最後會讀取「symptoms, skin affections, mucous patches, affections of bones,」這一整行就對了,當然下一個Mapper去處理第二個Block時就自動忽略第一行的資料了,以此類推。

雖然處理的方式很簡單,不過沒看過Source Code就是不曉得它怎麼處理的.. Orz

P.S. 關於這個問題之前同事曾問過我,那時看了Source Code之後一直沒時間實際去驗證它,所以本文記錄了這個驗證過程。

2010-12-07 12:16:18 | Comments (2)

Hadoop - Chaining Maps

本文主要介紹在Hadoop 0.19.0就開始提供的Chaining Maps(Hadoop-3702)功能,在開始介紹之前先假想下述情況~ 是否曾在一個Mapper中處理許多步驟的事項?或者是否曾在Reducer結束之後還要針對每筆資料個別處理的情況?簡單來說~ 如果在你的MapReduce程式中有需要preprocessing或postprocessing的情況,那就適合用Chaining Maps來完成。

這裡我們假設將MapReduce的執行順序用「[MR]+」符號來表示,那麼透過Chaining Maps所執行的工作就可以用「M+RM*」來表示,代表可以執行一個以上的Mapper(preprocessing),接著在Reducer之後再由零或多個Mapper做後續的處理(postprocessing),這樣的作法可以帶來一些額外的好處,譬如:方便除錯、測試、易維護以及MapReduce程式的可重用性,下述是一個簡單的展示範例,主要藉由「ChainMapper」和「ChainReducer」兩個類別來完成,此範例執行的順序為:「WordSegmentationMap | StopWordMap | Reduce | FilterMap」。

Chaining Maps for Hadoop 0.21.x

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class ChainExample
{
	public static class WordSegmentationMap extends Mapper<LongWritable, Text, Text, IntWritable>
	{
		@Override
		public void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException
		{
			String s[] = value.toString().split(" ");
			for(String w : s)
				context.write(new Text(w), new IntWritable(1));

		}
	}
	public static class StopWordMap extends Mapper<Text, IntWritable, Text, IntWritable>
	{
		private Set<String> stopwords = new HashSet<String>();
		public static final String[] ENGLISH_STOP_WORDS = {
		    "a", "and", "are", "as", "at", "be", "but", "by",
		    "for", "if", "in", "into", "is", "it",
		    "no", "not", "of", "on", "or", "s", "such",
		    "t", "that", "the", "their", "then", "there", "these",
		    "they", "this", "to", "was", "will", "with"
		  };
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			for(int i = 0 ; i < ENGLISH_STOP_WORDS.length ; i++)
				stopwords.add(ENGLISH_STOP_WORDS[i]);
		}
		@Override
		public void map(Text key, IntWritable value , Context context) throws IOException, InterruptedException
		{
			if(!stopwords.contains(key.toString()))
				context.write(key, value);
		}
	}
	
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for(IntWritable i : values)
				sum++;	

			context.write(key, new IntWritable(sum));
		}
	}
	public static class FilterMap extends Mapper<Text, IntWritable, IntWritable,Text>
	{
		@Override
		public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException
		{
			if(value.get()>100)
				context.write(value, key);
		}
	}

	public static void main(String[] args)throws Exception
	{
		String output = "/chainExample_out/";
		Configuration conf = new Configuration();
		Cluster cluster = new Cluster(conf);
		
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);
		
		Job job = Job.getInstance(cluster);
		job.setJarByClass(ChainExample.class);
		job.setJobName("ChainExample");
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.setInputPaths(job, new Path("/chainExample/*"));
		TextOutputFormat.setOutputPath(job, new Path(output));
		
		ChainMapper.addMapper(job, WordSegmentationMap.class, LongWritable.class, Text.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainMapper.addMapper(job, StopWordMap.class, Text.class, IntWritable.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
		ChainReducer.addMapper(job, FilterMap.class, Text.class, IntWritable.class, IntWritable.class, Text.class, new Configuration(false));
		job.waitForCompletion(true);
	}
}

2010-12-01 10:17:15 | Comments (13)

PageRank in MapReduce

圖片來源:www.jauhari.net

前言

有一陣子沒有寫MapReduce程式了,所以找個代表性的實例來練習一下...

PageRank in MapReduce

PageRank演算法最早是由Google兩位創辦人Sergey Brin & Larry Page在1998年的時候發表在World-Wide Web Conference的一篇論文:The Anatomy of a Large-Scale Hypertextual Web Search Engine所提出來的,該演算法主要用來計算網頁的重要性,以決定搜尋引擎該呈現搜尋結果時的一個排名依據,然而根據Google在1998年當時所索引的網頁數量來看,他們共索引了26 million pages(We knew the web was big...),所以可能三、四台機器就足以運算完成,但是到2000年時Google就索引了超過one billion pages,而這樣的規模就適合用MapReduce來分散式處理了,而本文主要介紹該如何用MapReduce的方式來完成這樣的演算法,然而重點在於PageRank是一種反覆式演算法(Iterative Algorithm),所以該如何應用在MapReduce並決定何時該跳離這個反覆式迴圈以結束運算就需要一些方式來處理。

P.S. 本範例純粹使用「純文字型態」來處理,如果你有效率的考量請試著改寫特定的OutputFormat和Writable實作。

Google PageRank 範例

這裡的範例假設全世界只有四個網頁,它們分別為:Adobe, Google, MSN and Yahoo,每個網頁的PageRank值(簡稱PR值)預設為10。

1. Adobe有三個對外連結,分別連到Google, MSN and Yahoo。

2. Google只有一個對外連結為Adobe。

3. MSN有一個對外連結為Google。

4. Yahoo則有兩個對外連結為MSN and Google。

Adobe	10.00 Google,MSN,Yahoo
Google	10.00 Adobe
MSN	10.00 Google
Yahoo	10.00 MSN,Google

所以從這個範例來看,由於有三個網頁都連結到Google,所以相對來說它的PR值應該是最高的,其次應則為Adobe,因為Google的分數最高且又只連結到Adobe,所以Adobe的PR值也會比較高。

PageRank - MapReduce for Hadoop 0.21.x

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank
{
	static enum PageCount{
		Count,TotalPR
	}
	public static class PageRankMapper extends Mapper<Object, Text, Text, Text>
	{
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			context.getCounter(PageCount.Count).increment(1);
			String[] kv = value.toString().split("\t");
			String _key = kv[0];
			String _value = kv[1];
			String _PRnLink[] = _value.split(" ");
			String pr = _PRnLink[0];
			String link = _PRnLink[1];
			context.write(new Text(_key), new Text(link));

			String site[] = link.split(",");
			float score = Float.valueOf(pr)/(site.length)*1.0f;
			for(int i = 0 ; i < site.length ; i++)
			{
				context.write(new Text(site[i]), new Text(String.valueOf(score)));
			}	

		}
	}
 
	public static class PageRankReducer extends Reducer<Text, Text, Text, Text>
	{
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
			StringBuilder sb = new StringBuilder();
			float factor = 0.85f;
			float pr = 0f;
			for(Text f : values)
			{				
				String value = f.toString();
				int s = value.indexOf(".");
				if(s != -1)
				{
					pr += Float.valueOf(value);
				}else{
					String site[] = value.split(",");
					int _len = site.length;
					for(int k = 0 ; k < _len ;k++)
					{
						sb.append(site[k]);
						sb.append(",");
					}
				}
			}

			pr = ((1-factor)+(factor*(pr)));
			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
			String output = pr+" "+sb.toString();
			context.write(key, new Text(output));
		}
	}

	public static void main(String[] args) throws Exception
	{
		String input;
		String output;
		int threshold = 1000;
		int iteration = 0;
		int iterationLimit = 100;
		boolean status = false;
		
		while(iteration < iterationLimit)
		{
			if((iteration % 2) == 0)
			{
				input = "/pagerank_output/p*";
				output = "/pagerank_output2/";
			}else{
				input = "/pagerank_output2/p*";
				output = "/pagerank_output/";
			}

			Configuration conf = new Configuration();	
			
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(output), true);
			
			Job job = Job.getInstance(new Cluster(conf));
			job.setJobName("PageRank");
			job.setJarByClass(PageRank.class);
			job.setMapperClass(PageRankMapper.class);
			job.setReducerClass(PageRankReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			TextInputFormat.addInputPath(job, new Path(input));
			TextOutputFormat.setOutputPath(job, new Path(output));
			status = job.waitForCompletion(true);
			iteration++;
			
			long count = job.getCounters().findCounter(PageCount.Count).getValue();
			long total_pr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
			System.out.println("PageCount:"+count);
			System.out.println("TotalPR:"+total_pr);
			double per_pr = total_pr/(count*1.0d);
			System.out.println("Per PR:"+per_pr);
			if((int)per_pr == threshold)
			{
				System.out.println("Iteration:"+iteration);
				break;
			}
		}
		System.exit(status?0:1);
	}
}

關於上述程式所執行Map和Reduce所處理的過程及輸出結果就不詳加敘述了,留待有興趣的朋友們自行研究~

而關於如何決定跳離反覆式迴圈以結束運算的處理方式,筆者採用下述兩種方式:

1. 最多執行100次的反覆式運算,讓程式有一定的執行次數限制。

2. 分別累加頁面數量和每個網頁的PR值,並觀察其變化量呈現穩定狀態時就離開迴圈,上述範例求到小數第三位。

透過上述的處理方式,可以觀察到在執行第54次MapReduce運算時所呈現出來的結果:

Adobe	1.3334262 Google,MSN,Yahoo,
Google	1.39192 Adobe,
MSN	0.7523096 Google,
Yahoo	0.5279022 MSN,Google,

結果如預期的,Google的PR值最高,其次為Adobe,最後才是MSN和Yahoo。

P.S. 筆者沒有討厭Yahoo也沒有特別喜歡Google,純粹實驗性質... Orz (我比較愛Adobe)

相關資源

.Jimmy Lin and Michael Schatz. Design Patterns for Efficient Graph Algorithms in MapReduce. Proceedings of the 2010 Workshop on Mining and Learning with Graphs Workshop (MLG-2010), July 2010, Washington, D.C.

一式解讀 PageRank

2010-11-29 16:32:57 | Comments (9)

MRUnit - 測試你的MapReduce程式

隨著Hadoop 0.21.0的釋出,你可以更方便的來測試你的MapReduce程式,因為它包含了一套由Cloudera所貢獻用來測試MapReudce程式的Library - MRUnit,不過目前除了官方所提供的overview.html檔之外,其它的相關文件卻相當稀少(補充中:Add MRUnit documentation),而本文純粹記錄一下簡單的WordCount測試程式(New API)來介紹MRUnit的使用方式:

TokenizerMapper

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	public void map(Object key, Text value, Context context) throws IOException, InterruptedException
	{
		StringTokenizer itr = new StringTokenizer(value.toString());
		while (itr.hasMoreTokens())
		{
			word.set(itr.nextToken());
			context.write(word, one);
		}
	}
}

WordCountReducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
	private IntWritable result = new IntWritable();

	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
	{
		int sum = 0;
		for (IntWritable val : values)
		{
			sum += val.get();
		}
		result.set(sum);
		context.write(key, result);
	}
}

MRUnitTest

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;

import org.junit.Before;
import org.junit.Test;

public class MRUnitTest extends TestCase
{

	private Mapper<Object, Text, Text, IntWritable> mapper;
	private Reducer<Text, IntWritable, Text, IntWritable> reducer;
	private MapReduceDriver<Object, Text, Text, IntWritable,Text, IntWritable> driver;

	@Before
	public void setUp()
	{
		mapper = new TokenizerMapper();
		reducer = new WordCountReducer();
		driver = new MapReduceDriver<Object, Text, Text, IntWritable,Text, IntWritable>(mapper, reducer);
	}

	@Test
	public void testIdentityMapper()
	{
		Pair<Object, Text> p1 = new Pair<Object, Text>(new Object(), new Text("bar"));
		Pair<Object, Text> p2 = new Pair<Object, Text>(new Object(), new Text("foo"));
		Pair<Object, Text> p3 = new Pair<Object, Text>(new Object(), new Text("bar"));
		Pair<Object, Text> p4 = new Pair<Object, Text>(new Object(), new Text("bar"));
		driver.withInput(p1);
		driver.withInput(p2);
		driver.withInput(p3);
		driver.withInput(p4);
		driver.withOutput(new Text("bar"), new IntWritable(3));
		driver.withOutput(new Text("foo"), new IntWritable(1));
		driver.runTest();
	}
}

由於上述範例主要測試MapReduce整個流程,所以透過「MapReduceDriver」物件來驅動整個測試,當然你也可以只測試Map流程,那就透過「MapDriver」即可,另外MRUnit也支援Counters,所以也可以透過它來驗證程式。

相關資源

Debugging MapReduce Programs With MRUnit

Testing your Hadoop jobs with MRUnit

2010-09-13 18:01:25 | Add Comment

Previous Posts
Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment