[TOC]
分布式RPC框架
项目描述:该项目是编写一个RPC调度框架,实现跨服务调度
使用技术:本项目使用的技术主要有Netty, Kryo , Hook
主要功能:实现了网络传输,注册中心,序列化,动态均衡,自动注销服务等功能
项目亮点:
1、 通过 Netty基于 Nio的方式实现了网络传输。
2、 自定义编码器解码器,并使用自定义的协议来封装消息。
3、 使用 Kryo的方式实现了数据格式的序列化,提升效率。
4、 使用 Nacos作为注册中心,并使用 ConcurrentHashMap来重用服务。
5、 实现了随机算法和轮询算法来实现负载均衡。
6、 使用单例模式获取 Runtime 对象,采用 线程池 的方式实现服务的注销。
启动过程
客户端启动过程:我们会先new一个NettyClient的对象,
- 这个对象有服务发现的功能 (NacosUtil.getAllInstance(serviceName))
- 随后有一个select方法,根据我们选择的策略进行发现( Instance instance = loadBalancer.select(instances);)
1 | public class NacosServiceDiscovery implements ServiceDiscovery { |
- 还有sendRequest发送消息的功能。
接下来new一个代理方法,传入这个NettyClient对象,我们会通过代理消息发送的功能,然后在invoke方法里进行功能增强。
具体功能的增强就是把我们要调用的服务具体信息包装成一个RpcRequest,里面有请求号,接口名称,调用方法名称,调用方法的参数,调用方法的参数类型,是否是心跳包
1 | RpcRequest rpcRequest = new RpcRequest(UUID.randomUUID().toString(), method.getDeclaringClass().getName(), |
接下来调用NettyClient的发送方法,返回值会封装在CompletableFuture
1 | InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); |
取到地址后我们就可以根据地址,调用Netty进行发送消息
1 | Channel channel = ChannelProvider.get(inetSocketAddress, serializer); |
1 | public class ChannelProvider { |
在pipeline里自定义编码器,设置心跳时间,设置解码器
1 | bootstrap.handler(new ChannelInitializer<SocketChannel>() { |
然后连接服务端
1 | channel = connect(bootstrap, inetSocketAddress); |
设置连接的监听器
1 | private static Channel connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException { |
设置发送消息的监听器
1 | channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future1 -> { |
1 | channel.writeAndFlush(rpcRequest) |
获取服务名字
1 | NacosUtil.getAllInstance(serviceName); |
获取服务
1 | loadBalancer.select(instances); |
1 | return instances.get(new Random().nextInt(instances.size())); |
心跳机制
客户端
1 | .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) |
1)readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)
2)writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)
3)allIdleTime:所有类型的超时时间
1 | @Override |
服务器
1 | @Override |
CompletableFuture
1 | unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture); |
在NettyClientHandler
1 | unprocessedRequests.complete(msg); |
在sendRequest方法里
1 | unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture); |
kyrio序列化
Kryo序列化机制比默认的Java序列化机制速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。
https://www.cnblogs.com/520playboy/p/6341490.html
java原生序列化时间:8281 ms
java原生反序列化时间:5899 ms
和
Kryo 序列化时间:630 ms
Kryo 反序列化时间:15 ms
经过对比,可以发现kryo是java原生序列化性能十几倍
1、Kryo序列化后比Hessian小很多。(kryo优于hessian)
2、由于Kryo没有将类field的描述信息序列化,所以Kryo需要以自己加载该类的filed。这意味着如果该类没有在kryo中注册,或者该类是第一次被kryo序列化时,kryo需要时间去加载该类(hessian优于kryo)
3、由于2的原因,如果该类已经被kryo加载过,那么kryo保存了其类的信息,就可以很快的将byte数组填入到类的field中,而hessian则需要解析序列化后的byte数组中的field信息,对于序列化过的类,kryo优于hessian。
1 | /** |
Netty
自定义协议与编解码器
在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。
我们定义的协议是这样的:
首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。
1 | CommonEncoder extends MessageToByteEncoder |
1 | CommonDecoder extends ReplayingDecoder |
1 | @Override |
1 | public class KryoSerializer implements CommonSerializer { |
NettyServerHandler 和 NettyClientHandler
(Server)汇总
对于Server端来说,先初始化一个服务(HelloService),再启动一个NettyServer
这个NettyServer会在构造器里初始化 host、port和serviceRegistry方法以及serviceProvider方法。前者是Nacos服务注册中心,用于向nacos注册和获取服务,后者是本地保存和提供服务实例对象
除此之外,还有一个publishService,它会传入服务(object)和接口的类,然后在这个类里,调用相应的方法,把服务保存本地,然后向nacos注册
会在 start()方法里,初始化ServerBoostrap,自定义编码器和解码器,添加handler,绑定相应的ip地址和端口号进行监听
NacosServiceRegistry
静态代码块li向nacos进行注册
1 | public class NacosUtil { |
ServiceProviderImpl(默认的服务注册表,保存服务端本地服务)
1 | private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>(); |
向registeredService 集合添加服务
1 | registeredService.add(serviceName); |
向serviceMap里放入服务的名字和具体的服务
1 | serviceMap.put(serviceName, service); |
1 | @Override |
1 | ServerBootstrap serverBootstrap = new ServerBootstrap(); |
NettyServerhandler
初始化一个线程池,线程池里调用requestHandler,这个requestHandler用于处理客户端传过来的rpcRequest, 通过serviceProvider和传过来的rpcRequest的接口,返回一个服务,并返回相应的结果。
1 | requestHandler.handle(msg); |
调用requestHandler来处理信息
这个requestHandler通过本地的serviceProvider来处理信息
1 | Object service = serviceProvider.getServiceProvider(rpcRequest.getInterfaceName()); |
1 | @Override |
通过反射来获取相应的方法
1 | Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes()); |
1 | ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()) |
1 | public static <T> RpcResponse<T> success(T data, String requestId) { |
1 | public static <T> RpcResponse<T> success(T data, String requestId) { |
服务自动注销
注销的方法
1 | public class NacosUtil { |
钩子
1 | public class ShutdownHook { |
使用了单例模式创建其对象,在 addClearAllHook 中,Runtime 对象是 JVM 虚拟机的运行时环境,调用其 addShutdownHook 方法增加一个钩子函数,创建一个新线程调用 clearRegistry 方法完成注销工作。这个钩子函数会在 JVM 关闭之前被调用。
这样在 RpcServer 启动之前,只需要调用 addClearAllHook,就可以注册这个钩子了。例如在 NettyServer 中:
1 | ChannelFuture future = serverBootstrap.bind(host, port).sync(); |
负载均衡策略
负载均衡大家应该都熟悉,在上一节中客户端在 lookupService 方法中,从 Nacos 获取到的是所有提供这个服务的服务端信息列表,我们就需要从中选择一个,这便涉及到客户端侧的负载均衡策略。我们新建一个接口:LoadBalancer:
1 | public interface LoadBalancer { |
接口中的 select 方法用于从一系列 Instance 中选择一个。这里我就实现两个比较经典的算法:随机和转轮。
随机算法顾名思义,就是随机选一个,毫无技术含量:
1 | public class RandomLoadBalancer implements LoadBalancer { |
而转轮算法大家也应该了解,按照顺序依次选择第一个、第二个、第三个……这里就需要一个变量来表示当前选到了第几个:
1 | public class RoundRobinLoadBalancer implements LoadBalancer { |
index 就表示当前选到了第几个服务器,并且每次选择后都会自增一。
最后在 NacosServiceRegistry 中集成就可以了,这里选择外部传入的方式传入 LoadBalancer:
1 | public class NacosServiceDiscovery implements ServiceDiscovery { |
服务自动注册
定义注解
首先我们需要定义两个注解:Service 和 ServiceScan:
Service.java
1 | (ElementType.TYPE) |
ServiceScan.java
1 | (ElementType.TYPE) |
@Service 放在一个类上,标识这个类提供一个服务,@ServiceScan 放在启动的入口类上(main 方法所在的类),标识服务的扫描的包的范围。Service 注解的值定义为该服务的名称,默认值是该类的完整类名,而 ServiceScan 的值定义为扫描范围的根包,默认值为入口类所在的包,扫描时会扫描该包及其子包下所有的类,找到标记有 Service 的类,并注册。
工具类ReflectUtil
主要就是 getClasses
方法,传入一个包名,用于扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。
1 | public class ReflectUtil { |
扫描服务
由于扫描服务这一步是一个比较公共的方法,无论是 Socket 还是 Netty 的服务端都需要这个方法,于是我对项目做了一点重构,使用了一个抽象类 AbstractRpcServer 实现了 RpcServer 接口,而 NettyServer 和 SocketServer 继承自 AbstractRpcServer,将 scanServices 方法放在抽象类中,而 start 方法则由具体实现类来实现。
1 | public void scanServices() { |
我们首先需要获得要扫描的包的范围,就需要获取到 ServiceScan 注解的值,而我们前面说过,这个注解是加在启动类上的,那么,我们怎么知道启动类是哪一个呢?答案是通过调用栈。方法的调用和返回是通过方法调用栈来实现的,当调用一个方法时,该方法入栈,该方法返回时,该方法出站,控制回到栈顶的方法。那么,main 方法一定位于调用栈的最底端,在 ReflectUtils 中,我写了一个 getStackTrace 方法(名字起得不好),用于获取 main 所在的类。通过 Class 对象的 isAnnotationPresent 方法来判断该类是否有 ServiceScan 注解。如果有,通过startClass.getAnnotation(ServiceScan.class).value();
获取注解的值。
当获得扫描的范围后,就可以通过ReflectUtil.getClasses(basePackage)
获取到所有的 Class 了,逐个判断是否有 Service 注解,如果有的话,通过反射创建该对象,并且调用 publishService 注册即可。
负载均衡
1 | @Override |
Runtime(服务注销)
1 | public static void clearRegistry() { |
开启自动注册并测试
以 NettyServer 为例,在 NettyServer 的构造方法最后,调用 scanServices 方法,即可自动注册所有服务:
1 | public NettyServer(String host, int port, Integer serializer) { |
不要忘了在 HelloServiceImpl 类上加上 @service 注解:
1 |
|
并且在服务器启动类上加上注解:
1 |
|
直接使用启动类所在的包作为扫描根包。
启动类变得无比简洁!启动后应该能看到和之前相同的结果
线程池
1 | new ThreadPoolExecutor(10, 100, 1, 分钟, 数组(10), threadFactory) |