RPC 简介
Remote Procedure Call,远程过程调用。在分布式场景下,调用远程服务,就像本地调用一样简单。
而在分布式场景下,server 通常都是集群模式;
有多个 server 节点,如何同步给 client,这就涉及到了 registry 注册中心;
provider 作为服务提供方,将自身信息注册到 registry 中,consumer 服务消费方通过 registry 订阅 provider 信息,在 PRC 调用时选择合适的 provider 节点。
涉及到服务的注册订阅,原来的 server 和 client 分别改为 provider 和 consumer 比较合适。
业界参考
Java RMI
RPC 的 JDK(1.1) 版本,不过由于存在漏洞和太过局限性,并不广为人知;
- 接口必须继承
java.rmi.Remote
接口 - 方法必须抛出
java.rmi.RemoteException
异常 - 传输对象必须实现
java.io.Serializable
接口
Spring 集成 RMI
仅是对 RMI 进行了封装,解除了上述原生的几个限制,例如接口注册;
<bean class="com.caojiantao.RmiService"/>
<bean class="org.springframework.remoting.rmi.RmiServiceExporter">
<property name="service" ref="rmiService"/>
<property name="serviceName" value="rmiService"/>
<property name="serviceInterface" value="com.caojiantao.IRmiService"/>
<property name="registryPort" value="9090"/>
</bean>
Spring 从 5.3 开始逐步淘汰 RMI,phasing out serialization-based remoting。
Doubbo
阿里的知名 RPC 框架,后加入 Apache 孵化器;
Spring Cloud
微服务架构下的一站式解决方案,与 Spring 完美融合。RPC 调用基于 Feigin,采用的 Http 协议。
知识储备
总结 RPC 最核心的两个模块,寻址和通信。寻址涉及到注册中心,负载均衡,心跳机制等。通信就涉及到通信协议,TCP粘包,序列化等。
Zookeeper
Zookeeper 是一个分布式协调框架,保证了 CP,通常用作注册中心。
一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。
数据节点有两个特性,-e
指临时节点,随着 session 关闭而清除,-s
指顺序节点,创建的节点名自增加 1。
TCP 粘包
TCP 是面向流的协议,消息无边界。传输的消息报文受多种因素影响可能会拆分成多个数据包,这就是粘包。接收方需要找到消息边界,解析完整的消息报文,这就是拆包。
通常有以下几种拆包方式;
- 固定消息报文长度
- 特定分隔符
- 消息长度 + 内容
序列化/反序列化
数据在网络都是以二进制形式传输,也就是字节数组。将对象转换为字节数组叫做序列化,将字节数组转换为对象叫做反序列化。
常用的序列化方式有 jdk、json、hessian,jdk 是 Java 语言特有且性能较低不建议使用。
由于使用方便数据可读性较好,我倾向 json 序列化,并将原始对象的类型也加入序列化,便于反序列化,不过会增加序列化后的体积。
以 jackson 为例,需要增加 mapper 配置;
private static ObjectMapper mapper = new ObjectMapper();
static {
// 序列化携带类描述信息
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
}
负载均衡
负载均衡指的是在集群模式下,将客户端的请求分摊到每个节点。例如 Nginx 中的方向代理,和 RPC 中的服务列表。
常见的随机算法有;
- 随机
- 加权随机
- 轮询
- 一致性哈希
一致性哈希可以参考:https://www.jianshu.com/p/528ce5cd7e8f
心跳机制
心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
“有效”不仅指已建立的连接,还包括能够正常请求响应的连接,避免因服务过载或网络波动等原因导致通信异常。
Bean 注册
在运行时注册 Spring Bean,可以参考 org.springframework.beans.factory.support.BeanDefinitionRegistry#registerBeanDefinition
方法,不过可以通过扫描器 org.springframework.context.annotation.ClassPathBeanDefinitionScanner#scan
简化这个流程;
public class Registrar implements ImportBeanDefinitionRegistrar {
@Override
@SneakyThrows
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
BeanDefinition definition = BeanDefinitionBuilder.rootBeanDefinition(User.class)
.addPropertyValue("name", "caojiantao").getBeanDefinition();
// 手动注入 Bean
registry.registerBeanDefinition("user", definition);
}
}
I/0 多路复用
JDK 动态代理
也叫接口代理,通过反射动态生成代理类对象,实现与目标类的解耦。
/**
* UserHandler handler = new UserHandler(target);
* IUser proxy = Proxy.newProxyInstance(User.class.getClassLoader(), new Class[]{IUser.class}, handler);
*/
public class UserHandler implements InvocationHandler {
private Object target = null;
public UserHandler(Object target) {
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object result = null;
result = method.invoke(target,args);
// todo 代理逻辑
return result;
}
}
注册中心
基于 Zookeeper 实现,保证 CP 强一致性,使用 curator-x-discovery
进一步简化开发;
/**
* @author caojiantao
*/
@Slf4j
public class ZKRegistry implements IRegistry {
private ServiceDiscovery<ServiceInfo> discovery;
public ZKRegistry(CuratorFramework curator) throws Exception {
this.discovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(curator)
.basePath("/services")
.build();
this.discovery.start();
}
@SneakyThrows
@Override
public void register(ServiceInfo serviceInfo) {
ServiceInstance<ServiceInfo> instance = toInstance(serviceInfo);
discovery.registerService(instance);
}
@SneakyThrows
@Override
public void unregister(ServiceInfo serviceInfo) {
ServiceInstance<ServiceInfo> instance = toInstance(serviceInfo);
discovery.unregisterService(instance);
}
@Override
public ServiceInfo load(String service, IBalancer<ServiceInfo> balancer) {
ServiceInfo serviceInfo = null;
try (ServiceProvider<ServiceInfo> provider = discovery.serviceProviderBuilder()
// see org.apache.curator.x.discovery.strategies.RandomStrategy
.providerStrategy(instanceProvider -> {
List<ServiceInstance<ServiceInfo>> list = instanceProvider.getInstances();
List<ServiceInfo> payloadList = list.stream().map(ServiceInstance::getPayload).collect(Collectors.toList());
ServiceInfo payload = balancer.choose(payloadList);
return toInstance(payload);
})
.serviceName(service)
.build()) {
provider.start();
ServiceInstance<ServiceInfo> instance = provider.getInstance();
if (Objects.nonNull(instance)) {
serviceInfo = instance.getPayload();
}
} catch (Exception e) {
log.error("[rpc-registry] 获取服务信息异常", e);
}
return serviceInfo;
}
@SneakyThrows
@Override
public List<ServiceInfo> list(String service) {
Collection<ServiceInstance<ServiceInfo>> instances = discovery.queryForInstances(service);
return instances.stream().map(ServiceInstance::getPayload).collect(Collectors.toList());
}
@SneakyThrows
private ServiceInstance<ServiceInfo> toInstance(ServiceInfo serviceInfo) {
return ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getName())
.address(serviceInfo.getHost())
.port(serviceInfo.getPort())
.payload(serviceInfo)
.build();
}
}
自定义协议
通信其一就是要约定好协议;
-----------------------------------
| 1魔数 | 32追踪 | 1版本 | 1序列化 |
-----------------------------------
| 1报文类型 | 4报文长度 |
-----------------------------------
| 内容 |
-----------------------------------
- 魔数:合法报文标识
- 追踪:分布式请求链路追踪
- 版本:协议版本号
- 序列化:约定的序列化版本,JSON
- 报文类型:请求、响应、心跳
- 报文长度:实际请求报文内容长度
- 内容:实际请求的报文内容
/**
* @author caojiantao
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageHeader {
@Builder.Default
private Byte magic = Constants.MAGIC;
private String traceId;
@Builder.Default
private Byte version = 1;
@Builder.Default
private ESerializeType serialize = ESerializeType.JSON;
private EMessageType type;
private Integer length;
}
IO 模型
基于非阻塞的 NIO 模型,选用 Netty 开发通信模块;
编码器
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
MessageHeader header = message.getHeader();
byteBuf.writeByte(header.getMagic());
byteBuf.writeBytes(header.getTraceId().getBytes());
byteBuf.writeByte(header.getVersion());
ESerializeType serialize = header.getSerialize();
byteBuf.writeByte(serialize.getValue());
byteBuf.writeByte(header.getType().getValue());
ISerialization serialization = serialize.getSerialization();
byte[] body = serialization.serialize(message.getBody());
byteBuf.writeInt(body.length);
byteBuf.writeBytes(body);
}
}
服务端
public boolean start() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
.addLast(new MessageDecoder())
.addLast(new ServerHeartHandler())
.addLast(new ServerHandler(beanFactory))
.addLast(new MessageEncoder());
}
});
String host = IpUtils.getHostIp();
Integer port = rpcConfig.getProvider().getPort();
ChannelFuture channelFuture = bootstrap.bind(host, port).sync();
return channelFuture.isSuccess();
}
Spring 集成
集成 Spring 是必不可少的;
服务自动注册
@Slf4j
public class ProviderRegister implements BeanPostProcessor {
@Autowired
private IRegistry registry;
@Autowired
private ApplicationContext context;
@Autowired
private RpcConfig config;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?>[] interfaces = bean.getClass().getInterfaces();
if (bean instanceof Proxy || interfaces.length == 0) {
return bean;
}
boolean flag = false;
Class<?> service = null;
for (Class<?> clazz : interfaces) {
if (clazz.isAnnotationPresent(RpcService.class)) {
flag = true;
service = clazz;
break;
}
}
if (!flag) {
return bean;
}
String host = IpUtils.getHostIp();
Integer port = config.getProvider().getPort();
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setApp(context.getId());
serviceInfo.setHost(host);
serviceInfo.setPort(port);
serviceInfo.setName(service.getName());
registry.register(serviceInfo);
log.info("[rpc-provider] 注册服务 {} {}:{}", serviceInfo.getName(), host, port);
return bean;
}
}
服务自动发现
@Slf4j
public class ProviderDiscovery implements ImportBeanDefinitionRegistrar {
@Override
@SneakyThrows
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Map<String, Object> attrMap = importingClassMetadata.getAnnotationAttributes(EnableRpcConsumer.class.getName());
String[] basePackages = (String[]) attrMap.get("basePackages");
ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(registry) {
@Override
@SneakyThrows
protected boolean isCandidateComponent(AnnotatedBeanDefinition definition) {
AnnotationMetadata metadata = definition.getMetadata();
boolean isCandidate = metadata.isInterface() && metadata.isIndependent();
if (!isCandidate) {
return false;
}
String clazzName = definition.getBeanClassName();
Class<?> clazz = Class.forName(clazzName);
ListableBeanFactory beanFactory = (ListableBeanFactory) registry;
String[] exists = beanFactory.getBeanNamesForType(clazz);
if (!ObjectUtils.isEmpty(exists)) {
return false;
}
// 添加动态代理类,统一处理
definition.setBeanClassName(ProviderFactoryBean.class.getName());
definition.getPropertyValues().add("clazz", clazz);
definition.getPropertyValues().add("beanFactory", beanFactory);
return true;
}
};
BeanNameGenerator beanNameGenerator = (beanDefinition, beanDefinitionRegistry) -> {
Class clazz = (Class) beanDefinition.getPropertyValues().get("clazz");
return clazz.getName();
};
scanner.setBeanNameGenerator(beanNameGenerator);
scanner.addIncludeFilter(new AnnotationTypeFilter(RpcService.class));
// 扫描到的合法 candidate 自动注册
scanner.scan(basePackages);
}
}
心跳机制
为了维持有效的长连接,需要增加心跳保活机制;
@Slf4j
public class ClientHeartHandler extends AbsHeartHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
log.info("[rpc-core] 客户端空闲,发送心跳包");
sendHeart(ctx, EMessageType.PING);
} else if (state == IdleState.READER_IDLE) {
log.info("[rpc-core] 服务端响应超时,关闭此连接");
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
此处“有效”不仅指已建立的连接,还包括能够正常请求响应的连接,避免因服务过载或网络波动等原因导致通信异常。
负载均衡
请求广播
在集群环境中,同步各节点上的数据很有必要,例如本地缓存,线程池配置等;
也可以用 MQ 的集群广播模式实现;
完整代码
https://github.com/caojiantao/rpc