接着上面的一篇关于Spring Cloud之Open Feign调用流程和源码分析,解析feign在rpc调用的时候lb的组成及底层工作流程。
关键组件介绍
- ServerList:可以响应客户端的特定服务的服务器列表。
- ServerListFilter:可以动态获得的具有所需特征的候选服务器列表的过滤器。
- ServerListUpdater:用于执行动态服务器列表更新。
-
Rule:负载均衡策略,用于确定从服务器列表返回哪个服务器。
- Ping:客户端用于快速检查服务器当时是否处于活动状态。
- LoadBalancer:负载均衡器,负责负载均衡调度的管理。
流程及源码分析
1.是如何加载到spring中的呢?了解spring boot加载jar包原理的应该知道,spring boot通过spi机制加载我们需要扩展的类,即在spring.factories 配置j类的全限定名称的形式。对于不太了解springboot的spi机制的同学,可以查看我的这篇jdk、springboot、dubbospi机制对比。LoadBalancerAutoConfiguration加载LoadBalancerRetryProperties(负载均衡器的是否开启重试)和LoadBalancerClient。
负载均衡器的是否开启重试配置类:
@ConfigurationProperties("spring.cloud.loadbalancer.retry")
public class LoadBalancerRetryProperties {
//...
}
2.@LoadBalanced的RestTemplate Bean添加了一个LoadBalancerInterceptor拦截器,而这个拦截器的作用就是对请求的URI进行转换获取到具体应该请求哪个服务实例。通过LoadBalancerClient执行具体的请求发送,通过默认的方法(默认策略)选取服务器。
配置类配置loadBalancerInterceptor拦截器:
LoadBalancerAutoConfiguration
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
拦截器获取调用服务名称:
LoadBalancerInterceptor
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
LoadBalancerClient配置的默认路由策略(轮询):
RibbonLoadBalancerClient
protected Server getServer(ILoadBalancer loadBalancer) {
return loadBalancer == null ? null : loadBalancer.chooseServer("default");
}
3.从 RibbonLoadBalancerClient 代码可以看出,实际负载均衡的是通过 ILoadBalancer 来实现的,默认采用了ZoneAwareLoadBalancer来实现负载均衡器;
RibbonLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
Server server = this.getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, ribbonServer, request);
}
}
lb的类图
- NoOpLoadBalancer:啥都不做
-
BaseLoadBalancer:
一个负载均衡器的基本实现,其中有一个任意列表,可以将服务器设置为服务器池。
可以设置一个ping来确定服务器的活力。
在内部,该类维护一个“all”服务器列表,以及一个“up”服务器列表,并根据调用者的要求使用它们。
- DynamicServerListLoadBalancer:
通过动态的获取服务器的候选列表的负载平衡器。
可以通过筛选标准来传递服务器列表,以过滤不符合所需条件的服务器。
用于测量区域条件的关键指标是平均活动请求,它根据每个rest客户机和每个区域聚合。这是区域内未完成的请求总数除以可用目标实例的数量(不包括断路器跳闸实例)。当在坏区上缓慢发生超时时,此度量非常有效。
该负载均衡器将计算并检查所有可用区域的区域状态。如果任何区域的平均活动请求已达到配置的阈值,则该区域将从活动服务器列表中删除。如果超过一个区域达到阈值,则将删除每个服务器上活动请求最多的区域。一旦去掉最坏的区域,将在其余区域中选择一个区域,其概率与其实例数成正比。服务器将使用给定的规则从所选区域返回。对于每个请求,将重复上述步骤。也就是说,每个与区域相关的负载平衡决策都是实时做出的,最新的统计数据可以帮助进行选择。
RibbonClient默认的连接超时、读取内容超时是1s:
public class RibbonClientConfiguration {
public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
public static final int DEFAULT_READ_TIMEOUT = 1000;
@RibbonClientName
private String name = "client";
@Autowired
private PropertiesFactory propertiesFactory;
}
-
4.IClientConfig 用于对客户端或者负载均衡的配置,它的默认实现类为 DefaultClientConfigImpl;
-
RibbonClientConfiguration
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}
其中new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater) 代码包含了lb使用到的所有组件,下面会一一说明。
public class DefaultClientConfigImpl implements IClientConfig {
public static final Boolean DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS;
public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing";
public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
public static final boolean DEFAULT_USEIPADDRESS_FOR_SERVER;
public static final String DEFAULT_CLIENT_CLASSNAME = "com.netflix.niws.client.http.RestClient";
public static final String DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME = "com.netflix.client.SimpleVipAddressResolver";
public static final String DEFAULT_PRIME_CONNECTIONS_URI = "/";
//...
}
5.Rule分类:
- BestAvailableRule:选择具有最低并发请求的服务器。
- ClientConfigEnabledRoundRobinRule:轮询。
- RandomRule:随机选择一个服务器。
- RoundRobinRule:轮询选择服务器。
- RetryRule:具备重试机制的轮询。
- WeightedResponseTimeRule:根据使用平均响应时间去分配一个weight(权重) ,weight越低,被选择的可能性就越低。
-
ZoneAvoidanceRule:根据区域和可用性筛选,再轮询选择服务器。
rule的类图
6.ping, serverList, serverListFilter, serverListUpdater组件介绍;
定义如何 “ping” 服务器以检查其是否存活;
ping的类图
ServerList
定义获取所有的服务实例清单。
- DomainExtractingServerList:代理类,根据传入的ServerList的值,实现具体的逻辑。
- ConfigurationBasedServerList:从配置文件中加载服务器列表。
- DiscoveryEnabledNIWSServerList:从Eureka注册中心中获取服务器列表。
- StaticServerList:通过静态配置来维护服务器列表。
serverList的类图
ServerListFilter
允许根据过滤配置动态获得的具有所需特性的候选服务器列表。
- DefaultNIWSServerListFilter:完全继承自ZoneAffinityServerListFilter。
- ZonePreferenceServerListFilter:EnableZoneAffinity 或 EnableZoneExclusivity 开启状态使用,默认关闭。处理基于区域感知的过滤服务器,过滤掉不和客户端在相同zone的服务,若不存在相同zone,则不进行过滤。
- ServerListSubsetFilter:服务器列表筛选器,它将负载平衡器使用的服务器数量限制为所有服务器的子集。如果服务器机群很大(例如数百个),并且不需要使用每一个机群并将连接保存在http客户机的连接池中,那么这是非常有用的。它还可以通过比较总的网络故障和并发连接来驱逐相对不健康的服务器。
ServerListFilter类图
ServerListUpdater
用于执行动态服务器列表更新。
serverListUpdater类图
- PollingServerListUpdater:默认的实现策略,会启动一个定时线程池,定时执行更新策略。
- EurekaNotificationServerListUpdater:利用Eureka的事件监听器来驱动服务列表的更新操作。