用Java手写一个简易的RPC框架
涉及的知识点
RPC和HTTP都是应用层的协议,都可以完成服务的远程调用,我个人认为它们最大的区别主要是传输的方式不同,一个是纯文本,另一个是二进制流,从而导致它们应用的场景各不相同。HTTP 协议适用于 Web 应用程序的交互,RPC 协议更多用于分布式系统中服务间的通信。
一个完整的 RPC 框架需要涉及的知识点很多,包括网络通信、序列化和反序列化、动态代理、服务的注册和发现等等。在这里,我简单介绍一下其中的一些关键点。
网络通信
RPC 框架的核心是远程方法调用,因此必须实现网络通信来传递数据。可以采用 Java 的 Socket 编程来实现 TCP 方式的网络通信,并通过线程池或者 NIO 实现并发处理,也可以采用Tomcat、Netty等服务器的形式。这一块我们可以使用一个配置文件,实现程序的可扩展性。
序列化和反序列化
在数据传输过程中,需要将对象进行序列化和反序列化。可以采用 Java 原生的序列化机制,也可以使用第三方的序列化框架,如 Json等,同样可以使用配置文件进行配置。
动态代理
在客户端调用远程服务时,需要生成代理对象来实现远程方法的调用。可以使用 Java 自带的动态代理机制,实现客户端调用时的代理对象。
服务的注册和发现
服务的提供者需要将服务注册到本地或者远程,同时可以被服务调用者获取到。
简单实现
下面我来实现一个简易版本的RPC框架,以便理解它的底层原理。后续的博客会对其中的代码进行进一步的改进和优化。
这是代码的模块
- api:用于定义Service接口,给实现模块提供规范
- common:定义公共的类,防止重复编译
- consumer:服务调用者模块
- provider:服务提供者模块
- framework:RPC协议的核心模块
common模块
服务调用者需要把调用的信息发送给服务提供者,所以需要将调用信息封装成一个类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class Invocation implements Serializable { private String className; private String methodName; private Class<?>[] paramTypes; private Object[] params;
public String getClassName() { return className; }
public void setClassName(String className) { this.className = className; }
public String getMethodName() { return methodName; }
public void setMethodName(String methodName) { this.methodName = methodName; }
public Class<?>[] getParamTypes() { return paramTypes; }
public void setParamTypes(Class<?>[] paramTypes) { this.paramTypes = paramTypes; }
public Object[] getParams() { return params; }
public void setParams(Object[] params) { this.params = params; } }
|
同理,服务提供者需要把调用的结果发送给服务调用者,所以也需要将返回信息封装成一个类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class RpcResponse implements Serializable { private Throwable error; private Object result;
public Throwable getError() { return error; }
public void setError(Throwable error) { this.error = error; }
public Object getResult() { return result; }
public void setResult(Object result) { this.result = result; } }
|
因为又可能存在调用失败的情况,所以还需要加一个异常字段。
url字段也会作为一个通用的类来使用,定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class URL implements Serializable { private String hostname; private Integer port;
public URL(String hostname, Integer port) { this.hostname = hostname; this.port = port; }
public String getHostname() { return hostname; }
public void setHostname(String hostname) { this.hostname = hostname; }
public Integer getPort() { return port; }
public void setPort(Integer port) { this.port = port; } }
|
framework模块
服务的调用者需要提供调用信息、IP和端口。流程如下:
- 定义Socket,绑定IP和端口。
- 发请求(对象流)
- 接收请求(对象流)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class RpcConsumer { public Object execute(Invocation invocation, String host, int port) throws Throwable { Socket server = new Socket(host, port); ObjectInputStream ois = null; ObjectOutputStream oos = null; try{ oos = new ObjectOutputStream(server.getOutputStream()); oos.writeObject(invocation); oos.flush();
ois = new ObjectInputStream(server.getInputStream()); Object res = ois.readObject(); RpcResponse response = (RpcResponse) res;
return response.getResult();
} catch (ClassNotFoundException e) { e.printStackTrace(); throw e; } finally { if (ois!=null)ois.close(); if (oos != null) oos.close(); if (server != null) server.close(); } } }
|
服务提供者循环等待Socket连接,并将相关的任务放入到线程池中,任务里面存储了Socket信息。services是本地注册的服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class RpcProvider { public void start(int port, Map<String, Object> services){ ServerSocket server = null; try { server = new ServerSocket(port);
Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() { }); while(true){ Socket client = server.accept(); RpcProviderHandler service = new RpcProviderHandler(client,services); executor.execute(service); } } catch (IOException e) { e.printStackTrace(); } finally { if (server!=null){ try { server.close(); } catch (IOException e) { e.printStackTrace(); } } } }
}
|
线程池任务的定义
先通过反序列化获取调用信息,然后再从services中获取对应的class,然后通过反射调用服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class RpcProviderHandler implements Runnable { private Socket clientSocket; private Map<String,Object> serviceMap; public RpcProviderHandler(Socket client, Map<String, Object> services) { this.clientSocket = client; this.serviceMap = services; }
@Override public void run() { ObjectInputStream ois = null; ObjectOutputStream oos = null; RpcResponse response = new RpcResponse(); try{ ois = new ObjectInputStream(clientSocket.getInputStream()); oos = new ObjectOutputStream(clientSocket.getOutputStream());
Object object = ois.readObject(); Invocation invocation = invocation = (Invocation) object;
Class<?> clazz = (Class<?>) serviceMap.get(invocation.getClassName()); Method method = clazz.getMethod(invocation.getMethodName(),invocation.getParamTypes()); Object result = method.invoke(clazz.getConstructor().newInstance(),invocation.getParams());
response.setResult(result); oos.writeObject(response); oos.flush();
} catch (Exception e) { if (oos != null) { response.setError(e); try { oos.writeObject(response); oos.flush(); } catch (IOException e1) { e1.printStackTrace(); } } } finally { try { if (ois!=null) ois.close(); if (oos != null) oos.close(); if (clientSocket != null) clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
|
客户端代理类 RpcProviderProxy,主要实现客户端方法的调用和增强
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class RpcProviderProxy { private URL url; public RpcProviderProxy(URL url) { this.url=url; }
@SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz){
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("可以做一些执行方法的前置处理"); Invocation invocation = new Invocation(); invocation.setClassName(method.getDeclaringClass().getName()); invocation.setMethodName(method.getName()); invocation.setParamTypes(method.getParameterTypes()); invocation.setParams(args); Object result = new RpcConsumer().execute(invocation, url.getHostname(), url.getPort()); System.out.println("反射中的方法执行完毕,返回结果为:"+result); System.out.println("可以做一些执行方法的后置处理"); return result; } }); } }
|
至此,一个简易的RPC框架便实现完成。