This post records how to use HBase (0.20.3) as the input source for a Hadoop MapReduce program. The code below simply reads the data from one table and writes it straight out to HDFS:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TestTableInput {

    // Reads from HBase, so it extends TableMapper rather than the plain Mapper.
    public static class Map extends TableMapper<Text, Text> {
        @Override
        public void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // value.value() returns the value of the first (most recent) cell in the row.
            String v = Bytes.toString(value.value());
            context.write(new Text(Bytes.toString(value.getRow())), new Text(v));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit only the first value for each key.
            context.write(key, new Text(values.iterator().next()));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: hadoop jar TestTableInput.jar <table> <output>");
            System.exit(-1);
        }
        String tablename = args[0];
        String output = args[1];

        Configuration conf = new Configuration();
        // Remove any previous output so the job can be rerun cleanly.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(output), true);

        Job job = new Job(conf, "TestTableInput");
        Scan scan = new Scan();
        // Wire the table and the Scan into the job as the mapper's input.
        TableMapReduceUtil.initTableMapperJob(tablename, scan, Map.class,
                Text.class, Text.class, job);
        job.setJarByClass(TestTableInput.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Two points deserve attention here. First, the Map class must extend HBase's TableMapper (which itself extends Mapper). TableMapper fixes the KEYIN and VALUEIN type parameters to ImmutableBytesWritable and Result, and these two classes map onto table data as follows (a short sketch follows the list):
- ImmutableBytesWritable = Row Key
- Result = Row Key + Column + Timestamp + Value
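For illustration, here is a minimal mapper sketch, assuming the HBase 0.20.x Result/KeyValue API (it additionally requires an import of org.apache.hadoop.hbase.KeyValue), that pulls each of those pieces out of a Result:

    // Sketch only: iterate over every cell (KeyValue) in the Result.
    // Result.raw() returns the KeyValue[] backing the row in 0.20.x.
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        for (KeyValue kv : value.raw()) {
            String row = Bytes.toString(kv.getRow());            // Row Key
            String column = Bytes.toString(kv.getFamily()) + ":"
                    + Bytes.toString(kv.getQualifier());         // Column (family:qualifier)
            long timestamp = kv.getTimestamp();                  // Timestamp
            String val = Bytes.toString(kv.getValue());          // Value
            context.write(new Text(row),
                    new Text(column + "@" + timestamp + "=" + val));
        }
    }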
The second point worth noting is the TableMapReduceUtil.initTableMapperJob() method called in main(), which bundles a number of job settings, as shown in the figure below:
From the figure we can see that this method sets the job's InputFormatClass to TableInputFormat for us and applies several related settings. For example, TableInputFormat.INPUT_TABLE carries the name of the input table, and TableInputFormat.SCAN carries the Scan itself (since a Scan is an object, it must first be serialized into a Base64-encoded string via convertScanToString() before it can be stored in the job configuration).
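To make that concrete, the sketch below approximates what initTableMapperJob() does internally. This is an illustrative reconstruction based on the 0.20.x API (Scan is a Writable there, and HBase ships its own org.apache.hadoop.hbase.util.Base64 helper), not the library's actual source:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.mapreduce.Job;

    public class InitTableMapperSketch {

        // Rough equivalent of TableMapReduceUtil.initTableMapperJob():
        // point the job at TableInputFormat, register the mapper and its
        // output types, then stash the table name and Scan in the config.
        public static void initTableMapperJob(String table, Scan scan,
                Class<? extends TableMapper> mapper,
                Class<?> outputKeyClass, Class<?> outputValueClass,
                Job job) throws IOException {
            job.setInputFormatClass(TableInputFormat.class);
            job.setMapperClass(mapper);
            job.setMapOutputKeyClass(outputKeyClass);
            job.setMapOutputValueClass(outputValueClass);
            job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
            job.getConfiguration().set(TableInputFormat.SCAN, convertScanToString(scan));
        }

        // Scan is a Writable in 0.20.x: serialize it, then Base64-encode the
        // bytes so the object can travel inside the Configuration as a String.
        static String convertScanToString(Scan scan) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            scan.write(new DataOutputStream(out));
            return Base64.encodeBytes(out.toByteArray());
        }
    }

On the reading side, TableInputFormat decodes that Base64 string back into a Scan, which is why an arbitrary Scan (filters, column selections, ranges) survives the trip through the job configuration.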