一. 基础
- 使用 OpenFeign 流程
- 项目中引入 OpenFeign 依赖
- 启动类使用 @EnableFeignClients 修饰开启OpenFeign
- 服务消费方编写调用接口,该接口使用@FeignClient(name = "服务名称", fallback = IHotelInfoClient.HystrixClientFallbackImpl.class, url = "指定ip")修饰,接口内部的抽象方法使用RESTful注解(如@GetMapping、@PostMapping)修饰,指定请求方式与需要调用的接口uri,入参、返回值与被调用的接口保持一致
- 服务消费方直接调用这个接口即可
- OpenFeign 内部封装了 Ribbon
- 查看@FeignClient
package org.springframework.cloud.openfeign;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
/**
 * Marks an interface as a Feign client. The attributes declared here are read by
 * FeignClientsRegistrar and copied onto the FeignClientFactoryBean that produces the proxy.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignClient {
// Service name. value and name are aliases of each other (see @AliasFor). When no url is
// configured, the factory bean prefixes the name with "http://" and calls through the load balancer.
// NOTE(review): the original notes say value takes precedence over name — confirm against
// FeignClientsRegistrar#getName / #getClientName, which are not shown here.
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
/** @deprecated Deprecated alternative way to supply the service name; the original notes rank it lowest in priority (TODO confirm). */
@Deprecated
String serviceId() default "";
// Used to build the bean alias (contextId + "FeignClient") during registration.
// NOTE(review): the original notes say it serves the same purpose as name/value/serviceId with
// the highest priority — confirm against getClientName.
String contextId() default "";
// Overrides the default bean alias when it has text; per the original notes it is meant to be
// paired with @Qualifier to pick one client when several candidates exist.
String qualifier() default "";
// Explicit target address. When set, the factory bean targets this URL directly instead of
// going through the name-based load-balanced path.
String url() default "";
boolean decode404() default false;
// Per-client configuration classes; registered as a FeignClientSpecification for this client.
Class<?>[] configuration() default {};
Class<?> fallback() default void.class;
Class<?> fallbackFactory() default void.class;
// Path prefix appended to the base URL for every method, comparable to a class-level @RequestMapping.
String path() default "";
// Whether the generated bean definition is marked as primary.
boolean primary() default true;
}
二. 初始化注册
- 查看 @EnableFeignClients 开启Feign客户端功能注解,该注解重点关注使用@Import向容器中导入了一个 FeignClientsRegistrar
package org.springframework.cloud.openfeign;
/**
 * Switches on Feign client support. Its only active part is the @Import below, which
 * registers FeignClientsRegistrar so that the registrar can contribute bean definitions.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({FeignClientsRegistrar.class})
public @interface EnableFeignClients {
// Package selection attributes — presumably consumed by the registrar's getBasePackages(metadata); verify there.
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
// Default configuration applied to all clients; registered under the name "default.<annotated class>".
Class<?>[] defaultConfiguration() default {};
// Explicit list of client interfaces; when non-empty the registrar only accepts these classes.
Class<?>[] clients() default {};
}
- FeignClientsRegistrar 实现了ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware三个接口,首先实现ImportBeanDefinitionRegistrar的 registerBeanDefinitions()方法
/**
 * Registrar entry point: contributes all Feign-related bean definitions for the class
 * annotated with {@code @EnableFeignClients}.
 *
 * @param metadata annotation metadata of the class carrying {@code @EnableFeignClients}
 * @param registry registry that receives the generated bean definitions
 */
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    // Step 1: turn the annotation's defaultConfiguration attribute into a
    // FeignClientSpecification bean definition named after the annotated class.
    registerDefaultConfiguration(metadata, registry);

    // Step 2: locate every @FeignClient interface, register its configuration as a
    // FeignClientSpecification, and register a FeignClientFactoryBean definition for it.
    registerFeignClients(metadata, registry);
}
- 创建用来生成FeignClientSpecification的 BeanDefinition, FeignClientSpecification: FeignClient的生成规范
/**
 * Registers the {@code defaultConfiguration} declared on {@code @EnableFeignClients} as a
 * FeignClientSpecification named {@code "default." + <class name>}.
 *
 * @param metadata metadata of the class annotated with {@code @EnableFeignClients}
 * @param registry registry that receives the specification bean definition
 */
