前言
Ribbon在进行Server过滤的时候,用到了一个重要的基础组件【AbstractServerPredicate】,它的作用就是在众多Server的列表中通过一定的过滤策略踢除不合格的Server,留下来合格的Server列表。
负载均衡策略的核心之一就是对已知的服务列表进行过滤,留下一堆合格的Server进而按照一定规则进行选择
源码解读
PredicateKey
它是一个不可变对象,代表一个断言key。用于断言【AbstractServerPredicate】的入参,也就是public boolean apply(@Nullable PredicateKey input) {}方法的参数,该方法在后面会有具体的实例展示。源码如下:
package com.netflix.loadbalancer;
public class PredicateKey {
private Object loadBalancerKey;
private Server server;
public PredicateKey(Object loadBalancerKey, Server server) {
this.loadBalancerKey = loadBalancerKey;
this.server = server;
}
public PredicateKey(Server server) {
this((Object)null, server);
}
public final Object getLoadBalancerKey() {
return this.loadBalancerKey;
}
public final Server getServer() {
return this.server;
}
}
上述源码中有两个属性,【loadBalancerKey】与【Server】。
【Server】表示服务实例;
【loadBalancerKey】则用于一个叫【IRule】的接口下的choose方法,即为key,其中【IRule】代表负载均衡策略。其源码如下:
package com.netflix.loadbalancer;
public interface IRule {
Server choose(Object var1);
void setLoadBalancer(ILoadBalancer var1);
ILoadBalancer getLoadBalancer();
}
Ribbon是一个为客户端提供负载均衡功能的服务,它内部提供了一个叫做ILoadBalance的接口代表负载均衡器的操作,比如有添加服务器操作、选择服务器操作、获取所有的服务器列表、获取可用的服务器列表等等。顾名思义,loadBalancerKey相当于负载均衡器的key。其他的在此不做延申。
AbstractServerPredicate
它是服务器过滤逻辑的基础组件,可用于rules and server list filters。它传入的是一个【PredicateKey】,含有一个【Server】和【loadBalancerKey】,由此可以通过服务器和负载均衡器来开发过滤服务器的逻辑。
该类的成员属性源码如下
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
// 负载均衡器LoadBalancer规则:能从其获取到ILoadBalancer,从而得到一个对应的LoadBalancerStats实例
protected IRule rule;
// LoadBalancer状态信息。可以通过构造器指定/set方法指定,若没有指定的话将会从IRule里拿
private volatile LoadBalancerStats lbStats;
// 随机数。当过滤后还剩多台Server将从中随机获取
private final Random random = new Random();
// 下一个角标。用于轮询算法的指示
private final AtomicInteger nextIndex = new AtomicInteger();
// 一个特殊的Predicate:只有Server参数并无loadBalancerKey参数的PredicateKey,最终也是使用AbstractServerPredicate完成断言
private final Predicate<Server> serverOnlyPredicate;
public Predicate<Server> getServerOnlyPredicate() {
return this.serverOnlyPredicate;
}
}
该抽象类提供了三个静态工具方法,用于快速生成一个AbstractServerPredicate实例
该类的静态工具方法源码如下:
public static AbstractServerPredicate alwaysTrue() {
return new AbstractServerPredicate() {
public boolean apply(@Nullable PredicateKey input) {
return true;
}
};
}
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
return new AbstractServerPredicate() {
@SuppressWarnings({"NP"})
public boolean apply(PredicateKey input) {
return p.apply(input);
}
};
}
public static AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) {
return new AbstractServerPredicate() {
@SuppressWarnings({"NP"})
public boolean apply(PredicateKey input) {
return p.apply(input.getServer());
}
};
}
该类的成员方法源码如下:
// 得到负载均衡器对应的LoadBalancerStats实例
// 该方法为protected,在子类中会被调用用于判断
protected LoadBalancerStats getLBStats() {
if (this.lbStats != null) {
return this.lbStats;
} else if (this.rule != null) {
//从rule里面拿到ILoadBalancer,进而拿到LoadBalancerStats
ILoadBalancer lb = this.rule.getLoadBalancer();
if (lb instanceof AbstractLoadBalancer) {
LoadBalancerStats stats = ((AbstractLoadBalancer)lb).getLoadBalancerStats();
this.setLoadBalancerStats(stats);
return stats;
} else {
return null;
}
} else {
return null;
}
}
// Eligible:适合的
// 把servers通过Predicate#apply(PredicateKey)删选一把后,返回符合条件的Server们
// loadBalancerKey非必须的。
public List<Server> getEligibleServers(List<Server> servers) {
return this.getEligibleServers(servers, (Object)null);
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
Iterator var4 = servers.iterator();
//遍历服务清单,使用apply方法来判断实例是否需要保留,如果是,就添加到结果列表中
//所以apply方法需要在子类中实现,子类就可实现高级策略
while(var4.hasNext()) {
Server server = (Server)var4.next();
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
getEligibleServers方法内部调用断言方法Predicate#apply(PredicateKey)
完成Server的过滤,它属于AbstractServerPredicate的核心,因为【apply】方法在此处是唯一调用处,因此该方法重要。另外,需要注意的是apply方法(具体的过滤逻辑)在本抽象类是没有提供实现的,全在子类身上。
应用实例
过滤服务:AbstractDiscoveryEnabledPredicate类
import com.netflix.loadbalancer.AbstractServerPredicate;
import com.netflix.loadbalancer.PredicateKey;
import org.springframework.cloud.alibaba.nacos.ribbon.NacosServer;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
/**
* 过滤服务
*
*/
public abstract class AbstractDiscoveryEnabledPredicate extends AbstractServerPredicate {
/**
* {@inheritDoc}
*/
@Override
public boolean apply(@Nullable PredicateKey input) {
return input != null
&& input.getServer() instanceof NacosServer
&& apply((NacosServer) input.getServer(), (HttpHeaders) input.getLoadBalancerKey());
}
/**
* Returns whether the specific {@link NacosServer} matches this predicate.
*
* @param server the discovered server
* @param headers 请求头
* @return whether the server matches the predicate
*/
abstract boolean apply(NacosServer server, HttpHeaders headers);
}
服务筛选:GrayMetadataAwarePredicate类
import cn.hutool.core.util.StrUtil;
import com.medusa.gruul.common.core.constant.CommonConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.alibaba.nacos.ribbon.NacosServer;
import org.springframework.http.HttpHeaders;
import java.util.Map;
/**
* 基于 Metadata version 的服务筛选
*
*/
@Slf4j
public class GrayMetadataAwarePredicate extends AbstractDiscoveryEnabledPredicate {
@Override
protected boolean apply(NacosServer server, HttpHeaders headers) {
final Map<String, String> metadata = server.getMetadata();
String version = metadata.get(CommonConstants.VERSION);
// 判断Nacos服务是否有版本标签
if (StrUtil.isBlank(version)) {
log.trace("nacos未配置version字段,GrayMetadataAwarePredicate将被跳过");
return true;
}
// 判断请求中是否有版本
String target = headers.getFirst(CommonConstants.VERSION);
if (StrUtil.isBlank(target)) {
log.trace("请求头中未配置version字段,GrayMetadataAwarePredicate将被跳过");
return true;
}
log.info("请求版本:{} ,当前服务版本:{}", target, version);
return target.equals(version);
}
}
参考资料:
Ribbon服务器过滤逻辑的基础组件:AbstractServerPredicate