rpc项目总结

[TOC]

分布式RPC框架

项目描述:该项目是编写一个RPC调度框架,实现跨服务调度

使用技术:本项目使用的技术主要有Netty, Kryo , Hook

主要功能:实现了网络传输,注册中心,序列化,动态均衡,自动注销服务等功能

项目亮点

1、 通过 Netty基于 Nio的方式实现了网络传输。
2、 自定义编码器解码器,并使用自定义的协议来封装消息。
3、 使用 Kryo的方式实现了数据格式的序列化,提升效率。
4、 使用 Nacos作为注册中心,并使用 ConcurrentHashMap来重用服务。
5、 实现了随机算法和轮询算法来实现负载均衡。
6、 使用单例模式获取 Runtime 对象,采用 线程池 的方式实现服务的注销。

启动过程

客户端启动过程:我们会先new一个NettyClient的对象,

  • 这个对象有服务发现的功能 (NacosUtil.getAllInstance(serviceName))
  • 随后有一个select方法,根据我们选择的策略进行发现( Instance instance = loadBalancer.select(instances);)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NacosServiceDiscovery implements ServiceDiscovery {
...
public NacosServiceDiscovery(LoadBalancer loadBalancer) {
if(loadBalancer == null) this.loadBalancer = new RandomLoadBalancer();
else this.loadBalancer = loadBalancer;
}
...
@Override
public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = NacosUtil.getAllInstance(serviceName);
Instance instance = loadBalancer.select(instances);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}

}
  • 还有sendRequest发送消息的功能。

接下来new一个代理方法,传入这个NettyClient对象,我们会通过代理消息发送的功能,然后在invoke方法里进行功能增强。

具体功能的增强就是把我们要调用的服务具体信息包装成一个RpcRequest,里面有请求号,接口名称,调用方法名称,调用方法的参数,调用方法的参数类型,是否是心跳包

1
2
RpcRequest rpcRequest = new RpcRequest(UUID.randomUUID().toString(), method.getDeclaringClass().getName(),
method.getName(), args, method.getParameterTypes(), false);

接下来调用NettyClient的发送方法,返回值会封装在CompletableFuture里面,调用服务发现功能,根据接口的名称获取地址,放在InetSocketAddress里面

1
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());

取到地址后我们就可以根据地址,调用Netty进行发送消息

1
Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class ChannelProvider {

private static final Logger logger = LoggerFactory.getLogger(ChannelProvider.class);
private static EventLoopGroup eventLoopGroup;
private static Bootstrap bootstrap = initializeBootstrap();

private static Map<String, Channel> channels = new ConcurrentHashMap<>();

public static Channel get(InetSocketAddress inetSocketAddress, CommonSerializer serializer) throws InterruptedException {
String key = inetSocketAddress.toString() + serializer.getCode();
if (channels.containsKey(key)) {
Channel channel = channels.get(key);
if(channels != null && channel.isActive()) {
return channel;
} else {
channels.remove(key);
}
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*自定义序列化编解码器*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new CommonEncoder(serializer))
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new CommonDecoder())
.addLast(new NettyClientHandler());
}
});
Channel channel = null;
try {
channel = connect(bootstrap, inetSocketAddress);
} catch (ExecutionException e) {
logger.error("连接客户端时有错误发生", e);
return null;
}
channels.put(key, channel);
return channel;
}

private static Channel connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.info("客户端连接成功!");
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}

private static Bootstrap initializeBootstrap() {
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
//是否开启 TCP 底层心跳机制
.option(ChannelOption.SO_KEEPALIVE, true)
//TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.option(ChannelOption.TCP_NODELAY, true);
return bootstrap;
}

}

在pipeline里自定义编码器,设置心跳时间,设置解码器

1
2
3
4
5
6
7
8
9
10
11
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*自定义序列化编解码器*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new CommonEncoder(serializer))
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new CommonDecoder())
.addLast(new NettyClientHandler());
}
});

然后连接服务端

1
channel = connect(bootstrap, inetSocketAddress);

设置连接的监听器

1
2
3
4
5
6
7
8
9
10
11
12
private static Channel connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.info("客户端连接成功!");
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}

设置发送消息的监听器

1
2
3
4
5
6
7
8
9
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
future1.channel().close();
resultFuture.completeExceptionally(future1.cause());
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
1
channel.writeAndFlush(rpcRequest)

获取服务名字

1
NacosUtil.getAllInstance(serviceName);

获取服务