private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    // The second argument (true) requests class-valued attributes as String names
    // (per the original notes, to avoid loading those classes early).
    Map<String, Object> enableAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
    if (enableAttrs == null || !enableAttrs.containsKey("defaultConfiguration")) {
        return;
    }

    // For a nested class the enclosing class name is used, otherwise the class's own name.
    String ownerName = metadata.hasEnclosingClass()
            ? metadata.getEnclosingClassName()
            : metadata.getClassName();

    registerClientConfiguration(registry, "default." + ownerName, enableAttrs.get("defaultConfiguration"));
}
/**
 * Registers a FeignClientSpecification bean definition holding a client name and its
 * configuration classes. Called for the default configuration and once per Feign client.
 *
 * @param registry      registry that receives the definition
 * @param name          specification name (client name or "default.xxx")
 * @param configuration value of the configuration attribute
 */
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
    // Both values are passed to the FeignClientSpecification constructor, in this order.
    BeanDefinitionBuilder specBuilder = BeanDefinitionBuilder
            .genericBeanDefinition(FeignClientSpecification.class)
            .addConstructorArgValue(name)
            .addConstructorArgValue(configuration);

    // Bean name pattern: "<name>.FeignClientSpecification".
    String beanName = name + "." + FeignClientSpecification.class.getSimpleName();
    registry.registerBeanDefinition(beanName, specBuilder.getBeanDefinition());
}
- 扫描@FeignClient,创建 FeignClientFactoryBean
/**
 * Finds every {@code @FeignClient} interface and registers, for each one, its configuration
 * (as a FeignClientSpecification) and a FeignClientFactoryBean definition.
 *
 * @param metadata metadata of the class annotated with {@code @EnableFeignClients}
 * @param registry registry that receives the generated bean definitions
 */
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    ClassPathScanningCandidateComponentProvider scanner = getScanner();
    scanner.setResourceLoader(this.resourceLoader);

    // 1. Read the @EnableFeignClients attributes and its explicit client list, if any.
    Map<String, Object> enableAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
    AnnotationTypeFilter feignClientFilter = new AnnotationTypeFilter(FeignClient.class);
    Class<?>[] clients = enableAttrs == null ? null : (Class<?>[]) enableAttrs.get("clients");

    // 2. Decide which packages to scan and which filter to apply.
    Set<String> basePackages;
    if (clients == null || clients.length == 0) {
        // No explicit clients: scan the configured base packages for @FeignClient.
        scanner.addIncludeFilter(feignClientFilter);
        basePackages = getBasePackages(metadata);
    } else {
        // Explicit clients: scan only their packages and accept only the listed classes.
        final Set<String> clientClasses = new HashSet<>();
        basePackages = new HashSet<>();
        for (Class<?> clazz : clients) {
            basePackages.add(ClassUtils.getPackageName(clazz));
            clientClasses.add(clazz.getCanonicalName());
        }
        AbstractClassTestingTypeFilter listedClientFilter = new AbstractClassTestingTypeFilter() {
            @Override
            protected boolean match(ClassMetadata metadata) {
                // Nested class names use '$'; canonical names use '.'.
                String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                return clientClasses.contains(cleaned);
            }
        };
        scanner.addIncludeFilter(new FeignClientsRegistrar.AllTypeFilter(Arrays.asList(listedClientFilter, feignClientFilter)));
    }

    // 3. Register the definitions for every candidate found in the selected packages.
    for (String basePackage : basePackages) {
        for (BeanDefinition candidate : scanner.findCandidateComponents(basePackage)) {
            if (!(candidate instanceof AnnotatedBeanDefinition)) {
                continue;
            }
            AnnotationMetadata annotationMetadata = ((AnnotatedBeanDefinition) candidate).getMetadata();
            // @FeignClient is only valid on interfaces.
            Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");

            Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
            String name = getClientName(attributes);
            // 3.1 The client's own configuration becomes a FeignClientSpecification.
            registerClientConfiguration(registry, name, attributes.get("configuration"));
            // 3.2 The client itself becomes a FeignClientFactoryBean definition.
            registerFeignClient(registry, annotationMetadata, attributes);
        }
    }
}
/**
 * Builds and registers the FeignClientFactoryBean definition for one {@code @FeignClient}
 * interface, copying the annotation attributes onto the factory bean's properties.
 *
 * @param registry           registry that receives the definition
 * @param annotationMetadata metadata of the annotated interface
 * @param attributes         attributes of its {@code @FeignClient} annotation
 */
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
    String className = annotationMetadata.getClassName();
    BeanDefinitionBuilder factoryBuilder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
    validate(attributes);

    factoryBuilder.addPropertyValue("url", getUrl(attributes));
    factoryBuilder.addPropertyValue("path", getPath(attributes));
    String name = getName(attributes);
    factoryBuilder.addPropertyValue("name", name);
    String contextId = getContextId(attributes);
    factoryBuilder.addPropertyValue("contextId", contextId);
    factoryBuilder.addPropertyValue("type", className);
    factoryBuilder.addPropertyValue("decode404", attributes.get("decode404"));
    factoryBuilder.addPropertyValue("fallback", attributes.get("fallback"));
    factoryBuilder.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
    // Same value as the literal 2 used by the decompiled code.
    factoryBuilder.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

    AbstractBeanDefinition factoryDefinition = factoryBuilder.getBeanDefinition();
    boolean primary = (Boolean) attributes.get("primary");
    factoryDefinition.setPrimary(primary);

    // The alias defaults to "<contextId>FeignClient" unless a qualifier is given.
    String qualifier = getQualifier(attributes);
    String alias = StringUtils.hasText(qualifier) ? qualifier : contextId + "FeignClient";

    // The bean is registered under the interface's class name, with the alias above.
    BeanDefinitionHolder holder = new BeanDefinitionHolder(factoryDefinition, className, new String[]{alias});
    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
三. FeignClient 自动配置
- 上一步已经初始化注册成功,接下来开始自动装配:@SpringBootApplication(内部包含@EnableAutoConfiguration)会加载各依赖 META-INF/spring.factories 配置文件中声明的自动配置类;在 org.springframework.cloud:spring-cloud-openfeign-core 的 META-INF/spring.factories 中可以找到自动配置类 FeignAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.openfeign.ribbon.FeignRibbonClientAutoConfiguration,\
org.springframework.cloud.openfeign.FeignAutoConfiguration,\
org.springframework.cloud.openfeign.encoding.FeignAcceptGzipEncodingAutoConfiguration,\
org.springframework.cloud.openfeign.encoding.FeignContentGzipEncodingAutoConfiguration
- 查看 FeignAutoConfiguration 通过这个类向容器中注入了重点关注的几个类:
- FeignContext : Feign上下文
package org.springframework.cloud.openfeign;
import ...
/**
 * Auto-configuration for Feign, applied only when {@code Feign} is on the classpath.
 * It binds FeignClientProperties and FeignHttpClientProperties, publishes the shared
 * FeignContext, and conditionally contributes an HTTP Client and a Targeter.
 */
