blog.Ring.idv.tw

Cassandra

整合Cassandra和Hadoop - WordCount

由於Cassandra在0.6版開始提供和Hadoop整合,可以將Cassandra所儲存的資料當做Hadoop MapReduce的輸入來源,所以筆者前幾天在試著玩玩Cassandra該如何要和Hadoop整合時,用著「官方所提供的WordCount」範例,跑出來的結果居然完全錯誤!! trace了大半天才發現原來是Cassandra的Bug,還好這在最新釋出的0.6.4版已經被修正了(ColumnFamilyRecordReader returns duplicate rows),不過目前Cassandra還是沒有提供將Hadoop資料輸出到Cassandrad的介面實作(雖然可以在Reduce自行處理),這要等到0.7版才會釋出(A Hadoop Output Format That Targets Cassandra),下述就是Cassandra+Hadoop的WordCount程式:

測試資料

Key     Value
-----------------------------------------
Doc1    new home sales top forecasts 
Doc2    home sales rise in july 
Doc3    increase in home sales in july 
Doc4    july new home sales rise 

IRWordCountSetup

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup
{
	public static final String UTF8 = "UTF8";

	public static void main(String[] args)throws Exception
	{
		TTransport tr = new TSocket("localhost", 9160);
		TProtocol proto = new TBinaryProtocol(tr);
		Cassandra.Client client = new Cassandra.Client(proto);
		tr.open();

		String keyspace = "Keyspace1";
		String columnFamily = "Standard1";

		ColumnPath colPathName = new ColumnPath(columnFamily);
		colPathName.setColumn("Doc".getBytes(UTF8));
		long timestamp = System.currentTimeMillis();
		
		client.insert(keyspace, "Doc1", colPathName, "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc2", colPathName, "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc3", colPathName, "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc4", colPathName, "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
	}
}

IRWordCount

package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount
{
	static final String KEYSPACE = "Keyspace1";
	static final String COLUMN_FAMILY = "Standard1";
	private static final String CONF_COLUMN_NAME = "Doc";
	private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

	public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
	{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		private String columnName;
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
		}
		public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
		{
			IColumn column = columns.get(columnName.getBytes());
			if(column == null)
				return;
			
			String value = new String(column.value());			
			System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

			StringTokenizer itr = new StringTokenizer(value);
			while (itr.hasMoreTokens())
			{
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Path output = new Path(OUTPUT_PATH_PREFIX);
		Configuration conf = new Configuration();
        
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(output))
			fs.delete(output, true);
		
		String columnName = "Doc";
		conf.set(CONF_COLUMN_NAME, columnName);
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(IRWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setInputFormatClass(ColumnFamilyInputFormat.class);
		FileOutputFormat.setOutputPath(job, output);

		ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
		SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
		ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

		boolean status = job.waitForCompletion(true);
		System.exit(status ? 0 : 1);
	}
}

輸出的結果為:

forecasts	1
home	4
in	3
increase	1
july	3
new	2
rise	2
sales	4
top	1

2010-08-09 15:27:32 | Add Comment

Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment