基本上HBase的RPC設計是採用Hadoop的RPC並做一些更動而寫成的(HBase/RoadMaps),而如果要用一句話來解釋HBase/Hadoop的RPC設計可以這麼說:它是透過Dynamic Proxy Pattern + Reflection + NIO(Multiplexed, non-blocking I/O)所構成的,中間溝通的物件會透過序列化(serialization)的方式來傳遞,所以都需要實作Hadoop的Writable介面,下述是筆者利用HBase/Hadoop的RPC設計來自訂一個Hello, Java範例:
RPCInterface
自行定義一個「say()」方法供RPC呼叫。
package hbase.rpc; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; public interface RPCInterface extends HBaseRPCProtocolVersion { public String say(); }
Message
實作RPCInterface介面的Message類別,純粹回傳一個「Hello, Java」字串。
package hbase.rpc; import java.io.IOException; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; public class Message implements RPCInterface { public String say() { return "Hello, Java"; } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return HBaseRPCProtocolVersion.versionID; } }
TestRPCServer
該程式會透過「HBaseRPC.addToMap()」來註冊自行定義的方法(Method Registry),而內部就是利用Reflection來取得該類別所擁有的方法(Method),它會給予每個方法一個特定的ID,而該ID是一個整數值。
由於HBase/Hadoop的Server是採用Multiplexed, non-blocking I/O方式而設計的,所以它可以透過一個Thread來完成處理,但是由於處理Client端所呼叫的方法是Blocking I/O,所以它的設計會將Client所傳遞過來的物件先放置在Queue,並在啟動Server時就先產生一堆Handler(Thread),該Handler會透過Polling的方式來取得該物件並執行對應的方法,下述範例預設為10個Handler(HMaster/HRegionServer預設都為25個,根據"hbase.regionserver.handler.count"設定)。
package hbase.rpc; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseServer; public class TestRPCServer { private HBaseServer server; private final HServerAddress address; static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);} public TestRPCServer() { this.address = new HServerAddress("localhost:56789"); } public void start() { try { Message msg = new Message(); this.server = HBaseRPC.getServer(msg, address.getBindAddress(), address.getPort(), 10, true, new HBaseConfiguration()); this.server.start(); while (true) { Thread.sleep(3000); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new TestRPCServer().start(); } }
TestRPCClient
這裡的「HBaseRPC.getProxy()」就是採用Dynamic Proxy Pattern + Reflection來設計,有興趣的朋友可以去研究它的Source Code。
package hbase.rpc; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; public class TestRPCClient { protected RPCInterface server; static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);} @SuppressWarnings("unchecked") public TestRPCClient() { try { server = (RPCInterface) HBaseRPC.getProxy(RPCInterface.class, HBaseRPCProtocolVersion.versionID, new InetSocketAddress("localhost", 56789), new HBaseConfiguration()); } catch (Exception e) { e.printStackTrace(); } } public String call() throws IOException { return server.say(); } public static void main(String[] args) throws IOException { TestRPCClient client = new TestRPCClient(); System.out.println(client.call()); } }
玩玩看吧!