@Configuration
@ConditionalOnClass({Feign.class})
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
// All FeignClientSpecification beans in the context; injection is optional,
// so the list stays empty when none exist.
@Autowired(
required = false
)
private List<FeignClientSpecification> configurations = new ArrayList();
public FeignAutoConfiguration() {
}
@Bean
public HasFeatures feignFeature() {
return HasFeatures.namedFeature("Feign", Feign.class);
}
/**
 * Creates the FeignContext and hands it every collected FeignClientSpecification.
 * FeignClientFactoryBean later looks this bean up to build each client.
 */
@Bean
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}
/**
 * OkHttp-backed Feign Client. Active only when the OkHttpClient class is present, the Ribbon
 * ILoadBalancer class is absent, no okhttp3.OkHttpClient bean exists yet, and the property
 * feign.okhttp.enabled is set.
 */
@Configuration
@ConditionalOnClass({OkHttpClient.class})
@ConditionalOnMissingClass({"com.netflix.loadbalancer.ILoadBalancer"})
@ConditionalOnMissingBean({okhttp3.OkHttpClient.class})
@ConditionalOnProperty({"feign.okhttp.enabled"})
protected static class OkHttpFeignConfiguration {
// Kept so destroy() can release the client's resources.
private okhttp3.OkHttpClient okHttpClient;
protected OkHttpFeignConfiguration() {
}
// Connection pool sized from FeignHttpClientProperties (max connections and time-to-live).
@Bean
@ConditionalOnMissingBean({ConnectionPool.class})
public ConnectionPool httpClientConnectionPool(FeignHttpClientProperties httpClientProperties, OkHttpClientConnectionPoolFactory connectionPoolFactory) {
Integer maxTotalConnections = httpClientProperties.getMaxConnections();
Long timeToLive = httpClientProperties.getTimeToLive();
TimeUnit ttlUnit = httpClientProperties.getTimeToLiveUnit();
return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
}
// Builds the okhttp3 client with the configured connect timeout (milliseconds),
// redirect policy, SSL-validation switch and the pool above.
@Bean
public okhttp3.OkHttpClient client(OkHttpClientFactory httpClientFactory, ConnectionPool connectionPool, FeignHttpClientProperties httpClientProperties) {
Boolean followRedirects = httpClientProperties.isFollowRedirects();
Integer connectTimeout = httpClientProperties.getConnectionTimeout();
Boolean disableSslValidation = httpClientProperties.isDisableSslValidation();
this.okHttpClient = httpClientFactory.createBuilder(disableSslValidation).connectTimeout((long)connectTimeout, TimeUnit.MILLISECONDS).followRedirects(followRedirects).connectionPool(connectionPool).build();
return this.okHttpClient;
}
// On shutdown: stop the dispatcher's executor and evict pooled connections.
@PreDestroy
public void destroy() {
if (this.okHttpClient != null) {
this.okHttpClient.dispatcher().executorService().shutdown();
this.okHttpClient.connectionPool().evictAll();
}
}
// Wraps the okhttp3 client as a Feign Client unless a Client bean already exists.
@Bean
@ConditionalOnMissingBean({Client.class})
public Client feignClient(okhttp3.OkHttpClient client) {
return new OkHttpClient(client);
}
}
/**
 * Apache HttpClient-backed Feign Client. Active when ApacheHttpClient is present, the Ribbon
 * ILoadBalancer class is absent and no CloseableHttpClient bean exists; because of
 * matchIfMissing = true it also applies when feign.httpclient.enabled is not set at all.
 */
@Configuration
@ConditionalOnClass({ApacheHttpClient.class})
@ConditionalOnMissingClass({"com.netflix.loadbalancer.ILoadBalancer"})
@ConditionalOnMissingBean({CloseableHttpClient.class})
@ConditionalOnProperty(
value = {"feign.httpclient.enabled"},
matchIfMissing = true
)
protected static class HttpClientFeignConfiguration {
// Daemon timer used to close expired connections periodically.
private final Timer connectionManagerTimer = new Timer("FeignApacheHttpClientConfiguration.connectionManagerTimer", true);
@Autowired(
required = false
)
private RegistryBuilder registryBuilder;
// Kept so destroy() can close it.
private CloseableHttpClient httpClient;
protected HttpClientFeignConfiguration() {
}
// Creates the connection manager from FeignHttpClientProperties and schedules a task that
// closes expired connections: first run after 30000 ms, then every getConnectionTimerRepeat().
@Bean
@ConditionalOnMissingBean({HttpClientConnectionManager.class})
public HttpClientConnectionManager connectionManager(ApacheHttpClientConnectionManagerFactory connectionManagerFactory, FeignHttpClientProperties httpClientProperties) {
final HttpClientConnectionManager connectionManager = connectionManagerFactory.newConnectionManager(httpClientProperties.isDisableSslValidation(), httpClientProperties.getMaxConnections(), httpClientProperties.getMaxConnectionsPerRoute(), httpClientProperties.getTimeToLive(), httpClientProperties.getTimeToLiveUnit(), this.registryBuilder);
this.connectionManagerTimer.schedule(new TimerTask() {
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000L, (long)httpClientProperties.getConnectionTimerRepeat());
return connectionManager;
}
// Builds the Apache client with the configured connect timeout and redirect policy.
@Bean
public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory, HttpClientConnectionManager httpClientConnectionManager, FeignHttpClientProperties httpClientProperties) {
RequestConfig defaultRequestConfig = RequestConfig.custom().setConnectTimeout(httpClientProperties.getConnectionTimeout()).setRedirectsEnabled(httpClientProperties.isFollowRedirects()).build();
this.httpClient = httpClientFactory.createBuilder().setConnectionManager(httpClientConnectionManager).setDefaultRequestConfig(defaultRequestConfig).build();
return this.httpClient;
}
// Wraps the Apache client as a Feign Client unless a Client bean already exists.
@Bean
@ConditionalOnMissingBean({Client.class})
public Client feignClient(HttpClient httpClient) {
return new ApacheHttpClient(httpClient);
}
// On shutdown: cancel the cleanup timer and close the HTTP client.
@PreDestroy
public void destroy() throws Exception {
this.connectionManagerTimer.cancel();
if (this.httpClient != null) {
this.httpClient.close();
}
}
}
// Without Hystrix on the classpath, the plain DefaultTargeter is used.
@Configuration
@ConditionalOnMissingClass({"feign.hystrix.HystrixFeign"})
protected static class DefaultFeignTargeterConfiguration {
protected DefaultFeignTargeterConfiguration() {
}
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new DefaultTargeter();
}
}
// With Hystrix on the classpath, HystrixTargeter is used (it handles fallback / fallbackFactory).
@Configuration
@ConditionalOnClass(
name = {"feign.hystrix.HystrixFeign"}
)
protected static class HystrixFeignTargeterConfiguration {
protected HystrixFeignTargeterConfiguration() {
}
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
}
四. FeignClient 创建
- 前面在初始化时已经注册了FeignClientFactoryBean。该类实现了FactoryBean接口,容器在需要该Bean时会调用其重写的getObject()方法返回FeignClient,而FeignClient实际是通过JDK动态代理Proxy.newProxyInstance()生成的
/**
 * FactoryBean that produces the proxy for one {@code @FeignClient} interface.
 * (Excerpt: fields and helper methods referenced below are declared outside this snippet.)
 */
class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {

    /** Called by the container to obtain the client proxy. */
    public Object getObject() throws Exception {
        return getTarget();
    }

    /**
     * Builds the Feign client for {@code type}.
     * With an explicit {@code url} the request goes straight to that address;
     * without one, the service name is used and the call is load balanced.
     */
    <T> T getTarget() {
        // 1. Fetch the shared FeignContext and create the builder configured for this client.
        FeignContext context = (FeignContext) this.applicationContext.getBean(FeignContext.class);
        Builder builder = feign(context);

        if (StringUtils.hasText(this.url)) {
            // 2a. Explicit URL: default the scheme to http and append the path.
            if (!this.url.startsWith("http")) {
                this.url = "http://" + this.url;
            }
            String targetUrl = this.url + cleanPath();

            // A load-balancing client is unwrapped to its delegate so the fixed URL is called directly.
            Client client = (Client) getOptional(context, Client.class);
            if (client != null) {
                if (client instanceof LoadBalancerFeignClient) {
                    client = ((LoadBalancerFeignClient) client).getDelegate();
                }
                builder.client(client);
            }

            // 3a. The targeter eventually creates the JDK dynamic proxy.
            Targeter targeter = (Targeter) get(context, Targeter.class);
            return targeter.target(this, builder, context, new HardCodedTarget(this.type, this.name, targetUrl));
        }

        // 2b. No URL: derive "http://<name>" (unless the name already starts with http),
        //     append the path, and go through the load balancer.
        this.url = this.name.startsWith("http") ? this.name : "http://" + this.name;
        this.url = this.url + cleanPath();
        return loadBalance(builder, context, new HardCodedTarget(this.type, this.name, this.url));
    }
}
- 观察代码发现:当设置了@FeignClient的url属性时,不会使用负载均衡,直接请求指定地址;否则使用ribbon进行负载均衡
- 查看第六步中执行的HystrixTargeter下的 target()方法
/**
 * Creates the client proxy, wiring in Hystrix fallback support when the builder is a
 * Hystrix builder.
 *
 * @param factory factory bean describing the client (name, fallback, fallbackFactory)
 * @param feign   builder prepared for this client
 * @param context Feign context used to look up optional beans
 * @param target  target describing the interface type, name and URL
 * @return the generated client proxy
 */
public <T> T target(FeignClientFactoryBean factory, Builder feign, FeignContext context, HardCodedTarget<T> target) {
    // Not a Hystrix builder (the default case, per the original notes): build a plain proxy.
    if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        return feign.target(target);
    }

    feign.hystrix.HystrixFeign.Builder hystrixBuilder = (feign.hystrix.HystrixFeign.Builder) feign;
    SetterFactory setterFactory = (SetterFactory) this.getOptional(factory.getName(), context, SetterFactory.class);
    if (setterFactory != null) {
        hystrixBuilder.setterFactory(setterFactory);
    }

    // A fallback class wins over a fallback factory; with neither, build a plain proxy.
    Class<?> fallback = factory.getFallback();
    if (fallback != Void.TYPE) {
        return this.targetWithFallback(factory.getName(), context, target, hystrixBuilder, fallback);
    }
    Class<?> fallbackFactory = factory.getFallbackFactory();
    if (fallbackFactory != Void.TYPE) {
        return this.targetWithFallbackFactory(factory.getName(), context, target, hystrixBuilder, fallbackFactory);
    }
    return feign.target(target);
}
- Feign下的 target() 与 newInstance():
- 会获取@FeignClient接口中的所有方法,创建对应每个方法的方法处理器SynchronousMethodHandler
- 创建调用处理器拿到InvocationHandler ,实际返回的是FeignInvocationHandler
- 通过动态代理生成针对每个方法的代理类Proxy.newProxyInstance()
/**
 * Builds the Feign instance from this builder and asks it for a proxy of the given target.
 *
 * @param target interface type, name and URL of the remote service
 * @return proxy implementing the target's interface
 */