1
loadBalancer.select(instances);
1
return instances.get(new Random().nextInt(instances.size()));

心跳机制

客户端

1
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))

1)readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)

2)writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)

3)allIdleTime:所有类型的超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());
Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByCode(CommonSerializer.DEFAULT_SERIALIZER));
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setHeartBeat(true);
channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

服务器

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
logger.info("长时间未收到心跳包,断开连接...");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}

CompletableFuture

1
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);

在NettyClientHandler

1
unprocessedRequests.complete(msg);

在sendRequest方法里

1
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);

kyrio序列化

Kryo序列化机制比默认的Java序列化机制速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。

https://www.cnblogs.com/520playboy/p/6341490.html

java原生序列化时间:8281 ms
java原生反序列化时间:5899 ms

Kryo 序列化时间:630 ms
Kryo 反序列化时间:15 ms

经过对比,可以发现kryo是java原生序列化性能十几倍

1、Kryo序列化后比Hessian小很多。(kryo优于hessian)

2、由于Kryo没有将类field的描述信息序列化,所以Kryo需要以自己加载该类的filed。这意味着如果该类没有在kryo中注册,或者该类是第一次被kryo序列化时,kryo需要时间去加载该类(hessian优于kryo)

3、由于2的原因,如果该类已经被kryo加载过,那么kryo保存了其类的信息,就可以很快的将byte数组填入到类的field中,而hessian则需要解析序列化后的byte数组中的field信息,对于序列化过的类,kryo优于hessian。

image-20201229165346572

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Kryo序列化类,Kryo序列化效率很高,但是只兼容 Java 语言
*
* @author shuang.kou
* @createTime 2020年05月13日 19:29:00
*/
@Slf4j
public class KryoSerializer implements Serializer {

/**
* 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
* 所以,使用 ThreadLocal 存放 Kryo 对象
*/
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true); //默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
kryo.setRegistrationRequired(false); //默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new SerializeException("序列化失败");
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
throw new SerializeException("反序列化失败");
}
}

}

Netty

自定义协议与编解码器

在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。

我们定义的协议是这样的:

image-20201120101454330

首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CommonEncoder extends MessageToByteEncoder
{
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if (msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CommonDecoder extends ReplayingDecoder
{
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if (magic != MAGIC_NUMBER) {
logger.error("不识别的协议包: {}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if (packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if (packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包: {}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if (serializer == null) {
logger.error("不识别的反序列化器: {}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if (msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class KryoSerializer implements CommonSerializer {

private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);
kryo.setRegistrationRequired(false);
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
logger.error("序列化时有错误发生:", e);
throw new SerializeException("序列化时有错误发生");
}
}

@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return o;
} catch (Exception e) {
logger.error("反序列化时有错误发生:", e);
throw new SerializeException("反序列化时有错误发生");
}
}

@Override
public int getCode() {
return SerializerCode.valueOf("KRYO").getCode();
}
}

NettyServerHandler 和 NettyClientHandler

(Server)汇总

对于Server端来说,先初始化一个服务(HelloService),再启动一个NettyServer

  • 这个NettyServer会在构造器里初始化 host、port和serviceRegistry方法以及serviceProvider方法。前者是Nacos服务注册中心,用于向nacos注册和获取服务,后者是本地保存和提供服务实例对象

  • 除此之外,还有一个publishService,它会传入服务(object)和接口的类,然后在这个类里,调用相应的方法,把服务保存本地,然后向nacos注册

  • 会在 start()方法里,初始化ServerBoostrap,自定义编码器和解码器,添加handler,绑定相应的ip地址和端口号进行监听

NacosServiceRegistry

静态代码块li向nacos进行注册

1
2
3
4
5
6
7
8
9
10
11
12
13
public class NacosUtil {
....
public void register(String serviceName, InetSocketAddress inetSocketAddress) {
try {
namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort());//top.guoziyang.rpc.api.HelloService
} catch (NacosException e) {
logger.error("注册服务时有错误发生:", e);
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
}
}
...
namingService=NamingFactory.createNamingService(SERVER_ADDR);

ServiceProviderImpl(默认的服务注册表,保存服务端本地服务)

1
2
3
4
private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();
addServiceProvider(T service)
getServiceProvider(String serviceName)

向registeredService 集合添加服务

1
registeredService.add(serviceName);

向serviceMap里放入服务的名字和具体的服务

