Hadoop RPC通信Client客户端的流程分析
来源:程序员人生 发布时间:2015-01-10 08:58:38 阅读次数:3865次
Hadoop的RPC的通讯与其他系统的RPC通讯不太1样,作者针对Hadoop的使用特点,专门的设计了1套RPC框架,这套框架个人感觉还是有点小复杂的。所以我打算分成Client客户端和Server服务端2个模块做分析。如果你对RPC的整套流程已非常了解的条件下,对Hadoop的RPC,你也1定可以非常迅速的了解的。OK,下面切入正题。
Hadoop的RPC的相干代码都在org.apache.hadoop.ipc的包下,首先RPC的通讯必须遵照许多的协议,其中最最基本的协议即便以下;
/**
* Superclass of all protocols that use Hadoop RPC.
* Subclasses of this interface are also supposed to have
* a static final long versionID field.
* Hadoop RPC所有协议的基类,返回协议版本号
*/
public interface VersionedProtocol {
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak
*/
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
}
他是所有协议的基类,他的下面还有1堆的子类,分别对应于不同情况之间的通讯,下面是1张父子类图:
顾名思义,只有客户端和服务端遵守相同的版本号,才能进行通讯。
RPC客户真个所有相干操作都被封装在了1个叫Client.java的文件中:
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
* RPC客户端类
* @see Server
*/
public class Client {
public static final Log LOG =
LogFactory.getLog(Client.class);
//客户端到服务真个连接
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
//回调值类
private Class<? extends Writable> valueClass; // class of call values
//call回调id的计数器
private int counter; // counter for call ids
//原子变量判断客户端是不是还在运行
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
//socket工厂,用来创建socket
private SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
......
从代码中明显的看到,这里存在着1个类似于connections连接池的东西,其实这暗示着连接是可以被复用的,在hashtable中,与每一个Connecttion连接的对应的是1个ConnectionId,明显这里不是1个Long类似的数值:
/**
* This class holds the address and the user ticket. The client connections
* to servers are uniquely identified by <remoteAddress, protocol, ticket>
* 连接的唯1标识,主要通过<远程地址,协议类型,用户组信息>
*/
static class ConnectionId {
//远程的socket地址
InetSocketAddress address;
//用户组信息
UserGroupInformation ticket;
//协议类型
Class<?> protocol;
private static final int PRIME = 16777619;
private int rpcTimeout;
private String serverPrincipal;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private int pingInterval; // how often sends ping to the server in msecs
....
这里用了3个属性组成唯1的标识属性,为了保证可以进行ID的复用,所以作者对ConnectionId的equal比较方法和hashCode 进行了重写:
/**
* 作者重写了equal比较方法,只要成员变量都想等也就想到了
*/
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof ConnectionId) {
ConnectionId that = (ConnectionId) obj;
return isEqual(this.address, that.address)
&& this.maxIdleTime == that.maxIdleTime
&& this.maxRetries == that.maxRetries
&& this.pingInterval == that.pingInterval
&& isEqual(this.protocol, that.protocol)
&& this.rpcTimeout == that.rpcTimeout
&& isEqual(this.serverPrincipal, that.serverPrincipal)
&& this.tcpNoDelay == that.tcpNoDelay
&& isEqual(this.ticket, that.ticket);
}
return false;
}
/**
* 重写了hashCode的生成规则,保证不同的对象产生不同的hashCode值
*/
@Override
public int hashCode() {
int result = 1;
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
result = PRIME * result + maxIdleTime;
result = PRIME * result + maxRetries;
result = PRIME * result + pingInterval;
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
result = PRIME * rpcTimeout;
result = PRIME * result
+ ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
return result;
}
这样就可以保证对应同类型的连接就可以够完全复用了,而不是仅仅凭仗援用的关系判断对象是不是相等,
这里就是1个不错的设计了。
与连接Id对应的就是Connection了,它里面保护是1下的1些变量;
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
//所连接的
服务器地址
private InetSocketAddress server; // server ip:port
//服务真个krb5的名字,与安全方面相干
private String serverPrincipal; // server's krb5 principal name
//连接头部,内部包括了,所用的协议,客户端用户组信息和验证的而方法
private ConnectionHeader header; // connection header
//远程连接ID
private final ConnectionId remoteId; // connection id
//连接验证方法
private AuthMethod authMethod; // authentication method
//下面3个变量都是安全方面的
private boolean useSasl;
private Token<? extends TokenIdentifier> token;
private SaslRpcClient saslRpcClient;
//下面是1组socket通讯方面的变量
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
private int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
//tcpNoDelay可设置是不是阻塞模式
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private int pingInterval; // how often sends ping to the server in msecs
// currently active calls 当前活跃的回调,1个连接 可能会有很多个call回调
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
//最后1次IO活动通讯的时间
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
//连接关闭标记
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
.....
里面保护了大量的和连接通讯相干的变量,在这里有1个很成心思的东西connectionHeader,连接头部,里面的数据时为了在通讯最开始的时候被使用:
class ConnectionHeader implements Writable {
public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
//客户端和服务端通讯的协议名称
private String protocol;
//客户真个用户组信息
private UserGroupInformation ugi = null;
//验证的方式,关系到写入数据的时的格式
private AuthMethod authMethod;
.....
起到标识验证的作用。1个Client类的基本结构我们基本可以描绘出来了,下面是完全的类关系图:
在上面这幅图中,你肯定会发现我少了1个很关键的类了,就是Call回调类。Call回调在很多异步通讯中是常常出现的。由于在通讯进程中,当1个对象通过网络发送要求给另外1个对象的时候,如果采取同步的方式,会1直阻塞在那里,会带来非常不好的效力和体验的,所以很多时候,我们采取的是1种叫回调接口的方式。在这期间,用户可以继续做自己的事情。所以一样的Call这个概念固然也是适用在Hadoop RPC中。在Hadoop的RPC的核心调用原理,
简单的说,就是我把parame参数序列化到1个对象中,通过参数的情势把对象传入,进行RPC通讯,最后服务端把处理好的结果值放入call对象,在返回给客户端,也就是说客户端和服务端都是通过Call对象进行操作,Call里面存着,要求的参数,和处理后的结构值2个变量。通过Call对象的封装,客户单实现了完善的不必知道细节的调用。下面是Call类的类按时
/** A call waiting for a value. */
//客户真个1个回调
private class Call {
//回调ID
int id; // call id
//被序列化的参数
Writable param; // parameter
//返回值
Writable value; // value, null if error
//出错时返回的异常
IOException error; // exception, null if value
//回调是不是已被完成
boolean done; // true when call is done
....
看到这个Call回调类,或许你渐渐的会明白Hadoop RPC的1个基本原型了,这些Call固然是存在于某个连接中的,1个连接可能会产生多个回调,所以在Connection中保护了calls列表:
private class Connection extends Thread {
....
// currently active calls 当前活跃的回调,1个连接 可能会有很多个call回调
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
作者在设计Call类的时候,比较聪明的斟酌1种并发情况下的Call调用,所以为此设计了下面这个Call的子类,就是专门用于短时间内的瞬间Call调用:
/** Call implementation used for parallel calls. */
/** 继承自Call回调类,可以并行的使用,通过加了index下标做Call的辨别 */
private class ParallelCall extends Call {
//每一个ParallelCall并行的回调就会有对应的结果类
private ParallelResults results;
//index作为Call的辨别
private int index;
....
如果要查找值,就通过里面的ParallelCall查找,原理是根据index索引:
/** Result collector for parallel calls. */
private static class ParallelResults {
//并行结果类中具有1组返回值,需要ParallelCall的index索引匹配
private Writable[] values;
//结果值的数量
private int size;
//values中已知的值的个数
private int count;
.....
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
//将call中的值赋给result中
values[call.index] = call.value; // store the value
count++; // count it
//如果计数的值等到终究大小,通知caller
if (count == size) // if all values are in
notify(); // then notify waiting caller
}
}
由于Call结构集是这些并发Call共有的,所以用的是static变量,都存在在了values数组中了,只有所有的并发Call都把值取出来了,才算回调成功,这个是个非常细小的辅助设计,这个在有些书籍上并没有多少提及。下面我们看看1般Call回调的流程,正如刚刚说的,终究客户端看到的情势就是,传入参数,取得结果,疏忽内部1切逻辑,这是怎样做到的呢,答案在下面:
在履行之前,你会先得到ConnectionId:
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(param, remoteId);
}
接着才是主流程:
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
//根据参数构造1个Call回调
Call call = new Call(param);
//根据远程ID获得连接
Connection connection = getConnection(remoteId, call);
//发送参数
connection.sendParam(call); // send the parameter
boolean interrupted = false;
synchronized (call) {
//如果call.done为false,就是Call还没完成
while (!call.done) {
try {
//等待远端程序的履行终了
call.wait(); // wait for the result
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
//如果是异常中断,则终止当前线程
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
//如果call回到出错,则返回call出错信息
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
// use the connection because it will reflect an ip change, unlike
// the remoteId
throw wrapException(connection.getRemoteAddress(), call.error);
}
} else {
//如果是正常情况下,返回回调解理后的值
return call.value;
}
}
}
在这上面的操作步骤中,重点关注2个函数,获得连接操作,看看人家是如何保证连接的复用性的:
private Connection getConnection(ConnectionId remoteId,
Call call)
throws IOException, InterruptedException {
.....
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
do {
synchronized (connections) {
//从connection连接池中获得连接,可以保证相同的连接ID可以复用
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));
有点单例模式的味道哦,还有1个方法叫sendParam发送参数方法:
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//for serializing the
//data to be written
//将call回调中的参数写入到输出流中,传向服务端
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();
}
....
代码只发送了Call的id,和要求参数,并没有把所有的Call的内容都扔出去了,1定是为了减少数据量的传输,这里还把数据的长度写入了,这是为了方燕服务端准确的读取到不定长的数据。这服务端中间的处理操作不是今天讨论的重点。Call的履行进程就是这样。那末Call是如何被调用的呢,这又要重新回到了Client客户端上去了,Client有1个run()函数,所有的操作都是始于此的;
public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());
//等待工作,等待要求调用
while (waitForWork()) {//wait here for work - read or close connection
//调用完要求,则立即获得回复
receiveResponse();
}
close();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections "
+ connections.size());
}
操作很简单,程序1直跑着,有要求,处理要求,获得要求,没有要求,就死等。
private synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime-
(System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
} catch (InterruptedException e) {}
}
}
....
获得回复的操作以下:
/* Receive a response.
* Because only one receiver, so no synchronization on in.
* 获得回复值
*/
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
//更新最近1次的call活动时间
touch();
try {
int id = in.readInt(); // try to read an id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
//从获得call中获得相应的call
Call call = calls.get(id);
//判断该结果状态
int state = in.readInt(); // read call status
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setValue(value);
calls.remove(id);
} else if (state == Status.ERROR.state) {
call.setException(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
calls.remove(id);
} else if (state == Status.FATAL.state) {
// Close the connection
markClosed(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
}
.....
} catch (IOException e) {
markClosed(e);
}
}
从之前保护的Call列表中取出,做判断。Client本身的履行流程比较的简单:
Hadoop RPC客户真个通讯模块的部份大致就是我上面的这个流程,中间其实还疏忽了很多的细节,大家学习的时候,针对源码会有助于更好的理解,Hadoop RPC的服务真个实现更加复杂,所以建议采取分模块的学习也许会更好1点。
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