public <T> T target(Target<T> target) {
    Feign feignInstance = build();
    return feignInstance.newInstance(target);
}
/**
 * Creates the JDK dynamic proxy for a Feign client interface.
 *
 * <p>Every interface method gets a handler: default methods get a
 * {@code DefaultMethodHandler}; all other methods get the handler looked up by config key
 * from {@code targetToHandlersByName} (SynchronousMethodHandler, per the original notes).
 * Methods declared by {@code Object} are skipped.
 *
 * @param target interface type, name and URL of the remote service
 * @return proxy implementing {@code target.type()}
 */
public <T> T newInstance(Target<T> target) {
    // 1. Handlers keyed by config key, one per remote method of the interface.
    Map<String, MethodHandler> nameToHandler = this.targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<>();
    List<DefaultMethodHandler> defaultMethodHandlers = new ArrayList<>();

    for (Method method : target.type().getMethods()) {
        if (method.getDeclaringClass() == Object.class) {
            continue;
        }
        if (Util.isDefault(method)) {
            // Default methods run locally instead of issuing an HTTP request.
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }

    // 2. Create the invocation handler that dispatches by Method.
    InvocationHandler handler = this.factory.create(target, methodToHandler);

    // 3. Generate the proxy. Proxy.newProxyInstance returns Object, so the result is cast to T;
    //    the cast is safe because the proxy implements target.type().
    @SuppressWarnings("unchecked")
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

    // Default-method handlers need the proxy instance to invoke the default implementation on.
    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
}
五. 网络请求的发出
- 前面了解到通过Proxy.newProxyInstance()动态代理方式创建了FeignClient,通过InvocationHandler 处理器执行,查看创建InvocationHandler 的代码,创建的是FeignInvocationHandler
/**
 * Default invocation-handler factory: wraps the per-method dispatch table in a
 * FeignInvocationHandler.
 *
 * @param target   target the proxy is created for
 * @param dispatch handler for each interface method
 * @return the handler given to {@code Proxy.newProxyInstance}
 */
public InvocationHandler create(Target target, Map<Method, InvocationHandlerFactory.MethodHandler> dispatch) {
    InvocationHandler dispatcher = new FeignInvocationHandler(target, dispatch);
    return dispatcher;
}
- 查看 FeignInvocationHandler 这个类,实现了 InvocationHandler接口,并重写了invoke()方法,在调用接口执行时,实际执行的是这个invoke方法
/**
 * Entry point for every call made on a Feign client proxy.
 * {@code equals}, {@code hashCode} and {@code toString} are answered by this handler itself;
 * any other method is forwarded to the MethodHandler registered for it.
 *
 * @param proxy  the proxy instance the call was made on
 * @param method the interface method being invoked
 * @param args   call arguments
 * @return the result produced by this handler or by the method's handler
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();

    if ("equals".equals(methodName)) {
        // Two proxies are equal when their invocation handlers are equal.
        try {
            Object otherHandler = null;
            if (args.length > 0 && args[0] != null) {
                otherHandler = Proxy.getInvocationHandler(args[0]);
            }
            return this.equals(otherHandler);
        } catch (IllegalArgumentException notAProxy) {
            // The argument is not a proxy instance, so it cannot be equal.
            return false;
        }
    }
    if ("hashCode".equals(methodName)) {
        return this.hashCode();
    }
    if ("toString".equals(methodName)) {
        return this.toString();
    }

    // Regular interface method: look up its handler by Method and delegate. For a default
    // method the registered handler is a DefaultMethodHandler (see newInstance).
    return ((MethodHandler) this.dispatch.get(method)).invoke(args);
}
- 前面在FeignClient 创建中了解到,项目启动时会扫描@FeignClient获取接口中的所有方法,创建对应每个方法的方法处理器SynchronousMethodHandler,通过动态代理生成代理类,执行时拿到代理类的处理执行器FeignInvocationHandler 先执行invoke,而实际的逻辑是在处理器的invoke()方法中执行的方法处理器的SynchronousMethodHandler 的invoke()方法,查看该方法
/**
 * Executes one remote call: builds the request template from the arguments and keeps
 * retrying while the Retryer allows it.
 *
 * @param argv arguments of the interface method call
 * @return the decoded response
 * @throws Throwable the retry exception, or its cause when the propagation policy is UNWRAP
 */
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = this.buildTemplateFromArgs.create(argv);
    // Each invocation works on its own copy of the retryer.
    Retryer retryer = this.retryer.clone();

    while (true) {
        try {
            // Sends the request and decodes the response.
            return this.executeAndDecode(template);
        } catch (RetryableException retryable) {
            try {
                // Returns normally when another attempt is allowed; rethrows otherwise.
                retryer.continueOrPropagate(retryable);
            } catch (RetryableException giveUp) {
                Throwable cause = giveUp.getCause();
                if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
                    throw cause;
                }
                throw giveUp;
            }
            if (this.logLevel != Level.NONE) {
                this.logger.logRetry(this.metadata.configKey(), this.logLevel);
            }
        }
    }
}
- 查看Client(默认实现)下的execute()方法,可以看到请求是通过HttpURLConnection建立连接并发送的
/**
 * Sends the request over an HttpURLConnection and converts the connection's reply into a
 * Feign Response.
 *
 * @param request the request to send
 * @param options per-request options passed to convertAndSend
 * @return the converted response
 * @throws IOException if sending or reading fails
 */
public Response execute(Request request, Options options) throws IOException {
    return convertResponse(convertAndSend(request, options), request);
}
六. 负载均衡
- 在"五. 网络请求的发出"步骤中,最终会执行client.execute()方法发出请求:在不使用Ribbon的情况下,执行Client(默认实现)下的execute();当开启Ribbon后,则执行LoadBalancerFeignClient下的execute()方法
/**
 * Ribbon-aware execute: treats the URL's host as the service name, strips it from the URL,
 * and lets the service's load balancer client choose a server and send the request.
 *
 * @param request request whose host part is the service name
 * @param options per-request options used to build the client config
 * @return the response from the chosen server
 * @throws IOException the IOException found inside a ClientException, if any
 */
public Response execute(Request request, Options options) throws IOException {
    try {
        String serviceName = URI.create(request.url()).getHost();
        URI hostlessUri = cleanUrl(request.url(), serviceName);
        RibbonRequest ribbonRequest = new RibbonRequest(this.delegate, request, hostlessUri);
        IClientConfig clientConfig = this.getClientConfig(options, serviceName);

        // Server selection happens inside executeWithLoadBalancer.
        RibbonResponse ribbonResponse = (RibbonResponse) this.lbClient(serviceName).executeWithLoadBalancer(ribbonRequest, clientConfig);
        return ribbonResponse.toResponse();
    } catch (ClientException clientFailure) {
        IOException io = this.findIOException(clientFailure);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(clientFailure);
    }
}
- AbstractLoadBalancerAwareClient 下
/**
 * Executes the request on a server picked by the load balancer.
 *
 * <p>The decompiled form cast the result to {@code IResponse} while returning {@code T}, and
 * passed a {@code ClientRequest} where {@code S} is expected; the generic typing is restored
 * here so the method is type-correct.
 *
 * @param request       request carrying a URI without a concrete server
 * @param requestConfig client configuration for this call
 * @return the response from the chosen server
 * @throws ClientException the cause of the failure when it is a ClientException, otherwise a
 *                         new ClientException wrapping the failure
 */
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = this.buildLoadBalancerCommand(request, requestConfig);
    try {
        return command.submit(new ServerOperation<T>() {
            @Override
            public Observable<T> call(Server server) {
                // Rebuild the URI against the chosen server, then send the request there.
                URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
                @SuppressWarnings("unchecked")
                S requestForServer = (S) request.replaceUri(finalUri);
                try {
                    return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                } catch (Exception e) {
                    return Observable.error(e);
                }
            }
        }).toBlocking().single();
    } catch (Exception e) {
        Throwable cause = e.getCause();
        if (cause instanceof ClientException) {
            throw (ClientException) cause;
        }
        throw new ClientException(e);
    }
}
- LoadBalancerCommand 下 submit()
/**
 * Runs the given operation against a server, with retries on the same server and on other
 * servers as allowed by the retry handler.
 *
 * @param operation the work to perform once a server is known
 * @return an Observable emitting the operation's result, or the final error
 */
