blog.Ring.idv.tw

HBase/Hadoop RPC

HBase/Hadoop RPC

基本上HBase的RPC設計是採用Hadoop的RPC並做一些更動而寫成的(HBase/RoadMaps),而如果要用一句話來解釋HBase/Hadoop的RPC設計可以這麼說:它是透過Dynamic Proxy Pattern + Reflection + NIO(Multiplexed, non-blocking I/O)所構成的,中間溝通的物件會透過序列化(serialization)的方式來傳遞,所以都需要實作Hadoop的Writable介面,下述是筆者利用HBase/Hadoop的RPC設計來自訂一個Hello, Java範例:

RPCInterface

自行定義一個「say()」方法供RPC呼叫。

package hbase.rpc;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public interface RPCInterface extends HBaseRPCProtocolVersion
{
	public String say();
}

Message

實作RPCInterface介面的Message類別,純粹回傳一個「Hello, Java」字串。

package hbase.rpc;

import java.io.IOException;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public class Message implements RPCInterface
{
	public String say()
	{
		return "Hello, Java";
	}
	
	@Override
	public long getProtocolVersion(String protocol, long clientVersion) throws IOException
	{
		return HBaseRPCProtocolVersion.versionID;
	}
}

TestRPCServer

該程式會透過「HBaseRPC.addToMap()」來註冊自行定義的方法(Method Registry),而內部就是利用Reflection來取得該類別所擁有的方法(Method),它會給予每個方法一個特定的ID,而該ID是一個整數值。

由於HBase/Hadoop的Server是採用Multiplexed, non-blocking I/O方式而設計的,所以它可以透過一個Thread來完成處理,但是由於處理Client端所呼叫的方法是Blocking I/O,所以它的設計會將Client所傳遞過來的物件先放置在Queue,並在啟動Server時就先產生一堆Handler(Thread),該Handler會透過Polling的方式來取得該物件並執行對應的方法,下述範例預設為10個Handler(HMaster/HRegionServer預設都為25個,根據"hbase.regionserver.handler.count"設定)。

package hbase.rpc;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;

public class TestRPCServer
{
	private HBaseServer server;
	private final HServerAddress address;
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	
	public TestRPCServer()
	{
		this.address = new HServerAddress("localhost:56789");
	}

	public void start()
	{
		try
		{
			Message msg = new Message();
			this.server = HBaseRPC.getServer(msg, address.getBindAddress(), address.getPort(), 10, true, new HBaseConfiguration());
			this.server.start();
			while (true)
			{
				Thread.sleep(3000);
			}
		} catch (Exception e)
		{
			e.printStackTrace();
		}

	}
	public static void main(String[] args)
	{
		new TestRPCServer().start();
	}
}

TestRPCClient

這裡的「HBaseRPC.getProxy()」就是採用Dynamic Proxy Pattern + Reflection來設計,有興趣的朋友可以去研究它的Source Code。

package hbase.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;


public class TestRPCClient
{
	protected RPCInterface server;
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	@SuppressWarnings("unchecked")
	public TestRPCClient()
	{
		try
		{
			server = (RPCInterface) HBaseRPC.getProxy(RPCInterface.class, HBaseRPCProtocolVersion.versionID, new InetSocketAddress("localhost", 56789), new HBaseConfiguration());
		} catch (Exception e)
		{
			e.printStackTrace();
		}
	}

	public String call() throws IOException
	{
		return server.say();
	}

	public static void main(String[] args) throws IOException
	{
		TestRPCClient client = new TestRPCClient();
		System.out.println(client.call());
	}
}

玩玩看吧!

2010-05-09 01:46:58

1 comments on "HBase/Hadoop RPC"

  1. 1. bacchus 說:

    很清晰,受教

    2011-05-19 15:35:29

Leave a Comment

Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment