用Java手写一个简易的RPC框架

涉及的知识点

image-20230620140353607

RPC和HTTP都是应用层的协议,都可以完成服务的远程调用,我个人认为它们最大的区别主要是传输的方式不同,一个是纯文本,另一个是二进制流,从而导致它们应用的场景各不相同。HTTP 协议适用于 Web 应用程序的交互,RPC 协议更多用于分布式系统中服务间的通信。

一个完整的 RPC 框架需要涉及的知识点很多,包括网络通信、序列化和反序列化、动态代理、服务的注册和发现等等。在这里,我简单介绍一下其中的一些关键点。

网络通信

RPC 框架的核心是远程方法调用,因此必须实现网络通信来传递数据。可以采用 Java 的 Socket 编程来实现 TCP 方式的网络通信,并通过线程池或者 NIO 实现并发处理,也可以采用Tomcat、Netty等服务器的形式。这一块我们可以使用一个配置文件,实现程序的可扩展性。

序列化和反序列化

在数据传输过程中,需要将对象进行序列化和反序列化。可以采用 Java 原生的序列化机制,也可以使用第三方的序列化框架,如 Json等,同样可以使用配置文件进行配置。

动态代理

在客户端调用远程服务时,需要生成代理对象来实现远程方法的调用。可以使用 Java 自带的动态代理机制,实现客户端调用时的代理对象。

服务的注册和发现

服务的提供者需要将服务注册到本地或者远程,同时可以被服务调用者获取到。

简单实现

下面我来实现一个简易版本的RPC框架,以便理解它的底层原理。后续的博客会对其中的代码进行进一步的改进和优化。

这是代码的模块

image-20230620163652309

  • api:用于定义Service接口,给实现模块提供规范
  • common:定义公共的类,防止重复编译
  • consumer:服务调用者模块
  • provider:服务提供者模块
  • framework:RPC协议的核心模块

common模块

服务调用者需要把调用的信息发送给服务提供者,所以需要将调用信息封装成一个类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Invocation implements Serializable {
//因为要进行网络传输,所以需要进行序列化。
//接口全限定名
private String className;
//方法名
private String methodName;
//方法参数类型
private Class<?>[] paramTypes;
//方法参数
private Object[] params;

/*getter and setter*/

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Class<?>[] getParamTypes() {
return paramTypes;
}

public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}

public Object[] getParams() {
return params;
}

public void setParams(Object[] params) {
this.params = params;
}
}

同理,服务提供者需要把调用的结果发送给服务调用者,所以也需要将返回信息封装成一个类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RpcResponse implements Serializable {
//异常
private Throwable error;
//调用结果
private Object result;

/*getter and setter*/

public Throwable getError() {
return error;
}

public void setError(Throwable error) {
this.error = error;
}

public Object getResult() {
return result;
}

public void setResult(Object result) {
this.result = result;
}
}

因为又可能存在调用失败的情况,所以还需要加一个异常字段。

url字段也会作为一个通用的类来使用,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class URL implements Serializable {
private String hostname;
private Integer port;

public URL(String hostname, Integer port) {
this.hostname = hostname;
this.port = port;
}

public String getHostname() {
return hostname;
}

public void setHostname(String hostname) {
this.hostname = hostname;
}

public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}
}

framework模块

服务的调用者需要提供调用信息、IP和端口。流程如下:

  1. 定义Socket,绑定IP和端口。
  2. 发请求(对象流)
  3. 接收请求(对象流)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RpcConsumer {
public Object execute(Invocation invocation, String host, int port) throws Throwable {
Socket server = new Socket(host, port);
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try{
//将请求写到连接服务端socket的输出流
oos = new ObjectOutputStream(server.getOutputStream());
oos.writeObject(invocation);
oos.flush();

//读取输入流的内容
ois = new ObjectInputStream(server.getInputStream());
Object res = ois.readObject();
RpcResponse response = (RpcResponse) res;

return response.getResult();

} catch (ClassNotFoundException e) {
e.printStackTrace();
throw e;
} finally {
if (ois!=null)ois.close();
if (oos != null) oos.close();
if (server != null) server.close();
}
}
}

服务提供者循环等待Socket连接,并将相关的任务放入到线程池中,任务里面存储了Socket信息。services是本地注册的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class RpcProvider {
public void start(int port, Map<String, Object> services){
ServerSocket server = null;
try {
//1、创建socket连接
server = new ServerSocket(port);
//2、获取所有服务类
// Map<String,Object> services = getService(clazz);


//3、创建线程池

Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() {
});
while(true){
//4、获取客户端连接
Socket client = server.accept();
//5、将服务端被调用的服务放到线程池中异步执行
RpcProviderHandler service = new RpcProviderHandler(client,services);
executor.execute(service);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (server!=null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

}

线程池任务的定义

先通过反序列化获取调用信息,然后再从services中获取对应的class,然后通过反射调用服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class RpcProviderHandler implements Runnable {
private Socket clientSocket;
private Map<String,Object> serviceMap;
public RpcProviderHandler(Socket client, Map<String, Object> services) {
this.clientSocket = client;
this.serviceMap = services;
}

@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
RpcResponse response = new RpcResponse();
try{
ois = new ObjectInputStream(clientSocket.getInputStream());
oos = new ObjectOutputStream(clientSocket.getOutputStream());

//反序列化
Object object = ois.readObject();
Invocation invocation = invocation = (Invocation) object;

//查找并执行服务
Class<?> clazz = (Class<?>) serviceMap.get(invocation.getClassName());
Method method = clazz.getMethod(invocation.getMethodName(),invocation.getParamTypes());
Object result = method.invoke(clazz.getConstructor().newInstance(),invocation.getParams());

response.setResult(result);
oos.writeObject(response);
oos.flush();

} catch (Exception e) {
if (oos != null) {
response.setError(e);
try {
oos.writeObject(response);
oos.flush();
} catch (IOException e1) {
e1.printStackTrace();
}
}
} finally {
try {
if (ois!=null) ois.close();
if (oos != null) oos.close();
if (clientSocket != null) clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

客户端代理类 RpcProviderProxy,主要实现客户端方法的调用和增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RpcProviderProxy {
private URL url;
public RpcProviderProxy(URL url) {
this.url=url;
}

@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz){

return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("可以做一些执行方法的前置处理");
Invocation invocation = new Invocation();
invocation.setClassName(method.getDeclaringClass().getName());
invocation.setMethodName(method.getName());
invocation.setParamTypes(method.getParameterTypes());
invocation.setParams(args);
Object result = new RpcConsumer().execute(invocation, url.getHostname(), url.getPort());
//功能增强,比如记录流水信息
System.out.println("反射中的方法执行完毕,返回结果为:"+result);
System.out.println("可以做一些执行方法的后置处理");
return result;
}
});
}
}

至此,一个简易的RPC框架便实现完成。