public Observable<T> submit(final ServerOperation<T> operation) {
final LoadBalancerCommand<T>.ExecutionInfoContext context = new LoadBalancerCommand.ExecutionInfoContext();
// Notify listeners first; a listener may abort the whole execution.
if (this.listenerInvoker != null) {
try {
this.listenerInvoker.onExecutionStart();
} catch (AbortExecutionException var6) {
return Observable.error(var6);
}
}
final int maxRetrysSame = this.retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = this.retryHandler.getMaxRetriesOnNextServer();
// If no server was fixed on this command, selectServer() asks the load balancer for one.
Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server)).concatMap(new Func1<Server, Observable<T>>() {
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = LoadBalancerCommand.this.loadBalancerContext.getServerStats(server);
// Inner observable: one attempt against the chosen server.
Observable<T> o = Observable.just(server).concatMap(new Func1<Server, Observable<T>>() {
public Observable<T> call(final Server server) {
context.incAttemptCount();
LoadBalancerCommand.this.loadBalancerContext.noteOpenConnection(stats);
if (LoadBalancerCommand.this.listenerInvoker != null) {
try {
LoadBalancerCommand.this.listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException var3) {
return Observable.error(var3);
}
}
final Stopwatch tracer = LoadBalancerCommand.this.loadBalancerContext.getExecuteTracer().start();
// Execute the operation and record statistics / notify listeners on each event.
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
public void onCompleted() {
this.recordStats(tracer, stats, this.entity, (Throwable)null);
}
public void onError(Throwable e) {
this.recordStats(tracer, stats, (Object)null, e);
LoadBalancerCommand.logger.debug("Got error {} when executed on server {}", e, server);
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
public void onNext(T entity) {
this.entity = entity;
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
// Stops the timer and reports the outcome of this attempt to the load balancer context.
private void recordStats(Stopwatch tracerx, ServerStats statsx, Object entity, Throwable exception) {
tracerx.stop();
LoadBalancerCommand.this.loadBalancerContext.noteRequestCompletion(statsx, entity, exception, tracerx.getDuration(TimeUnit.MILLISECONDS), LoadBalancerCommand.this.retryHandler);
}
});
}
});
// Retry on the same server when configured.
if (maxRetrysSame > 0) {
o = o.retry(LoadBalancerCommand.this.retryPolicy(maxRetrysSame, true));
}
return o;
}
});
// Retry on another server only when the server was not fixed up front.
if (maxRetrysNext > 0 && this.server == null) {
o = o.retry(this.retryPolicy(maxRetrysNext, false));
}
// Final error handling: wrap the error when a retry limit was reached, then notify listeners.
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == maxRetrysNext + 1) {
e = new ClientException(ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), (Throwable)e);
} else if (maxRetrysSame > 0 && context.getAttemptCount() == maxRetrysSame + 1) {
e = new ClientException(ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), (Throwable)e);
}
}
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExecutionFailed((Throwable)e, context.toFinalExecutionInfo());
}
return Observable.error((Throwable)e);
}
});
}
/**
 * Wraps the load balancer lookup in an Observable: on subscription it asks the load balancer
 * context for a server and emits it, or emits the lookup error.
 *
 * @return an Observable emitting exactly one chosen server, or an error
 */
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        public void call(Subscriber<? super Server> subscriber) {
            try {
                // The actual choice is made by LoadBalancerContext#getServerFromLoadBalancer.
                Server chosen = LoadBalancerCommand.this.loadBalancerContext.getServerFromLoadBalancer(
                        LoadBalancerCommand.this.loadBalancerURI,
                        LoadBalancerCommand.this.loadBalancerKey);
                subscriber.onNext(chosen);
                subscriber.onCompleted();
            } catch (Exception lookupFailure) {
                subscriber.onError(lookupFailure);
            }
        }
    });
}
- LoadBalancerContext 下 getServerFromLoadBalancer()
/**
 * Resolves the server a request should be sent to.
 *
 * @param original        request URI, possibly without a host; may be null
 * @param loadBalancerKey key passed through to the load balancer's chooseServer
 * @return the server chosen by the load balancer, or a Server built from the resolved host and port
 * @throws ClientException when no server or host can be determined
 */
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
// Start from whatever host/port the original URI carries.
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = this.deriveSchemeAndPortFromPartialUri(original);
port = (Integer)schemeAndPort.second();
}
ILoadBalancer lb = this.getLoadBalancer();
if (host == null) {
// Partial URI (no host): the load balancer, if present, must supply the server.
if (lb != null) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null) {
throw new ClientException(ErrorType.GENERAL, "Load balancer does not have available server for client: " + this.clientName);
}
host = svc.getHost();
if (host == null) {
throw new ClientException(ErrorType.GENERAL, "Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{this.clientName, svc, original});
return svc;
}
// No load balancer: fall back to a single configured vipAddress.
if (this.vipAddresses != null && this.vipAddresses.contains(",")) {
throw new ClientException(ErrorType.GENERAL, "Method is invoked for client " + this.clientName + " with partial URI of (" + original + ") with no load balancer configured. Also, there are multiple vipAddresses and hence no vip address can be chosen to complete this partial uri");
}
if (this.vipAddresses == null) {
throw new ClientException(ErrorType.GENERAL, this.clientName + " has no LoadBalancer registered and passed in a partial URL request (with no host:port). Also has no vipAddress registered");
}
try {
Pair<String, Integer> hostAndPort = this.deriveHostAndPortFromVipAddress(this.vipAddresses);
host = (String)hostAndPort.first();
port = (Integer)hostAndPort.second();
} catch (URISyntaxException var8) {
throw new ClientException(ErrorType.GENERAL, "Method is invoked for client " + this.clientName + " with partial URI of (" + original + ") with no load balancer configured. Also, the configured/registered vipAddress is unparseable (to determine host and port)");
}
} else {
// The URI has a host: use the load balancer only if that authority is a recognized VIP.
boolean shouldInterpretAsVip = false;
if (lb != null) {
shouldInterpretAsVip = this.isVipRecognized(original.getAuthority());
}
if (shouldInterpretAsVip) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc != null) {
host = svc.getHost();
if (host == null) {
throw new ClientException(ErrorType.GENERAL, "Invalid Server for :" + svc);
}
logger.debug("using LB returned Server: {} for request: {}", svc, original);
return svc;
}
// The load balancer returned nothing: fall through and use the URI's own host/port.
logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
} else {
logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
}
}
// Shared tail: build a Server from the resolved host and port.
if (host == null) {
throw new ClientException(ErrorType.GENERAL, "Request contains no HOST to talk to");
} else {
return new Server(host, port);
}
}
- ZoneAwareLoadBalancer 下 chooseServer()
/**
 * Zone-aware server selection. With zone awareness disabled or at most one available zone,
 * the parent implementation is used. Otherwise a zone is picked at random among the zones
 * considered available, and that zone's load balancer chooses the server.
 *
 * @param key load balancer key passed through to the underlying rule
 * @return the chosen server (may come from the parent implementation)
 */
