RPC框架(简易版)总结笔记

核心架构:消费方调用、序列化器、网络服务器、请求处理器、服务注册器。
消费方调用
(基于 JDK 动态代理+工厂模式实现,为指定服务接口类生成可发送HTTP请求的代理对象)
实际调用过程:UserService userService = ServiceProxyFactory.getProxy(UserService.class);
实现原理:ServiceProxyFactory通过Proxy.newProxyInstance()方法创建代理对象,动态代理对象 ServiceProxy 实现InvocationHandler接口的invoke方法后,在调用代理对象实现方法时,会先进入invoke方法再进入目标方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import java.lang.reflect.Proxy;
public class ServiceProxyFactory {
public static <T> T getProxy(Class<T> serviceClass) { return (T) Proxy.newProxyInstance( serviceClass.getClassLoader(), new Class[]{serviceClass}, new ServiceProxy()); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
public class ServiceProxy implements InvocationHandler {
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Serializer serializer = new JdkSerializer();
RpcRequest rpcRequest = RpcRequest.builder() .serviceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .parameterTypes(method.getParameterTypes()) .args(args) .build(); try { byte[] bodyBytes = serializer.serialize(rpcRequest); try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080") .body(bodyBytes) .execute()) { byte[] result = httpResponse.bodyBytes(); RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class); return rpcResponse.getData(); } } catch (IOException e) { e.printStackTrace(); }
return null; } }
|
服务注册器
(简单来讲,新建一个final的ConcurrentHashMap存储服务名称 和 对应的服务实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
public class LocalRegistry {
private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();
public static void register(String serviceName, Class<?> implClass) { map.put(serviceName, implClass); }
public static Class<?> get(String serviceName) { return map.get(serviceName); }
public static void remove(String serviceName) { map.remove(serviceName); }
}
|
请求处理器
(通过server.requestHandler()传入对应的处理类,处理类HttpServerHandler实现Vertx的Handler接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
|
public class HttpServerHandler implements Handler<HttpServerRequest> {
@Override public void handle(HttpServerRequest request) { final Serializer serializer = new JdkSerializer();
System.out.println("receive request: " + request.method() + " " + request.uri());
request.bodyHandler(body -> { byte[] bytes = body.getBytes(); RpcRequest rpcRequest = null; try { rpcRequest = serializer.deserialize(bytes, RpcRequest.class); } catch (Exception e) { e.printStackTrace(); }
RpcResponse rpcResponse = new RpcResponse(); if (rpcRequest == null) { rpcResponse.setMessage("rpcRequest is null"); doResponse(request, rpcResponse, serializer); return; }
try { Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName()); Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs()); rpcResponse.setData(result); rpcResponse.setDataType(method.getReturnType()); rpcResponse.setMessage("ok"); } catch (Exception e) { e.printStackTrace(); rpcResponse.setMessage(e.getMessage()); rpcResponse.setException(e); } doResponse(request, rpcResponse, serializer); }); }
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) { HttpServerResponse httpServerResponse = request.response() .putHeader("Content-Type", "application/json"); try { byte[] serialized = serializer.serialize(rpcResponse); httpServerResponse.end(Buffer.buffer(serialized)); } catch (IOException e) { e.printStackTrace(); httpServerResponse.end(Buffer.buffer()); }
} }
|