RPC框架总结笔记

RPC框架(简易版)总结笔记

img

核心架构:消费方调用、序列化器、网络服务器、请求处理器、服务注册器。

消费方调用

(基于 JDK 动态代理+工厂模式实现,为指定服务接口类生成可发送HTTP请求的代理对象)

实际调用过程UserService userService = ServiceProxyFactory.getProxy(UserService.class);

实现原理ServiceProxyFactory通过Proxy.newProxyInstance()方法创建代理对象,动态代理对象 ServiceProxy 实现InvocationHandler接口的invoke方法后,在调用代理对象实现方法时,会先进入invoke方法再进入目标方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.lang.reflect.Proxy;

/**
* 服务代理工厂(用于创建代理对象)
*/
public class ServiceProxyFactory {

/**
* 根据服务类获取代理对象
*
* @param serviceClass
* @param <T>
* @return
*/
public static <T> T getProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass},
new ServiceProxy());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 服务代理(JDK 动态代理)
*/
public class ServiceProxy implements InvocationHandler {

/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 指定序列化器
Serializer serializer = new JdkSerializer();

// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 发送请求
// todo 注意,这里地址被硬编码了(需要使用注册中心和服务发现机制解决)
try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}

return null;
}
}

服务注册器

(简单来讲,新建一个finalConcurrentHashMap存储服务名称 和 对应的服务实现类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 本地注册中心
*/
public class LocalRegistry {

/**
* 注册信息存储
*/
private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();

/**
* 注册服务
* @param serviceName
* @param implClass
*/
public static void register(String serviceName, Class<?> implClass) {
map.put(serviceName, implClass);
}

/**
* 获取服务
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}

/**
* 删除服务
* @param serviceName
*/
public static void remove(String serviceName) {
map.remove(serviceName);
}

}

请求处理器

(通过server.requestHandler()传入对应的处理类,处理类HttpServerHandler实现VertxHandler接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* HTTP请求处理
*/
public class HttpServerHandler implements Handler<HttpServerRequest> {

@Override
public void handle(HttpServerRequest request) {
// 指定序列化器
final Serializer serializer = new JdkSerializer();

// 记录日志
System.out.println("receive request: " + request.method() + " " + request.uri());

// 异步处理HTTP请求
request.bodyHandler(body -> {
// 获取请求体的字节流
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
// 尝试反序列化请求为对象
try {
rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
} catch (Exception e) {
e.printStackTrace();
}

// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
// 如果请求为null,直接返回
if (rpcRequest == null) {
rpcResponse.setMessage("rpcRequest is null");
doResponse(request, rpcResponse, serializer);
return;
}

try {
// 从 请求对象 中获取 服务名称 并从 本地注册器 中获取到对应的 服务实现类
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
// 通过反射机制调用方法getMethod() 并 传入所需method的 方法名称 和 参数类型 动态获取方法
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
// invoke() 在实例对象上调用该方法,并传入参数,得到返回结果
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}

/**
* 对HTTP请求作Response
* @param request
* @param rpcResponse
* @param serializer
*/
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
// 初始化HttpServerResponse,并设置请求头指定内容类型为JSON
HttpServerResponse httpServerResponse = request.response()
.putHeader("Content-Type", "application/json");
try {
// 通过序列化器将RpcResponse类序列化
byte[] serialized = serializer.serialize(rpcResponse);
// Vertx提供的方法,将字节数组包装成Vertx的Buffer对象
// 最后通过end函数结束HTTP响应,往函数传入Buffer后,客户端就能收到序列化后的响应内容
httpServerResponse.end(Buffer.buffer(serialized));
} catch (IOException e) {
e.printStackTrace();
httpServerResponse.end(Buffer.buffer());
}

}
}

RPC框架总结笔记
https://maplelea1f.github.io/2025/06/22/简易RPC框架/
作者
Maple
发布于
2025年6月22日
许可协议