public Server chooseServer(Object key) {
    // Single zone or zone awareness disabled: delegate to the parent implementation.
    if (!ENABLED.get() || this.getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }

    Server chosen = null;
    try {
        LoadBalancerStats lbStats = this.getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);

        // Thresholds are resolved on first use.
        if (this.triggeringLoad == null) {
            this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
        }
        if (this.triggeringBlackoutPercentage == null) {
            this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
        }

        // 1. Zones that pass the load / blackout thresholds.
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);

        // Zone-specific selection only applies when some zone was excluded.
        if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
            // 2. Pick one of the available zones at random.
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                // 3. Let that zone's load balancer choose the server.
                chosen = this.getLoadBalancer(zone).chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", this.name, e);
    }

    if (chosen != null) {
        return chosen;
    }
    logger.debug("Zone avoidance logic is not invoked.");
    return super.chooseServer(key);
}
- 只有一个zone时执行 BaseLoadBalancer 下 chooseServer()
/**
 * Chooses a server by delegating to the configured rule.
 *
 * @param key load balancer key passed to the rule
 * @return the server picked by the rule, or null when no rule is set or the rule throws
 */
public Server chooseServer(Object key) {
    // The counter is created on first use and incremented on every call.
    if (this.counter == null) {
        this.counter = this.createCounter();
    }
    this.counter.increment();

    if (this.rule == null) {
        return null;
    }
    try {
        // NOTE(review): the original notes state the default rule is RoundRobinRule —
        // confirm against the rule actually configured for the application.
        return this.rule.choose(key);
    } catch (Exception e) {
        logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, e});
        return null;
    }
}
SpringCloud Loadbalancer
- 当项目中引入spring-cloud-starter-loadbalancer,并配置关闭Ribbon情况下,在执行负载时会使用SpringCloud Loadbalancer
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>3.0.4</version>
</dependency>
spring:
  cloud:
    loadbalancer:
      ribbon:
        enabled: false
- 会执行FeignBlockingLoadBalancerClient下的execute()方法
/**
 * Spring Cloud LoadBalancer variant of execute: resolves the service id from the URL host,
 * asks the load balancer for an instance, and sends the request to that instance.
 *
 * @param request request whose host part is the service id
 * @param options per-request options
 * @return the response from the chosen instance, or a 503 response when no instance exists
 * @throws IOException if the delegate client fails
 */
public Response execute(Request request, Options options) throws IOException {
    URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);

    String hint = this.getHint(serviceId);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint));
    Set<LoadBalancerLifecycle> lifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    lifecycleProcessors.forEach((lifecycle) -> lifecycle.onStart(lbRequest));

    // Instance selection is delegated to the blocking load balancer client.
    ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest);
    org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);

    if (instance == null) {
        // No instance: report DISCARD to the lifecycle processors and answer with 503.
        String message = "Load balancer does not contain an instance for the service " + serviceId;
        if (LOG.isWarnEnabled()) {
            LOG.warn(message);
        }
        lifecycleProcessors.forEach((lifecycle) -> lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse)));
        return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build();
    }

    // Rewrite the URL against the chosen instance and execute through the delegate client.
    String reconstructedUrl = this.loadBalancerClient.reconstructURI(instance, originalUri).toString();
    Request newRequest = this.buildRequest(request, reconstructedUrl);
    return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, lifecycleProcessors);
}
- 调用BlockingLoadBalancerClient下choose()方法
/**
 * Blocking instance selection: obtains the reactive load balancer for the service and waits
 * for its choice.
 *
 * @param serviceId id of the service to call
 * @param request   load balancer request
 * @return the chosen instance, or null when no load balancer or no response is available
 */
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
    if (loadBalancer == null) {
        return null;
    }

    // Blocks until the reactive load balancer has produced its response.
    Response<ServiceInstance> chosen = (Response) Mono.from(loadBalancer.choose(request)).block();
    if (chosen == null) {
        return null;
    }
    return (ServiceInstance) chosen.getServer();
}
- RoundRobinLoadBalancer 下 choose()方法
/**
 * Round-robin selection: takes the first instance list emitted by the supplier and picks one
 * instance from it.
 *
 * @param request load balancer request passed to the supplier
 * @return a Mono emitting the selection response
 */
public Mono<Response<ServiceInstance>> choose(Request request) {
    // A no-op supplier is used when no ServiceInstanceListSupplier is available.
    ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier) this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next().map((serviceInstances) -> this.processInstanceResponse(supplier, serviceInstances));
}
/**
 * Picks an instance from the list and, when the supplier wants to know about selections,
 * reports the selected instance back to it.
 *
 * @param supplier         supplier the instance list came from
 * @param serviceInstances candidate instances
 * @return the selection response (possibly empty)
 */
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> selection = this.getInstanceResponse(serviceInstances);

    boolean notifySupplier = supplier instanceof SelectedInstanceCallback && selection.hasServer();
    if (notifySupplier) {
        ((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) selection.getServer());
    }
    return selection;
}
/**
 * Selects the next instance in round-robin order.
 *
 * @param instances candidate instances
 * @return an EmptyResponse when the list is empty, otherwise a DefaultResponse holding the
 *         instance at {@code position % instances.size()}
 */
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
        if (log.isWarnEnabled()) {
            log.warn("No servers available for service: " + this.serviceId);
        }
        return new EmptyResponse();
    }

    // Clear the sign bit instead of calling Math.abs: Math.abs(Integer.MIN_VALUE) is still
    // negative, which would yield a negative index once the counter overflows.
    int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
    ServiceInstance instance = instances.get(pos % instances.size());
    return new DefaultResponse(instance);
}