1
serviceMap.put(serviceName, service);
1
2
3
4
5
6
7
8
@Override
public <T> void addServiceProvider(T service, Class<T> serviceClass) {//top.guoziyang.rpc.api.HelloService //interface top.guoziyang.rpc.api.HelloService
String serviceName = serviceClass.getCanonicalName();
if (registeredService.contains(serviceName)) return;//重用服务
registeredService.add(serviceName);
serviceMap.put(serviceName, service);
logger.info("向接口: {} 注册服务: {}", service.getClass().getInterfaces(), serviceName);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
.addLast(new CommonEncoder(serializer))
.addLast(new CommonDecoder())
.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(host, port).sync();
future.channel().closeFuture().sync();

NettyServerhandler

初始化一个线程池,线程池里调用requestHandler,这个requestHandler用于处理客户端传过来的rpcRequest, 通过serviceProvider和传过来的rpcRequest的接口,返回一个服务,并返回相应的结果。

1
requestHandler.handle(msg);

调用requestHandler来处理信息

这个requestHandler通过本地的serviceProvider来处理信息

1
Object service = serviceProvider.getServiceProvider(rpcRequest.getInterfaceName());
1
2
3
4
5
6
7
8
@Override
public Object getServiceProvider(String serviceName) {
Object service = serviceMap.get(serviceName);
if (service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}

通过反射来获取相应的方法

1
2
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
1
ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId())
1
2
3
4
5
6
7
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
1
2
3
4
5
6
7
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}

服务自动注销

注销的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class NacosUtil {

...

public static NamingService getNacosNamingService()

public static void registerService(String serviceName, InetSocketAddress address)

public static List<Instance> getAllInstance(String serviceName)

public static void clearRegistry() {
if(!serviceNames.isEmpty() && address != null) {
String host = address.getHostName();
int port = address.getPort();
Iterator<String> iterator = serviceNames.iterator();
while(iterator.hasNext()) {
String serviceName = iterator.next();
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
logger.error("注销服务 {} 失败", serviceName, e);
}
}
}
}
}

钩子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ShutdownHook {

private static final Logger logger = LoggerFactory.getLogger(ShutdownHook.class);

private final ExecutorService threadPool = ThreadPoolFactory.createDefaultThreadPool("shutdown-hook");
private static final ShutdownHook shutdownHook = new ShutdownHook();

public static ShutdownHook getShutdownHook() {
return shutdownHook;
}

public void addClearAllHook() {
logger.info("关闭后将自动注销所有服务");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
NacosUtil.clearRegistry();
threadPool.shutdown();
}));
}

}

使用了单例模式创建其对象,在 addClearAllHook 中,Runtime 对象是 JVM 虚拟机的运行时环境,调用其 addShutdownHook 方法增加一个钩子函数,创建一个新线程调用 clearRegistry 方法完成注销工作。这个钩子函数会在 JVM 关闭之前被调用。

这样在 RpcServer 启动之前,只需要调用 addClearAllHook,就可以注册这个钩子了。例如在 NettyServer 中:

1
2
3
ChannelFuture future = serverBootstrap.bind(host, port).sync();
ShutdownHook.getShutdownHook().addClearAllHook();
future.channel().closeFuture().sync();

负载均衡策略

负载均衡大家应该都熟悉,在上一节中客户端在 lookupService 方法中,从 Nacos 获取到的是所有提供这个服务的服务端信息列表,我们就需要从中选择一个,这便涉及到客户端侧的负载均衡策略。我们新建一个接口:LoadBalancer:

1
2
3
4
public interface LoadBalancer {
Instance select(List<Instance> instances);
}
123

接口中的 select 方法用于从一系列 Instance 中选择一个。这里我就实现两个比较经典的算法:随机和转轮。

随机算法顾名思义,就是随机选一个,毫无技术含量:

1
2
3
4
5
6
7
8
public class RandomLoadBalancer implements LoadBalancer {

@Override
public Instance select(List<Instance> instances) {
return instances.get(new Random().nextInt(instances.size()));
}

}

而转轮算法大家也应该了解,按照顺序依次选择第一个、第二个、第三个……这里就需要一个变量来表示当前选到了第几个:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RoundRobinLoadBalancer implements LoadBalancer {

private int index = 0;

@Override
public Instance select(List<Instance> instances) {
if(index >= instances.size()) {
index %= instances.size();
}
return instances.get(index++);
}

}

index 就表示当前选到了第几个服务器,并且每次选择后都会自增一。

