Hadoop源码分析之RPC一

Contents

  1. 1. 远程过程调用基础概念
    1. 1.1. RPC原理
  2. 2. JAVA远程方法调用RMI
  3. 3. Java动态代理和Java NIO
    1. 3.1. java动态代理
    2. 3.2. Java NIO
      1. 3.2.1. java基本套接字
      2. 3.2.2. Java NIO基础
  4. 4. Hadoop中的远程过程调用
    1. 4.1. Hadoop IPC连接相关过程
      1. 4.1.1. IPC连接成员变量
      2. 4.1.2. 建立IPC连接
    2. 4.2. Hadoop IPC调用相关过程
      1. 4.2.1. IPC方法调用成员变量
      2. 4.2.2. 客户端方法调用过程
      3. 4.2.3. 服务器端方法调用过程

Hadoop中各个实体间存在大量的交互,远程过程调用让用户可以像调用本地方法一样调用另外一个应用程序提供的服务,而不必设计和开发相关的消息发送、处理和接收等具体代码。是一种重要的分布式计算技术,它提高了程序的互操作性,在Hadoop的实现中得到了广泛地应用。

远程过程调用基础概念

RPC原理

传统的过程调用:主程序将参数压入栈内并调用过程,这时候主程序停止执行并开始执行相应的过程。被调用的过程从栈中获取参数,然后执行过程函数;执行完毕后,将返回参数入栈,并将控制权交还给调用方。调用方获取返回参数并继续执行。
RPC:当RPC客户需要调用一个远程过程的时候,将参数打包为一个消息,并附加被调用过程的名字,然后发送消息到服务器,然后等待服务器的应答。RPC服务器接收到客户端的请求后,它会解包以获取请求参数,类似传统过程调用,被调用函数从栈中接收参数,确定调用过程并执行,最后返回结果。

JAVA远程方法调用RMI

JAVA远程方法调用(Remote Method Invocation,RMI)是java的一个核心API和类库,它允许一个java虚拟机上运行的Java程序调用不同虚拟机上运行的对象中的方法,既是这两个虚拟机运行于物理隔离的不同主机上。
RMI提供了和RPC中类似的、标准的Stub/Skeleton机制。Stub代表可以被客户端引用的远程对象,位于客户端中并保存着远程对象的接口和方法列表。客户端应用调用远程对象时,Stub将调用请求,通过RMI的基础结构转发到远程对象上。接收到调用请求时,服务器端的Skeleton对象处理有关调用“远方”对象中的所有细节,并调用Skeleton对象。调用远程方法和调用本地方法有着近乎相同的“感观”,一个主要的差别在需要通过命名服务获得远程对象的引用,Java引入了RMI远端对象注册点(registry),RMI服务器创建远程对象类的对象,并将这些对象绑定到注册服务上;RMI客户程序则通过注册点,利用名字查找远程对象,并构造出相应地本地存根对象。另外,和本地方法调用不一样的是,远程方法可能抛出RemoteException异常,客户端程序需要处理该异常
Java.io.Serialization:用于对调用参数和返回值进行序列化和反序列化
Java.rmi.Remote:用于标识作为远程对象的对象
抽象类RemoteObject是作用于远程对象的特殊版本的java.lang.Object,提供了对远程对象有意义的toString(),hashCode()等方法。
RemoteServer扩展了RemoteObject并提供了大多数服务器对象需要的工具方法。
UnicastRemoteObject类实现了处理一般远程对象的任务。如序列化和反序列化参数及返回值,使用TCP和客户端通信等功能。
客户端RMIQueryStatusClient的工作依赖于RMI存根,这个存根是通过Java的代理机制java.lang.reflect.Proxy动态生成的。

Observer 设计模式
Command 模式将调用操作的对象与如何实现该操作的对象解耦。在运行时,TestCase 或 TestSuite 被当作 Test 命令对象,可以像一般对象那样进行操作和扩展,也可以在实现 Composite 模式时将多个命令复合成一个命令。另外,增加新的命令十分容易,隔离了现有类的影响,今后,也可以与备忘录模式结合,实现 undo 等高级功能。

Java动态代理和Java NIO

java动态代理

在Java中,代理对象往往实现和目标对象一致的接口,并作为目标对象的代替,接收对象用户(Client)的调用,并将全部或部分调用转发给目标对象,在这个过程中,实现代理接口和调用转发,是代理对象必须完成的两个重要任务。代理对象可以在调用转发之前或者之后执行一些功能,如输出日志、实施访问控制、访问数据库、加载远程资源。
Java.lang.reflect.Proxy中的另一个关键字是reflect反射,反射提供了一个非常丰富而精心设计的工具集,以便编写能够动态操作Java代码的程序。即通过Java.lang.reflect.Proxy可以动态地创建某个接口实现。
通过java.lang.reflect.InvocationHandler的实例完成调用转发。InvocationHandler的实例也叫调用句柄实例,其实一个接口,invoke()是其唯一方法。当用户调用newProxyInstance()返回的对象上的方法时,该对象将方法调用转发给调用句柄实例,即invoke(),而invoke()的返回值将作为代理对象方法的调用结果返回给用户,其中参数Method是Java反射中的一个重要抽象,它提供了关于类或接口上某个方法以及如何访问该方法的信息.

Java NIO

java基本套接字

socket是两台主机间的一个连接,可以进行7项基本操作:连接远程机器、发送数据、接收数据、关闭连接、绑定端口、监听入站数据、在所绑定端口上接受来自远程机器的连接。其中,前四项用于客户端,后六项用于服务器,最后三项只有服务器才需要。

Java NIO基础

当需要实现同时处理上千个客户请求的服务器时,Java基本套接字会产生一些问题:由于OutputStream的write()方法、InputStream的read()方法和ServerSocket的accept()方法都是阻塞方法,往往需要采用一个客户对应一个线程的服务器系统,虽然使用线程池在某种程度上能够节省部分系统开销,但对于生存期很长的协议来说,大量的闲置客户端限制了系统可以同时服务的客户端总数。为了解决这样的问题,JDK1.4中引入了Java新输入输出系统(New Input/Output,NIO)。非阻塞是NIO实现的重要功能之一,为了实现非阻塞,NIO引入了选择器(Selector)和通道(Channel)的概念。通道表示到实体的开发连接,如硬件设备、文件、网络套接字或可以执行一个或多个不同I/O操作(如读取或写入)的程序组件。一些通道,如文件、网络套接字,允许选择器对通道进行轮询。也就是说,通道能注册一个选择器实例,通过该实例的select()方法,用户可以询问“在(一个或一组)通道中,哪一个是当前需要的服务(即被读、写或接受)”。在一个准备好的通道上进行相应地I/O操作,就不需要等待,也就不会阻塞了。

Hadoop中的远程过程调用

Hadoop没有使用前面介绍的JavaRMI,而是实现了一套自己独有的节点间通信机制,因为有效地IPC(Inter-Process Communication,进程间通信)对于Hadoop来说是至关重要的,hadoop需要精确控制进程间通信中比如连接、超时、缓存等通信细节,显然,Java RMI达不到这些需求。
Hadoop中与IPC相关的代码都在org.apache.hadoop.ipc包中,共7个文件,包括远程过程调用实现需要的辅助类:

  1. RemoteException:远程异常,应用于IPC客户端,表示远程过程调用中的错误。
  2. Status.java:Status:是一个枚举类,定义了远程过程调用的返回结果,包括SUCCESS(成功)、ERROR(一般错误)和FATAL(致命错误)三种情况。
  3. VersionedProtocol接口: Hadoop IPC的远程接口都扩展自VersionedProtocol。
  4. ConnectionHeader.java:IPC客户端与服务器建立连接时发送的消息头。


与客户端、服务器实现相关的代码主要在Client.java、Server.java和RPC.java三个文件中。也就是Hadoop IPC的核心代码在这三个文件中

  1. Client.java包含了与IPC客户端相关的代码,Client是对IPC客户的抽象,它包含的内部类可以分为与IPC连接相关的,如Client.Connection和Client.ConnectionId等;和远程调用Call相关的如Client.Call、Client.ParallelCall等。
  2. Server.java实现了一个IPC服务器端的抽象类,和Client.java类似,Server.java也有对应的IPC连接、远程调用Call相关的类,即Server.Connection与Server.Call。Server.java中,对远程调用Call的处理,由Listener、Handler和Responder配合完成,它们都继承自java.lang.Thread,在各自的线程中运行。
  3. RPC.java在Client.java和Server.java基础上实现了Hadoop IPC,RPC.java中实现的功能主要分为两部分:(
    1)与客户端相关的功能包括RPC.ClientCache、RPC.Invoker和RPC.Invocation,其中RPC.Invoker继承自前面介绍的调用句柄java.lang.reflect.InvocationHandler。
    (2)与服务器相关的内部类只有RPC.Server,它是IPC服务器抽象类(定义在Server.java中)的一个具体子类

Hadoop IPC连接相关过程

Hadoop IPC的实现主要有两类流程,分别是IPC连接相关过程IPC方法调用相关过程。IPC连接是客户端和服务器关系的一个抽象,它的实现包括连接建立、连接上的数据读写、连接维护和连接关闭四个流程。

IPC连接成员变量

  1. Client.Connection的成员变量
    Client. Connection的成员变量分为三部分:与TCP相关的成员变量、与IPC连接相关的成员变量和与远程调用的成员变量,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private InetSocketAddress server;//IPC服务器地址
    private ConnectionHeader header;//连接消息头
    private ConnectionId remoteId;//IPC连接标识
    private Socket socket=null//TCP连接的Socket对象
    private DataInputStream in;
    private DataOutputStream out;
    //当前正在处理的远程调用
    private Hashtable<Integer, Call>calls=new Hashtable<Integer, Call>();
    //IPC连接的最后一次通信时间
    private AtomicLong lastActivity=new AtomicLong();
    //连接关闭标记
    private AtomicBoolean shouldCloseConnection=new AtomicBoolean();
    private IOException closeException;//导致IPC连接关闭的异常

与TCP相关的成员变量包括:IPC服务器的地址server(类InetSocketAddress包含了建立TCP连接需要的IP地址/主机名和端口号信息)、TCP连接对应的Socket对象socket、以及socket对象上的输入流对象in和输出流对象out,这部分成员变量的使用相对比简单。
与IPC连接相关的成员变量包括:相对不变的连接消息头header、连接标识remoteId,它们在连接对象初始化的时候就已经初始化,后续Client.Connection工作时,也不会对它们进行修改。连接标识和连接消息头的作用前面都已经介绍过了,连接标识用于IPC连接复用,连接消息头是Socket连接建立后交换的第一条消息。在维护IPC连接和关闭IPC连接过程中,需要使用到成员变量lastActivity、shouldCloseConnection和closeException。lastActivity记录最后一次I/O发生的时间,如果IPC连接上长时间没有数据交换,即没有I/O发生,那么IPC连接会进行相应的维护工作。当需要关闭IPC连接时,通过置位AtomicBoolean型的标记shouldCloseConnection,可以停止Client.Connection线程的工作(注意,Client.Connection继承自java.lang.Thread)。IPC连接可以是正常关闭,也可以是因为某些异常导致异常关闭,当IPC连接异常关闭时,导致关闭的异常原因,会记录在closeException中。
与远程调用相关的变量只包含一个成员变量calls,它保存目前IPC连接上的所有远程调用。成员变量calls是一个哈希表,其中键的类型是整形,是每个Client.Call对象都包含的,在一个IPC客户端里唯一的整数标识,哈希表的值就是Client.Call对象本身,通过这个标识可以很快在服务器端发送过来的应答里找出对应的Client.Call对象。
2. Server.Connection的成员变量
Server. Connection的成员变量比Client.Connection多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private boolean rpcHeaderRead=false//状态,是否已经读入RPC版本号
private boolean headerRead=false//状态,是否已经读入连接消息头
private SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
private LinkedList<Call>responseQueue;
private volatile int rpcCount=0//当前正在处理的RPC请求量
private long lastContact;
private int dataLength;
private Socket socket;
//客户端的主机名和端口
private String hostAddress;
private int remotePort;
ConnectionHeader header=new ConnectionHeader();
Class<?>protocol;
Subject user=null
//一个假的远程调用,作为鉴权失败时的应答
//(Fake‘call’for failed authorization response)
private fnal int AUTHROIZATION_FAILED_CALLID=-1
private fnal Call authFailedCall=
new Call(AUTHROIZATION_FAILED_CALLID, null, null);
private ByteArrayOutputStream authFailedResponse=
new ByteArrayOutputStream();

和Client.Connection类似,Server.Connection的变量也可以分为与TCP相关、与连接相关和与远程调用相关等三类。
与TCP相关的主要是Socket通道channel和配合通道工作的一些缓冲区,如data、dataLengthBuffer等,以及配合缓冲区工作的dataLength等。
与连接相关的成员变量比较多,包括:
连接状态管理的versionRead、headerRead,它们的初始值都为false。
连接建立的时候,需要对客户端做一些检查,然后才允许客户端发送IPC请求,这样的检查包括版本检查,即检查客户端使用的IPC版本(注意,这里的版本是IPC实现的版本,不是具体IPC接口的版本)和服务器端的是一样的,以及连接头检查,主要检查服务器是否实现了客户端需要的IPC接口,还有就是客户端用户有权限使用这些接口。
通过了这些检查后,也就是versionRead、headerRead的值都为true时,连接才会进入处理IPC调用的状态。连接建立时检查的同时会保存一些工作时需要的信息,如连接头header(Client.Connection也保持了这个信息)、该IPC连接上的远程接口protocol、客户端用户user等,这些也都作为Server.Connection的成员变量。authFailedCall和authFailedResponse用于用户鉴权/授权失败后对客户端的应答,具体使用方法,请参考后面IPC连接建立过程的分析。和连接相关的最后一个变量是lastContact,它保持了最后一次接收到客户端数据的时间,如果客户端长时间没有发送数据,那么,服务器将会关闭连接,以释放相关资源。

建立IPC连接

1. Client客户端
Client类的成员connections保存ConnectionId到Connection的映射,Client需要使用连接时,都会调用getConnection()方法。该方法先检查connections中是否有满足条件的IPC连接,如果有,复用连接,否则才会创建新的IPC连接。
GetConnection()的参数包括构造ConnectionId需要的远程服务器的地址、用户信息和远程接口信息,最后一个参数是call,其类型是Client.Call。这表明,连接是在需要的时候,也就是有IPC调用发生的时候才建立的。相关代码如下:

getConnection()方法主要部分在do…while循环中,循环内实现了连接复用。addCall()方法的作用是将一个IPC调用放入IPC连接中,如果连接的成员变量shouldCloseConnection为true,addCall()会返回false,也就防止将一个IPC调用放入一个正处于关闭过程中的连接。IPC连接可以在多个地方触发,进入关闭过程,但直到Connection.close()方法中,ConnectionId和连接的映射关系才会被删除。连接被删除后,getConnection()才有机会构造出一个新的连接,并将调用addCall()时传入的IPC调用放入新的IPC连接中。所以,getConnection()的do…while循环结束条件,保证调用者能得到一个打开的、正常工作的IPC连接,不会获得一个即将关闭、处于清除阶段的连接。

GetConnection()获取的Client.Connection对象,需要通过setupIOstreams()方法和服务器建立连接。该方法首先使用Java基本套接字和服务器建立Socket连接。建立Socket连接如果失败,setupIOstreams()会进行一定次数的重试,重试次数由Client的成员变量maxRetries指定,可通过配置项${ipc.client.connect.max.retries}设置。基础的Socket连接建立以后,setupIOstreams()接着调用writeHeader()和IPC服务器进行握手。writeHeader()方法对客户端进行协议版本检查、接口检查和权限检查。writeHeader()中发送ConnectionHeader部分,其实相当于发起了一次RPC调用。调用结束后,setupIOstream()调用touch()记录最后一次I/O发生的时间,并启动线程,该线程用于在Socket上读取并处理响应数据。

2. Server服务器端
在远程过程调用的另一边,服务器建立IPC连接的代码分散在Listener和Server.Connection中。Listener基于Java NIO开发,是一个标准的NIO应用。Listener在它的构造函数中打开服务器的端口,创建Selector并开始监听。参数backlogLength是调用ServerSocket.bind()时可以额外提供的一个参数,用于指定在监听端口上排队的请求的最大长度,队列满了以后到达的客户端连接请求,会被拒绝。
Listener.run()实现了NIO中的选择器循环,即调用选择器的select()方法并处理事件,这里只通过doRead()方法处理OP_READ,以及通过doAccept()方法处理OP_ACCEPT事件,不关注通道的OP_WRITE事件。
doAccept()方法接受客户端的连接请求、注册Socket到选择器并创建Server.Connection对象。
doRead()方法通过Server.Connection.readAndProcess()读取并处理数据,如果处理中出现问题,doRead()会通过Server.closeConnection()关闭连接。doRead()方法还会更新连接的成员变量lastContact(最后一次接收到客户端数据的时间),在服务器端维护IPC连接。相关代码如下:
Server.Connection中最主要的方法是readAndProcess(),doRead()调用这个方法读取客户端发送过来的数据并处理。按照Client.Connection.writeHeader()发送的握手信息,如果顺利读取到IPC连接数和协议版本号并完成版本检查,则设置rpcHeaderRead为true然后进入连接头检查,连接头检查通过后,headerRead置为true,服务器开始处理客户端的IPC请求。

在版本检查阶段,readAndProcess()先读IPC连接数和协议版本号,通过versionBuffer缓冲区读入。数据读入以后,立即比较客户端发送过来的IPC连接数和协议版本号和服务器是否一致,如果一致,设置为versionRead为true;否则向客户端返回-1表示失败。相关代码如下:


对连接头ConnectionHeader进行处理的逻辑在readAndProcess()的底部,包含两个步骤:

  • 第一个步骤是调用读入ConnectionHeader,保证服务器实现了IPC接口和获取用户信息。
  • 第二个步骤是调用authorize(),保证用户有相关的权限访问远程接口。

Hadoop IPC调用相关过程

IPC方法调用,在客户端只是一个比较复杂的Java动态代理应用;在服务器端,由Listener、Handler和Responder配合,完成请求读取、请求处理和请求应答三个步骤。

IPC方法调用成员变量

IPC发生时将会涉及3个用于抽象调用的类:RPC.Invocation、Client.Call和Server.Call,其类图如下

客户端方法调用过程

这个过程的入口在RPC.Invoker.invoke(),RPC.Invoker类实现了动态代理调用转发接口java.lang.reflect.InvocationHandler,invoke()方法是这个接口的唯一方法。RPC.Invoker最重要的成员变量是client,即RPC客户端,代码如下:

在Invoker.invoke中,首先根据method和args参数构造Invocation对象,接着调用Client.call()方法发送IPC请求并获取结果,其结果通过value.get()返回给调用者。代码如下:

以上代码首先根据输入参数param构造一个Client.Call对象,其中param就是包含方法调用的方法名和参数的Invocation对象。接着,利用getConnection方法获取IPC连接,然后把Client.Call对象中和调用相关的信息发送出去,参数发送给服务器以后,客户端开始等待服务器发送回来的应答,通过典型的wait()循环实现。当接收到应答或者IPC客户端发现错误中断等待过程后,循环结束,接下来就是根据不同的情况,处理异常或方法调用返回。

服务器端方法调用过程

Hadoop IPC客户端的请求发送到服务器后,首先由Listener接收。
1. Listener的工作过程
Listener主要运行NIO选择器循环,并在Listener.doRead()方法中读取数据,在Connection.readAndProcess()中恢复数据帧,然后调用processData()。processData()处理一帧数据,它从数据帧中获取调用的标识符,并反序列化调用参数,构造服务器端Call对象,然后将该对象放入callQueue队列中,代码如下:

阻塞队列callQueue定义于Server类中,是Listener和Handler的边界。Listener和Handler是典型的生产者-消费者,processData()将待处理的数据放入队列中,由Handler(消费者)调用对应的服务器方法。
2. Handler的工作过程
Handler的主要工作都在run()方法中完成。在方法的主循环中,每次循环处理一个请求,请求通过调用Server的抽象方法call()完成服务器端方法调用。抽象方法call()一共需要三个参数,前面两个参数提供了方法调用需要的上下文信息,其中接口名称由参数protocol提供,在建立IPC连接的时候,通信的双方已经交换了这个信息。
上述代码中,对抽象方法Server.call()的调用是通过Subject.doAs()完成的。Subject.doAs()是Java鉴权与授权服务中的方法。如果用户call.connection.user没有执行某个远程过程调用的权限,第二个catch语句会捕获SecurityException异常。至于Subject对象是否有调用特定方法的权限,JAAS提供了灵活的机制,系统管理员可以根据需要,动态地调整策略。

抽象方法Server.call()调用结束后,可能返回一个Writable对象,或者抛出异常,抛出的异常会被捕获并被设置到相关变量中。setupResponse()方法用于将返回结果序列化到Call的成员变量response中,完成发送应答前的处理。在实际运行的IPC服务器中,抽象方法Server.call()的实现在RPC.Server中,代码如下:

在RPC.Server.call()中,传入的param会被强制转换成RPC.Invocation对象,并通过该对象包含的方法名称getMethodName()和形式参数列表getParameterClasses()方法获取Method对象(java动态代理)。在RPC.Server的call()方法中,最终是通过Method类的invoke()方法,调用IPC服务器实现对象instance上对应的方法,完成Hadoop远程过程调用。
对于调用的结果,分两种情况:如果调用正常结束,则将结果放入一个ObjectWritable对象中,并返回;否则抛出异常。
3. Response的工作过程
Handler通过调用Responder的doRespond()方法,将处理完的结果交给Responder。之所以不立即将结果发送给客户端,因为一般情况下,远程过程调用使用Handler的线程执行被调用的过程,handler所在的线程是共享资源,对共享资源占用的时间越短越好;另一方面,网络通信的时间是不确定的。
doRespond()方法把应答对应的远程调用对象放入IPC连接的应答队列里。如果IPC连接的应答队列只有一个元素,则会立即调用Handler的processResponse()方法,向客户端发送结果。这是一个提高服务器性能的优化,当应答队列只有一个元素的时候,表明对应的IPC连接比较空闲,这时候直接调用processResponse()发送应答,可以避免从Handler的处理线程到Responder处理线程的切换开销。

Responder的选择器循环实现在Responder.run()中,代码如下:
其中waitPending()方法保证对成员变量writeSelector的select()操作和register()操作不会相互影响,当可以安全进行select()操作时,run()主循环调用选择器的select()方法,并在可写的通道上调用doAsyncWrite(),进行写操作。doAsyncWrite()对输入的数据进行一些必要的检查以后,调用processResponse()写数据。