最后在 NacosServiceRegistry 中集成就可以了,这里选择外部传入的方式传入 LoadBalancer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class NacosServiceDiscovery implements ServiceDiscovery {
private final LoadBalancer loadBalancer;

public NacosServiceDiscovery(LoadBalancer loadBalancer) {
if(loadBalancer == null) this.loadBalancer = new RandomLoadBalancer();
else this.loadBalancer = loadBalancer;
}

public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = NacosUtil.getAllInstance(serviceName);
Instance instance = loadBalancer.select(instances);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}
}

服务自动注册

定义注解

首先我们需要定义两个注解:Service 和 ServiceScan:

Service.java

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {

public String name() default "";

}

ServiceScan.java

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ServiceScan {

public String value() default "";

}

@Service 放在一个类上,标识这个类提供一个服务,@ServiceScan 放在启动的入口类上(main 方法所在的类),标识服务的扫描的包的范围。Service 注解的值定义为该服务的名称,默认值是该类的完整类名,而 ServiceScan 的值定义为扫描范围的根包,默认值为入口类所在的包,扫描时会扫描该包及其子包下所有的类,找到标记有 Service 的类,并注册。

工具类ReflectUtil

主要就是 getClasses 方法,传入一个包名,用于扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。

1
2
3
4
5
6
7
8
9
public class ReflectUtil {
public static Set<Class<?>> getClasses(String packageName) {
...
return classes;
}

private static void findAndAddClassesInPackageByFile{
}
}

扫描服务

由于扫描服务这一步是一个比较公共的方法,无论是 Socket 还是 Netty 的服务端都需要这个方法,于是我对项目做了一点重构,使用了一个抽象类 AbstractRpcServer 实现了 RpcServer 接口,而 NettyServer 和 SocketServer 继承自 AbstractRpcServer,将 scanServices 方法放在抽象类中,而 start 方法则由具体实现类来实现。

1
2
3
public void scanServices() {
...
}

我们首先需要获得要扫描的包的范围,就需要获取到 ServiceScan 注解的值,而我们前面说过,这个注解是加在启动类上的,那么,我们怎么知道启动类是哪一个呢?答案是通过调用栈。方法的调用和返回是通过方法调用栈来实现的,当调用一个方法时,该方法入栈,该方法返回时,该方法出站,控制回到栈顶的方法。那么,main 方法一定位于调用栈的最底端,在 ReflectUtils 中,我写了一个 getStackTrace 方法(名字起得不好),用于获取 main 所在的类。通过 Class 对象的 isAnnotationPresent 方法来判断该类是否有 ServiceScan 注解。如果有,通过startClass.getAnnotation(ServiceScan.class).value(); 获取注解的值。

当获得扫描的范围后,就可以通过ReflectUtil.getClasses(basePackage) 获取到所有的 Class 了,逐个判断是否有 Service 注解,如果有的话,通过反射创建该对象,并且调用 publishService 注册即可。

负载均衡

1
2
3
4
5
6
7
@Override
public Instance select(List<Instance> instances) {
if(index >= instances.size()) {
index %= instances.size();
}
return instances.get(index++);
}

Runtime(服务注销)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void clearRegistry() {
if(!serviceNames.isEmpty() && address != null) {
String host = address.getHostName();
int port = address.getPort();
Iterator<String> iterator = serviceNames.iterator();
while(iterator.hasNext()) {
String serviceName = iterator.next();
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
logger.error("注销服务 {} 失败", serviceName, e);
}
}
}
}

开启自动注册并测试

以 NettyServer 为例,在 NettyServer 的构造方法最后,调用 scanServices 方法,即可自动注册所有服务:

1
2
3
4
5
6
7
8
public NettyServer(String host, int port, Integer serializer) {
this.host = host;
this.port = port;
serviceRegistry = new NacosServiceRegistry();
serviceProvider = new ServiceProviderImpl();
this.serializer = CommonSerializer.getByCode(serializer);
scanServices();
}

不要忘了在 HelloServiceImpl 类上加上 @service 注解:

1
2
3
4
5
6
7
@Service
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "Hello, " + name;
}
}

并且在服务器启动类上加上注解:

1
2
3
4
5
6
7
@ServiceScan
public class NettyTestServer {
public static void main(String[] args) {
NettyServer server = new NettyServer("127.0.0.1", 9999, CommonSerializer.PROTOBUF_SERIALIZER);
server.start();
}
}

直接使用启动类所在的包作为扫描根包。

启动类变得无比简洁!启动后应该能看到和之前相同的结果

线程池

1
new ThreadPoolExecutor(10, 100, 1, 分钟, 数组(10), threadFactory)