Microservice Frameworks: OpenFeign Source Code

2023-10-26

I. Basics

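  1. Workflow for using OpenFeign
     1. Add the OpenFeign dependency to the project
     2. Annotate the startup class with @EnableFeignClients to enable OpenFeign
     3. On the consumer side, write a client interface annotated with @FeignClient(name = "service name", fallback = IHotelInfoClient.HystrixClientFallbackImpl.class, url = "fixed address"). Annotate each abstract method with the REST mapping annotations so that its request URI matches the URI of the endpoint being called, and keep the parameters and return type consistent with that endpoint
     4. The consumer then simply calls the methods of this interface
  2. OpenFeign wraps Ribbon internally
  3. Look at @FeignClient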
package org.springframework.cloud.openfeign;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignClient {
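	//name and value are aliases for each other (@AliasFor); this is the service name, and when the client is called by service name, "http://" is automatically prepended to it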
    @AliasFor("name") 
    String value() default "";
    @AliasFor("value")
    String name() default "";

    /** @deprecated Marked as deprecated; serves the same purpose as name and value. */
    @Deprecated
    String serviceId() default "";
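	//Used in place of name, value and serviceId as the client's context name when set; it has the highest priority
    String contextId() default "";
	//When several FeignClient beans exist (for example through inheritance), this attribute selects one; used together with @Qualifier
    String qualifier() default "";

	//Fixed target address; once set, name/value are no longer used for addressing and load balancing is bypassed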
    String url() default "";
	
    boolean decode404() default false;

    Class<?>[] configuration() default {};

    Class<?> fallback() default void.class;

    Class<?> fallbackFactory() default void.class;
	
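	//Path prefix for all methods, equivalent to a class-level @RequestMapping("/prefix")
    String path() default "";
	//Whether the generated client bean is marked as primary
    boolean primary() default true;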
}

II. Initialization and Registration

  1. Look at @EnableFeignClients, the annotation that enables Feign clients. The key point is that it uses @Import to register a FeignClientsRegistrar with the container
package org.springframework.cloud.openfeign;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({FeignClientsRegistrar.class})
public @interface EnableFeignClients {
    String[] value() default {};

    String[] basePackages() default {};

    Class<?>[] basePackageClasses() default {};

    Class<?>[] defaultConfiguration() default {};

    Class<?>[] clients() default {};
}
  1. FeignClientsRegistrar implements three interfaces: ImportBeanDefinitionRegistrar, ResourceLoaderAware and EnvironmentAware. Start with registerBeanDefinitions(), the method it implements from ImportBeanDefinitionRegistrar
	public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
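		//1. Read the fully qualified name of the class annotated with @EnableFeignClients and the annotation's defaultConfiguration attribute,
		//and build the BeanDefinition of a FeignClientSpecification (FeignClientSpecification: the specification used to create a FeignClient)
        this.registerDefaultConfiguration(metadata, registry);
        //2. This call does three things:
        //First: scan the project for all types annotated with @FeignClient
        //Second: register the configuration attribute of each @FeignClient with the container
        //Third: build a FeignClientFactoryBean BeanDefinition from the metadata of each @FeignClient
        //and register that BeanDefinition with the container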
        this.registerFeignClients(metadata, registry);
    }
  1. Create the BeanDefinition for FeignClientSpecification (FeignClientSpecification: the specification used to create a FeignClient)
	private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
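		//1. Read the attributes of @EnableFeignClients (the second argument, true, returns Class-valued attributes as String class names, so the classes are not loaded early)
		//metadata: the class annotated with @EnableFeignClients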
        Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
        //2. Enter the if when the attributes are not null and contain defaultConfiguration
        if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
        	//2.1 Check whether the annotated class is nested inside an enclosing class, then derive the name
            String name;
            if (metadata.hasEnclosingClass()) {
            	//Nested class: use the name of the enclosing class
                name = "default." + metadata.getEnclosingClassName();
            } else {
            	//Top-level class: name = "default." + fully qualified name of the class annotated with @EnableFeignClients
                name = "default." + metadata.getClassName();
            }
			//3. Use this "default." + fully qualified name together with the defaultConfiguration attribute of @EnableFeignClients
			//to register a FeignClientSpecification
            this.registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration"));
        }
    }

	//Method called in step 3
	private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
		//1. Create a builder
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
        builder.addConstructorArgValue(name);
        builder.addConstructorArgValue(configuration);
        //2. Build the BeanDefinition from the builder; it is used later to instantiate the FeignClientSpecification,
        //which then serves as the specification for creating the FeignClient
        registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(), builder.getBeanDefinition());
    }
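
To make the naming rule in these two methods concrete, here is a minimal plain-JDK sketch of how the bean name of a specification is derived. The class SpecificationNaming, its methods, the map used as a registry and the sample names com.example.App and hotel-service are all hypothetical; the sketch only mirrors the string concatenation shown above and does not use any Spring class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SpecificationNaming {
    /** Name used for the default configuration: "default." + annotated (or enclosing) class name. */
    static String defaultName(String className) {
        return "default." + className;
    }

    /** Bean name under which a specification is registered: name + "." + simple class name. */
    static String beanName(String name) {
        return name + "." + "FeignClientSpecification";
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the bean definition registry: bean name -> origin of the configuration.
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put(beanName(defaultName("com.example.App")), "defaultConfiguration of @EnableFeignClients");
        registry.put(beanName("hotel-service"), "configuration of one @FeignClient");
        registry.forEach((name, source) -> System.out.println(name + " <- " + source));
    }
}
```

The same registration method is reused for the per-client configuration in the next step, with the client name in place of the "default." name.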
  1. Scan for @FeignClient and create FeignClientFactoryBean
	public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
        ClassPathScanningCandidateComponentProvider scanner = this.getScanner();
        scanner.setResourceLoader(this.resourceLoader);
        //1. Read the attributes of @EnableFeignClients
        Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
        AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(FeignClient.class);
        //2. Read the clients attribute of the annotation
        Class<?>[] clients = attrs == null ? null : (Class[])((Class[])attrs.get("clients"));
        Object basePackages;
        //Decide how to find the interfaces annotated with @FeignClient:
        //if clients is not empty, scan only the packages of the listed classes and accept only those classes;
        //otherwise add a filter for the @FeignClient annotation to the scanner and scan the base packages
        if (clients != null && clients.length != 0) {
            final Set<String> clientClasses = new HashSet();
            basePackages = new HashSet();
            Class[] var9 = clients;
            int var10 = clients.length;

            for(int var11 = 0; var11 < var10; ++var11) {
                Class<?> clazz = var9[var11];
                ((Set)basePackages).add(ClassUtils.getPackageName(clazz));
                clientClasses.add(clazz.getCanonicalName());
            }

            AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
                protected boolean match(ClassMetadata metadata) {
                    String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                    return clientClasses.contains(cleaned);
                }
            };
            scanner.addIncludeFilter(new FeignClientsRegistrar.AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
        } else {
            scanner.addIncludeFilter(annotationTypeFilter);
            basePackages = this.getBasePackages(metadata);
        }
        //2 ====end==== The scanner filters and base packages are now set; the loop below finds every interface annotated with @FeignClient and reads its attributes

        Iterator var17 = ((Set)basePackages).iterator();

        while(var17.hasNext()) {
            String basePackage = (String)var17.next();
            Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
            Iterator var21 = candidateComponents.iterator();

            while(var21.hasNext()) {
                BeanDefinition candidateComponent = (BeanDefinition)var21.next();
                //3. Check whether the candidate is a Feign interface component
                if (candidateComponent instanceof AnnotatedBeanDefinition) {
                    AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition)candidateComponent;
                    //3.1 Get the annotation metadata of the @FeignClient type
                    AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                    //3.2 Verify that @FeignClient is placed on an interface; otherwise an exception is thrown
                    Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
                    //3.3 Read all attributes of @FeignClient
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
                    //3.4 Resolve the client name from contextId | value | name | serviceId
                    String name = this.getClientName(attributes);
                    //3.5 Register the configuration attribute of this @FeignClient with the container
                    this.registerClientConfiguration(registry, name, attributes.get("configuration"));
                    //3.6 Register the FeignClientFactoryBean BeanDefinition with the container
                    this.registerFeignClient(registry, annotationMetadata, attributes);
                }
            }
        }
    }

	//Method called in step 3.6
	private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
        this.validate(attributes);
        definition.addPropertyValue("url", this.getUrl(attributes));
        definition.addPropertyValue("path", this.getPath(attributes));
        String name = this.getName(attributes);
        definition.addPropertyValue("name", name);
        String contextId = this.getContextId(attributes);
        definition.addPropertyValue("contextId", contextId);
        definition.addPropertyValue("type", className);
        definition.addPropertyValue("decode404", attributes.get("decode404"));
        definition.addPropertyValue("fallback", attributes.get("fallback"));
        definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
        definition.setAutowireMode(2);
        String alias = contextId + "FeignClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        boolean primary = (Boolean)attributes.get("primary");
        beanDefinition.setPrimary(primary);
        String qualifier = this.getQualifier(attributes);
        if (StringUtils.hasText(qualifier)) {
            alias = qualifier;
        }

        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[]{alias});
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }

III. FeignClient Auto-Configuration

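  1. With registration done, auto-configuration begins. @SpringBootApplication triggers auto-configuration of the classes listed in META-INF/spring.factories. In the spring.factories file of org.springframework.cloud:spring-cloud-openfeign-core, find the auto-configuration class FeignAutoConfiguration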
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.openfeign.ribbon.FeignRibbonClientAutoConfiguration,\
org.springframework.cloud.openfeign.FeignAutoConfiguration,\
org.springframework.cloud.openfeign.encoding.FeignAcceptGzipEncodingAutoConfiguration,\
org.springframework.cloud.openfeign.encoding.FeignContentGzipEncodingAutoConfiguration
  1. Look at FeignAutoConfiguration. Among the beans this class registers with the container, the ones to focus on are:
  1. FeignContext: the Feign context
package org.springframework.cloud.openfeign;
import ...

@Configuration
@ConditionalOnClass({Feign.class})
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
    @Autowired(
        required = false
    )
    private List<FeignClientSpecification> configurations = new ArrayList();

    public FeignAutoConfiguration() {
    }

    @Bean
    public HasFeatures feignFeature() {
        return HasFeatures.namedFeature("Feign", Feign.class);
    }

    @Bean
    public FeignContext feignContext() {
        FeignContext context = new FeignContext();
        context.setConfigurations(this.configurations);
        return context;
    }

    @Configuration
    @ConditionalOnClass({OkHttpClient.class})
    @ConditionalOnMissingClass({"com.netflix.loadbalancer.ILoadBalancer"})
    @ConditionalOnMissingBean({okhttp3.OkHttpClient.class})
    @ConditionalOnProperty({"feign.okhttp.enabled"})
    protected static class OkHttpFeignConfiguration {
        private okhttp3.OkHttpClient okHttpClient;

        protected OkHttpFeignConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean({ConnectionPool.class})
        public ConnectionPool httpClientConnectionPool(FeignHttpClientProperties httpClientProperties, OkHttpClientConnectionPoolFactory connectionPoolFactory) {
            Integer maxTotalConnections = httpClientProperties.getMaxConnections();
            Long timeToLive = httpClientProperties.getTimeToLive();
            TimeUnit ttlUnit = httpClientProperties.getTimeToLiveUnit();
            return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
        }

        @Bean
        public okhttp3.OkHttpClient client(OkHttpClientFactory httpClientFactory, ConnectionPool connectionPool, FeignHttpClientProperties httpClientProperties) {
            Boolean followRedirects = httpClientProperties.isFollowRedirects();
            Integer connectTimeout = httpClientProperties.getConnectionTimeout();
            Boolean disableSslValidation = httpClientProperties.isDisableSslValidation();
            this.okHttpClient = httpClientFactory.createBuilder(disableSslValidation).connectTimeout((long)connectTimeout, TimeUnit.MILLISECONDS).followRedirects(followRedirects).connectionPool(connectionPool).build();
            return this.okHttpClient;
        }

        @PreDestroy
        public void destroy() {
            if (this.okHttpClient != null) {
                this.okHttpClient.dispatcher().executorService().shutdown();
                this.okHttpClient.connectionPool().evictAll();
            }

        }

        @Bean
        @ConditionalOnMissingBean({Client.class})
        public Client feignClient(okhttp3.OkHttpClient client) {
            return new OkHttpClient(client);
        }
    }

    @Configuration
    @ConditionalOnClass({ApacheHttpClient.class})
    @ConditionalOnMissingClass({"com.netflix.loadbalancer.ILoadBalancer"})
    @ConditionalOnMissingBean({CloseableHttpClient.class})
    @ConditionalOnProperty(
        value = {"feign.httpclient.enabled"},
        matchIfMissing = true
    )
    protected static class HttpClientFeignConfiguration {
        private final Timer connectionManagerTimer = new Timer("FeignApacheHttpClientConfiguration.connectionManagerTimer", true);
        @Autowired(
            required = false
        )
        private RegistryBuilder registryBuilder;
        private CloseableHttpClient httpClient;

        protected HttpClientFeignConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean({HttpClientConnectionManager.class})
        public HttpClientConnectionManager connectionManager(ApacheHttpClientConnectionManagerFactory connectionManagerFactory, FeignHttpClientProperties httpClientProperties) {
            final HttpClientConnectionManager connectionManager = connectionManagerFactory.newConnectionManager(httpClientProperties.isDisableSslValidation(), httpClientProperties.getMaxConnections(), httpClientProperties.getMaxConnectionsPerRoute(), httpClientProperties.getTimeToLive(), httpClientProperties.getTimeToLiveUnit(), this.registryBuilder);
            this.connectionManagerTimer.schedule(new TimerTask() {
                public void run() {
                    connectionManager.closeExpiredConnections();
                }
            }, 30000L, (long)httpClientProperties.getConnectionTimerRepeat());
            return connectionManager;
        }

        @Bean
        public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory, HttpClientConnectionManager httpClientConnectionManager, FeignHttpClientProperties httpClientProperties) {
            RequestConfig defaultRequestConfig = RequestConfig.custom().setConnectTimeout(httpClientProperties.getConnectionTimeout()).setRedirectsEnabled(httpClientProperties.isFollowRedirects()).build();
            this.httpClient = httpClientFactory.createBuilder().setConnectionManager(httpClientConnectionManager).setDefaultRequestConfig(defaultRequestConfig).build();
            return this.httpClient;
        }

        @Bean
        @ConditionalOnMissingBean({Client.class})
        public Client feignClient(HttpClient httpClient) {
            return new ApacheHttpClient(httpClient);
        }

        @PreDestroy
        public void destroy() throws Exception {
            this.connectionManagerTimer.cancel();
            if (this.httpClient != null) {
                this.httpClient.close();
            }

        }
    }

    @Configuration
    @ConditionalOnMissingClass({"feign.hystrix.HystrixFeign"})
    protected static class DefaultFeignTargeterConfiguration {
        protected DefaultFeignTargeterConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public Targeter feignTargeter() {
            return new DefaultTargeter();
        }
    }

    @Configuration
    @ConditionalOnClass(
        name = {"feign.hystrix.HystrixFeign"}
    )
    protected static class HystrixFeignTargeterConfiguration {
        protected HystrixFeignTargeterConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public Targeter feignTargeter() {
            return new HystrixTargeter();
        }
    }
}

IV. FeignClient Creation

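  1. FeignClientFactoryBean was registered during initialization. It implements the FactoryBean interface, so at runtime its overridden getObject() method returns the FeignClient, which is actually generated by JDK dynamic proxy via Proxy.newProxyInstance()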
class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
	
	public Object getObject() throws Exception {
        return this.getTarget();
	}

	<T> T getTarget() {
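		//1. Get the FeignContext from the container
        FeignContext context = (FeignContext)this.applicationContext.getBean(FeignContext.class);
        //2. Create a Builder from the settings the context holds for this FeignClient
        Builder builder = this.feign(context);
        //3. If this FeignClient has no url attribute, it is not called at a fixed address:
        //"http://" is prepended to the client's name (unless the name already starts with "http"),
        //which can be understood as a load-balanced call by service name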
        if (!StringUtils.hasText(this.url)) {
            if (!this.name.startsWith("http")) {
                this.url = "http://" + this.name;
            } else {
                this.url = this.name;
            }
			//4. Append the FeignClient's path attribute, if present, after normalizing it
            this.url = this.url + this.cleanPath();
            //5. Load-balanced call
            return this.loadBalance(builder, context, new HardCodedTarget(this.type, this.name, this.url));
        } else {
            if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
                this.url = "http://" + this.url;
            }

            String url = this.url + this.cleanPath();
            Client client = (Client)this.getOptional(context, Client.class);
            if (client != null) {
                if (client instanceof LoadBalancerFeignClient) {
                    client = ((LoadBalancerFeignClient)client).getDelegate();
                }

                builder.client(client);
            }

            Targeter targeter = (Targeter)this.get(context, Targeter.class);
            //6. Following this call shows that the FeignClient is actually generated by JDK dynamic proxy
            return targeter.target(this, builder, context, new HardCodedTarget(this.type, this.name, url));
        }
    }
}
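
The two branches of getTarget() can be condensed into a small plain-JDK sketch that resolves the final URL and reports whether the call would be load balanced. The class FeignUrlResolution and its members are hypothetical, and cleanPath() here is an assumed normalization (add a leading "/", drop a trailing "/"), since the original method body is not shown in the listing above.

```java
class FeignUrlResolution {
    /** Resolved target: the final URL and whether the call goes through the load balancer. */
    static final class Resolved {
        final String url;
        final boolean loadBalanced;

        Resolved(String url, boolean loadBalanced) {
            this.url = url;
            this.loadBalanced = loadBalanced;
        }
    }

    /** Assumed normalization: ensure a leading "/" and drop a trailing "/". */
    static String cleanPath(String path) {
        String p = path == null ? "" : path.trim();
        if (!p.isEmpty()) {
            if (!p.startsWith("/")) {
                p = "/" + p;
            }
            if (p.endsWith("/")) {
                p = p.substring(0, p.length() - 1);
            }
        }
        return p;
    }

    static Resolved resolve(String name, String url, String path) {
        boolean hasUrl = url != null && !url.trim().isEmpty();
        // Without a url the service name becomes the host and the call is load balanced.
        String base = hasUrl ? url : name;
        if (!base.startsWith("http")) {
            base = "http://" + base;
        }
        return new Resolved(base + cleanPath(path), !hasUrl);
    }

    public static void main(String[] args) {
        Resolved byName = resolve("hotel-service", "", "api/");
        Resolved byUrl = resolve("hotel-service", "127.0.0.1:8080", "/api");
        System.out.println(byName.url + " loadBalanced=" + byName.loadBalanced);
        System.out.println(byUrl.url + " loadBalanced=" + byUrl.loadBalanced);
    }
}
```

With only a name, the "host" in the URL is a logical service name that still has to be mapped to a real server; with a url, the address is used as given.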
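  1. The code shows that when the url attribute of @FeignClient is set, load balancing is not used; otherwise Ribbon performs the load balancing
  2. Look at the target() method of HystrixTargeter, called in step 6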
	public <T> T target(FeignClientFactoryBean factory, Builder feign, FeignContext context, HardCodedTarget<T> target) {
        if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        	//Reached by default
            return feign.target(target);
        } else {
            feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder)feign;
            SetterFactory setterFactory = (SetterFactory)this.getOptional(factory.getName(), context, SetterFactory.class);
            if (setterFactory != null) {
                builder.setterFactory(setterFactory);
            }

            Class<?> fallback = factory.getFallback();
            if (fallback != Void.TYPE) {
                return this.targetWithFallback(factory.getName(), context, target, builder, fallback);
            } else {
                Class<?> fallbackFactory = factory.getFallbackFactory();
                return fallbackFactory != Void.TYPE ? this.targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory) : feign.target(target);
            }
        }
    }

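target() in Feign:

  1. Collects all methods of the @FeignClient interface and creates a method handler, SynchronousMethodHandler, for each of them
  2. Creates the InvocationHandler; the instance actually returned is a FeignInvocationHandler
  3. Generates one proxy instance for the interface via Proxy.newProxyInstance(); calls to every method are routed through it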
	public <T> T target(Target<T> target) {
            return this.build().newInstance(target);
    }
	public <T> T newInstance(Target<T> target) {
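		//1. Get the method handlers. If the interface annotated with @FeignClient declares three abstract methods for calling remote endpoints, there are three method handlers,
		//and later calls to the interface go through them. The returned map is keyed by the config key built by Feign.configKey(type, method), and each value is the
		//SynchronousMethodHandler for that method. (An interface may contain abstract methods as well as default methods, which gives two kinds of handler:
		//one for abstract methods and one, DefaultMethodHandler, for default methods)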
        Map<String, MethodHandler> nameToHandler = this.targetToHandlersByName.apply(target);
        Map<Method, MethodHandler> methodToHandler = new LinkedHashMap();
        List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList();
        Method[] var5 = target.type().getMethods();
        int var6 = var5.length;

        for(int var7 = 0; var7 < var6; ++var7) {
            Method method = var5[var7];
            if (method.getDeclaringClass() != Object.class) {
                if (Util.isDefault(method)) {
                    DefaultMethodHandler handler = new DefaultMethodHandler(method);
                    defaultMethodHandlers.add(handler);
                    methodToHandler.put(method, handler);
                } else {
                    methodToHandler.put(method, (MethodHandler)nameToHandler.get(Feign.configKey(target.type(), method)));
                }
            }
        }
		//2. Create the invocation handler
        InvocationHandler handler = this.factory.create(target, methodToHandler);
        //3. Generate the dynamic proxy
        T proxy = (T)Proxy.newProxyInstance(target.type().getClassLoader(), new Class[]{target.type()}, handler);
        Iterator var12 = defaultMethodHandlers.iterator();

        while(var12.hasNext()) {
            DefaultMethodHandler defaultMethodHandler = (DefaultMethodHandler)var12.next();
            defaultMethodHandler.bindTo(proxy);
        }

        return proxy;
    }
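
The core of newInstance(), a map from Method to handler plus one JDK proxy that dispatches through it, can be reproduced with the JDK alone. The sketch below is a simplified illustration, not Feign's implementation: MiniFeignProxy, its MethodHandler interface and HotelClient are hypothetical, the handlers only return a description of the request instead of sending it, and default methods are left out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

class MiniFeignProxy {
    /** Hypothetical stand-in for Feign's MethodHandler. */
    interface MethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    /** Hypothetical client interface; in a real project it would carry @FeignClient. */
    interface HotelClient {
        String findById(long id);

        String list();
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, String baseUrl) {
        // One handler per interface method, keyed by the Method object.
        Map<Method, MethodHandler> dispatch = new LinkedHashMap<>();
        for (Method method : type.getMethods()) {
            // A real handler would send an HTTP request; this one only describes it.
            dispatch.put(method, args -> "GET " + baseUrl + "/" + method.getName()
                    + (args == null ? "" : "?arg=" + args[0]));
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // Only equals, hashCode and toString of Object are routed to the handler.
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return type.getSimpleName() + "(" + baseUrl + ")";
                }
            }
            return dispatch.get(method).invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        HotelClient client = newInstance(HotelClient.class, "http://hotel-service");
        System.out.println(client.findById(7L));
        System.out.println(client.list());
    }
}
```

The caller only ever sees the interface; which handler runs is decided at call time by looking up the invoked Method in the map.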

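V. Sending the Network Request

  1. As seen above, the FeignClient is created by dynamic proxy via Proxy.newProxyInstance(), and its calls are handled by an InvocationHandler. The code that creates the InvocationHandler shows that it is a FeignInvocationHandler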
	public InvocationHandler create(Target target, Map<Method, InvocationHandlerFactory.MethodHandler> dispatch) {
            return new FeignInvocationHandler(target, dispatch);
    }
  1. FeignInvocationHandler implements the InvocationHandler interface and overrides invoke(). When a method of the client interface is called, this invoke method is what actually runs
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			//1. Handle equals() and hashCode() separately
            if (!"equals".equals(method.getName())) {
                if ("hashCode".equals(method.getName())) {
                    return this.hashCode();
                } else {
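                	//2. Normal case: ((MethodHandler)this.dispatch.get(method)).invoke(args) looks up the method handler
                	//for the invoked Method and runs its invoke(). (Note: if the invoked method is a default method of the
                	//interface annotated with @FeignClient, its handler runs the local default method directly)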
                    return "toString".equals(method.getName()) ? this.toString() : ((MethodHandler)this.dispatch.get(method)).invoke(args);
                }
            } else {
                try {
                    Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                    return this.equals(otherHandler);
                } catch (IllegalArgumentException var5) {
                    return false;
                }
            }
        }
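  1. As described in FeignClient Creation, at startup the @FeignClient interfaces are scanned, a SynchronousMethodHandler is created for each of their methods, and a proxy is generated by dynamic proxy. At call time the proxy's FeignInvocationHandler runs invoke() first, but the real logic is in the invoke() method of the SynchronousMethodHandler that it delegates to. Look at that method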
	public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = this.buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();

        while(true) {
            try {
            	//Run the request; execution continues in executeAndDecode
                return this.executeAndDecode(template);
            } catch (RetryableException var8) {
                RetryableException e = var8;

                try {
                    retryer.continueOrPropagate(e);
                } catch (RetryableException var7) {
                    Throwable cause = var7.getCause();
                    if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
                        throw cause;
                    }

                    throw var7;
                }

                if (this.logLevel != Level.NONE) {
                    this.logger.logRetry(this.metadata.configKey(), this.logLevel);
                }
            }
        }
    }
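
The control flow of this invoke() method is a retry loop: try the call, and on a retryable failure ask the retryer whether to go around again or give up. A minimal plain-JDK sketch of that shape follows. RetryLoopSketch, its RetryableException, SimpleRetryer and Call types are hypothetical stand-ins, and the retryer is deliberately simplified (fixed attempt count, no waiting between attempts), so it should not be read as how Feign's own Retryer behaves.

```java
class RetryLoopSketch {
    /** Hypothetical stand-in for Feign's RetryableException. */
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /** Hypothetical, simplified retryer: a fixed number of attempts and no sleep between them. */
    static final class SimpleRetryer {
        private final int maxAttempts;
        private int attempt = 1;

        SimpleRetryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        /** Returns normally if another attempt is allowed, otherwise rethrows the failure. */
        void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }
    }

    /** Hypothetical stand-in for the "execute and decode" step. */
    interface Call<T> {
        T execute();
    }

    static <T> T invoke(Call<T> call, int maxAttempts) {
        SimpleRetryer retryer = new SimpleRetryer(maxAttempts);
        while (true) {
            try {
                return call.execute();
            } catch (RetryableException e) {
                // Either returns, so the loop runs again, or rethrows and ends the call.
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableException("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Note that a fresh retryer is created per invocation, which matches the this.retryer.clone() call in the listing: retry state is never shared between calls.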


  1. Look at execute() in Client: the request is sent over a connection obtained through HttpURLConnection
	public Response execute(Request request, Options options) throws IOException {
            HttpURLConnection connection = this.convertAndSend(request, options);
            return this.convertResponse(connection, request);
    }

VI. Load Balancing

  1. In section V, Sending the Network Request, a client.execute() call finally sends the request. Without Ribbon, execute() in Client runs; with Ribbon enabled, execute() in LoadBalancerFeignClient runs instead
	public Response execute(Request request, Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            RibbonRequest ribbonRequest = new RibbonRequest(this.delegate, request, uriWithoutHost);
            IClientConfig requestConfig = this.getClientConfig(options, clientName);
            //This is where the host to call is selected
            return ((RibbonResponse)this.lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig)).toResponse();
        } catch (ClientException var8) {
            IOException io = this.findIOException(var8);
            if (io != null) {
                throw io;
            } else {
                throw new RuntimeException(var8);
            }
        }
    }
  1. In AbstractLoadBalancerAwareClient
	public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);

        try {
            return (IResponse)command.submit(new ServerOperation<T>() {
                public Observable<T> call(Server server) {
                    URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
                    ClientRequest requestForServer = request.replaceUri(finalUri);

                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } catch (Exception var5) {
                        return Observable.error(var5);
                    }
                }
            }).toBlocking().single();
        } catch (Exception var6) {
            Throwable t = var6.getCause();
            if (t instanceof ClientException) {
                throw (ClientException)t;
            } else {
                throw new ClientException(var6);
            }
        }
    }
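
Two URI manipulations carry the two listings above: execute() reads the host of the request URL as the client (service) name, and executeWithLoadBalancer() later rebuilds the URI with the address of the chosen server. The following plain-JDK sketch shows both steps using java.net.URI only. ServiceUriSketch, its method names and the sample addresses are hypothetical, and the rebuild is an assumed equivalent of reconstructURIWithServer(), whose body is not shown here.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ServiceUriSketch {
    /** The host part of the request URL is treated as the client (service) name. */
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    /** Rebuilds the URI with the chosen server's host and port, keeping scheme, path and query. */
    static URI withServer(String url, String host, int port) {
        URI original = URI.create(url);
        try {
            return new URI(original.getScheme(), null, host, port,
                    original.getPath(), original.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String url = "http://hotel-service/hotels/1?lang=en";
        System.out.println(clientName(url));
        System.out.println(withServer(url, "10.0.0.5", 8081));
    }
}
```

This is why the URL built during client creation can carry a service name where a host normally goes: the name is only a lookup key until a concrete server has been chosen.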
  1. submit() in LoadBalancerCommand
	public Observable<T> submit(final ServerOperation<T> operation) {
        final LoadBalancerCommand<T>.ExecutionInfoContext context = new LoadBalancerCommand.ExecutionInfoContext();
        if (this.listenerInvoker != null) {
            try {
                this.listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException var6) {
                return Observable.error(var6);
            }
        }

        final int maxRetrysSame = this.retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = this.retryHandler.getMaxRetriesOnNextServer();
        //If server is null here, this.selectServer() is executed
        Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server)).concatMap(new Func1<Server, Observable<T>>() {
            public Observable<T> call(Server server) {
                context.setServer(server);
                final ServerStats stats = LoadBalancerCommand.this.loadBalancerContext.getServerStats(server);
                Observable<T> o = Observable.just(server).concatMap(new Func1<Server, Observable<T>>() {
                    public Observable<T> call(final Server server) {
                        context.incAttemptCount();
                        LoadBalancerCommand.this.loadBalancerContext.noteOpenConnection(stats);
                        if (LoadBalancerCommand.this.listenerInvoker != null) {
                            try {
                                LoadBalancerCommand.this.listenerInvoker.onStartWithServer(context.toExecutionInfo());
                            } catch (AbortExecutionException var3) {
                                return Observable.error(var3);
                            }
                        }

                        final Stopwatch tracer = LoadBalancerCommand.this.loadBalancerContext.getExecuteTracer().start();
                        return operation.call(server).doOnEach(new Observer<T>() {
                            private T entity;

                            public void onCompleted() {
                                this.recordStats(tracer, stats, this.entity, (Throwable)null);
                            }

                            public void onError(Throwable e) {
                                this.recordStats(tracer, stats, (Object)null, e);
                                LoadBalancerCommand.logger.debug("Got error {} when executed on server {}", e, server);
                                if (LoadBalancerCommand.this.listenerInvoker != null) {
                                    LoadBalancerCommand.this.listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                }

                            }

                            public void onNext(T entity) {
                                this.entity = entity;
                                if (LoadBalancerCommand.this.listenerInvoker != null) {
                                    LoadBalancerCommand.this.listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                }

                            }

                            private void recordStats(Stopwatch tracerx, ServerStats statsx, Object entity, Throwable exception) {
                                tracerx.stop();
                                LoadBalancerCommand.this.loadBalancerContext.noteRequestCompletion(statsx, entity, exception, tracerx.getDuration(TimeUnit.MILLISECONDS), LoadBalancerCommand.this.retryHandler);
                            }
                        });
                    }
                });
                if (maxRetrysSame > 0) {
                    o = o.retry(LoadBalancerCommand.this.retryPolicy(maxRetrysSame, true));
                }

                return o;
            }
        });
        if (maxRetrysNext > 0 && this.server == null) {
            o = o.retry(this.retryPolicy(maxRetrysNext, false));
        }

        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == maxRetrysNext + 1) {
                        e = new ClientException(ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), (Throwable)e);
                    } else if (maxRetrysSame > 0 && context.getAttemptCount() == maxRetrysSame + 1) {
                        e = new ClientException(ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), (Throwable)e);
                    }
                }

                if (LoadBalancerCommand.this.listenerInvoker != null) {
                    LoadBalancerCommand.this.listenerInvoker.onExecutionFailed((Throwable)e, context.toFinalExecutionInfo());
                }

                return Observable.error((Throwable)e);
            }
        });
    }




	private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            public void call(Subscriber<? super Server> next) {
                try {
                    // Execution continues here: ask the load balancer context for a server
                    Server server = LoadBalancerCommand.this.loadBalancerContext.getServerFromLoadBalancer(LoadBalancerCommand.this.loadBalancerURI, LoadBalancerCommand.this.loadBalancerKey);
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception var3) {
                    next.onError(var3);
                }

            }
        });
    }
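The two retry(...) calls in submit() above are nested: the inner one repeats the request on the same server (maxRetrysSame), the outer one selects another server (maxRetrysNext). The shape can be sketched in plain Java without RxJava. NestedRetrySketch, its parameters, and the list-based server selection are hypothetical stand-ins, not Ribbon APIs; Ribbon's real retry policy also asks the RetryHandler whether the exception is retriable, which this sketch leaves out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of Ribbon: nested loops that mirror the
// "retry same server, then try next server" structure of submit().
final class NestedRetrySketch {

    static <R> R call(List<String> servers, int maxRetriesSame, int maxRetriesNext,
                      Function<String, R> operation) {
        RuntimeException last = null;
        // Outer loop: the first server plus up to maxRetriesNext further servers.
        for (int s = 0; s <= maxRetriesNext; s++) {
            // Stand-in for selectServer(); the real code asks the load balancer.
            String server = servers.get(s % servers.size());
            // Inner loop: the first attempt plus up to maxRetriesSame retries on this server.
            for (int a = 0; a <= maxRetriesSame; a++) {
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and keep trying
                }
            }
        }
        throw new IllegalStateException("all retries exhausted", last);
    }

    public static void main(String[] args) {
        String result = call(Arrays.asList("a", "b"), 1, 1, server -> {
            if (server.equals("a")) {
                throw new RuntimeException(server + " is down");
            }
            return "ok from " + server;
        });
        System.out.println(result);
    }
}
```

With these settings the upper bound on attempts is (maxRetriesSame + 1) * (maxRetriesNext + 1).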
  1. getServerFromLoadBalancer() in LoadBalancerContext
	public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }

        if (original != null) {
            Pair<String, Integer> schemeAndPort = this.deriveSchemeAndPortFromPartialUri(original);
            port = (Integer)schemeAndPort.second();
        }

        ILoadBalancer lb = this.getLoadBalancer();
        if (host == null) {
            if (lb != null) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null) {
                    throw new ClientException(ErrorType.GENERAL, "Load balancer does not have available server for client: " + this.clientName);
                }

                host = svc.getHost();
                if (host == null) {
                    throw new ClientException(ErrorType.GENERAL, "Invalid Server for :" + svc);
                }

                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{this.clientName, svc, original});
                return svc;
            }

            if (this.vipAddresses != null && this.vipAddresses.contains(",")) {
                throw new ClientException(ErrorType.GENERAL, "Method is invoked for client " + this.clientName + " with partial URI of (" + original + ") with no load balancer configured. Also, there are multiple vipAddresses and hence no vip address can be chosen to complete this partial uri");
            }

            if (this.vipAddresses == null) {
                throw new ClientException(ErrorType.GENERAL, this.clientName + " has no LoadBalancer registered and passed in a partial URL request (with no host:port). Also has no vipAddress registered");
            }

            try {
                Pair<String, Integer> hostAndPort = this.deriveHostAndPortFromVipAddress(this.vipAddresses);
                host = (String)hostAndPort.first();
                port = (Integer)hostAndPort.second();
            } catch (URISyntaxException var8) {
                throw new ClientException(ErrorType.GENERAL, "Method is invoked for client " + this.clientName + " with partial URI of (" + original + ") with no load balancer configured.  Also, the configured/registered vipAddress is unparseable (to determine host and port)");
            }
        } else {
            boolean shouldInterpretAsVip = false;
            if (lb != null) {
                shouldInterpretAsVip = this.isVipRecognized(original.getAuthority());
            }

            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null) {
                    host = svc.getHost();
                    if (host == null) {
                        throw new ClientException(ErrorType.GENERAL, "Invalid Server for :" + svc);
                    }

                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                }

                logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
            } else {
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }

        if (host == null) {
            throw new ClientException(ErrorType.GENERAL, "Request contains no HOST to talk to");
        } else {
            return new Server(host, port);
        }
    }
  1. chooseServer() in ZoneAwareLoadBalancer
	public Server chooseServer(Object key) {
        // Checks how many zones exist: with more than one zone the if branch runs, otherwise the else branch below
        if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
            Server server = null;

            try {
                LoadBalancerStats lbStats = this.getLoadBalancerStats();
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (this.triggeringLoad == null) {
                    this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
                }

                if (this.triggeringBlackoutPercentage == null) {
                    this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
                }
                // 1. Compute the zones still considered available, based on the load and blackout thresholds above
                Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
                logger.debug("Available zones: {}", availableZones);
                if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
                    // 2. Only when at least one zone was filtered out: randomChooseZone() picks one of the remaining zones at random
                    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                    logger.debug("Zone chosen: {}", zone);
                    if (zone != null) {
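                        // Get the load balancer for the chosen zone together with its configured rule
                        // (with nothing configured, BaseLoadBalancer.chooseServer() still applies by default)
                        BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone);
                        // Pick a server according to that rule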
                        server = zoneLoadBalancer.chooseServer(key);
                    }
                }
            } catch (Exception var8) {
                logger.error("Error choosing server using zone aware logic for load balancer={}", this.name, var8);
            }

            if (server != null) {
                return server;
            } else {
                logger.debug("Zone avoidance logic is not invoked.");
                return super.chooseServer(key);
            }
        } else {
            // With only one zone this branch runs and calls the parent class's chooseServer()
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
    }
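The zone pick in step 2 can be illustrated with a small weighted random choice. This is a minimal sketch, assuming each zone is weighted by its instance count (an assumption about randomChooseZone(), whose body is not shown in this article). WeightedZoneChoiceSketch and its method names are hypothetical; the random draw is passed in as a parameter so the behaviour is easy to follow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not Ribbon's ZoneAvoidanceRule.
final class WeightedZoneChoiceSketch {

    // Sum of instances over all candidate zones.
    static int totalInstances(Map<String, Integer> instancesPerZone) {
        int total = 0;
        for (int count : instancesPerZone.values()) {
            total += count;
        }
        return total;
    }

    // draw is expected in 1..totalInstances; a caller would typically pass
    // random.nextInt(totalInstances) + 1. Returns null for an empty map.
    static String chooseZone(Map<String, Integer> instancesPerZone, int draw) {
        int sum = 0;
        String selected = null;
        for (Map.Entry<String, Integer> entry : instancesPerZone.entrySet()) {
            selected = entry.getKey();
            sum += entry.getValue();
            if (draw <= sum) {
                break; // the draw falls inside this zone's share
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Integer> zones = new LinkedHashMap<>();
        zones.put("zone-a", 3);
        zones.put("zone-b", 1);
        for (int draw = 1; draw <= totalInstances(zones); draw++) {
            System.out.println(draw + " -> " + chooseZone(zones, draw));
        }
    }
}
```

Under this assumption a zone with three instances is picked three times as often as a zone with one, so traffic per instance stays roughly even.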
  1. With only one zone, chooseServer() in BaseLoadBalancer runs
	public Server chooseServer(Object key) {
        if (this.counter == null) {
            this.counter = this.createCounter();
        }

        this.counter.increment();
        if (this.rule == null) {
            return null;
        } else {
            try {
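                // Execution reaches here and delegates to the IRule. BaseLoadBalancer's own default is
                // new RoundRobinRule() (round robin); the surrounding configuration may inject a different rule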
                return this.rule.choose(key);
            } catch (Exception var3) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
                return null;
            }
        }
    }

Spring Cloud LoadBalancer

  1. When the project includes spring-cloud-starter-loadbalancer and Ribbon is disabled through configuration, load balancing is performed by Spring Cloud LoadBalancer
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
            <version>3.0.4</version>
        </dependency>
spring:
  cloud:
    loadbalancer:
      ribbon:
        enabled: false
  1. The execute() method of FeignBlockingLoadBalancerClient runs
	public Response execute(Request request, Options options) throws IOException {
        URI originalUri = URI.create(request.url());
        String serviceId = originalUri.getHost();
        Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
        String hint = this.getHint(serviceId);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(LoadBalancerUtils.buildRequestData(request), hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        supportedLifecycleProcessors.forEach((lifecycle) -> {
            lifecycle.onStart(lbRequest);
        });
        ServiceInstance instance = this.loadBalancerClient.choose(serviceId, lbRequest);
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);
        String message;
        if (instance == null) {
            message = "Load balancer does not contain an instance for the service " + serviceId;
            if (LOG.isWarnEnabled()) {
                LOG.warn(message);
            }

            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, lbResponse));
            });
            return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value()).body(message, StandardCharsets.UTF_8).build();
        } else {
            message = this.loadBalancerClient.reconstructURI(instance, originalUri).toString();
            Request newRequest = this.buildRequest(request, message);
            return LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(this.delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors);
        }
    }
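Two steps of execute() are easy to try in isolation: the service id is read from the URI host, and once an instance is chosen the URI is rebuilt against that instance. A minimal sketch using only java.net.URI; ReconstructUriSketch, the service name, and the address are hypothetical, and the real reconstructURI() handles more details (for example scheme changes) that are not modeled here.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not Spring Cloud's implementation.
final class ReconstructUriSketch {

    // Keep scheme, path, query and fragment; swap in the chosen instance's host and port.
    static URI reconstruct(URI original, String instanceHost, int instancePort) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instanceHost, instancePort,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rebuild URI from " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://hotel-service/hotels/1?lang=en");
        // The host part is what execute() treats as the service id.
        System.out.println(original.getHost());
        System.out.println(reconstruct(original, "10.0.0.5", 8081));
        // java.net.URI does not accept '_' in a host name, so getHost() is null here.
        System.out.println(URI.create("http://hotel_service/hotels/1").getHost());
    }
}
```

The last line shows why the Assert.state check on the hostname can fail: a service name containing an underscore is not a valid host for java.net.URI, so getHost() returns null.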
  1. The choose() method of BlockingLoadBalancerClient is called
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
        ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
        if (loadBalancer == null) {
            return null;
        } else {
            Response<ServiceInstance> loadBalancerResponse = (Response)Mono.from(loadBalancer.choose(request)).block();
            return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer();
        }
    }
  1. The choose() method of RoundRobinLoadBalancer
	public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map((serviceInstances) -> {
            // This method is executed next
            return this.processInstanceResponse(supplier, serviceInstances);
        });
    }


	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
        // See this method below
        Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

	
	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + this.serviceId);
            }
            return new EmptyResponse();
        } else {
            // Round robin: increment the counter and take it modulo the number of instances
            int pos = Math.abs(this.position.incrementAndGet());
            ServiceInstance instance = (ServiceInstance)instances.get(pos % instances.size());
            return new DefaultResponse(instance);
        }
    }
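The modulo step above can be tried on its own. One detail worth knowing: Math.abs(Integer.MIN_VALUE) is still negative in Java, so when the counter overflows, a Math.abs-based index can become negative for one call. The sketch below swaps in a sign-bit mask instead; it is a variant for illustration, not the code quoted above, and RoundRobinSketch is a hypothetical class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not Spring Cloud's RoundRobinLoadBalancer.
final class RoundRobinSketch {
    private final AtomicInteger position;

    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // Returns the next element in round-robin order, or null for an empty list.
    <T> T next(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Clearing the sign bit keeps the index non-negative even after overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("a", "b", "c");
        RoundRobinSketch lb = new RoundRobinSketch(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.next(instances));
        }
        // Start right before overflow: the counter wraps to Integer.MIN_VALUE.
        System.out.println(new RoundRobinSketch(Integer.MAX_VALUE).next(instances));
    }
}
```

AtomicInteger keeps the counter safe under concurrent calls, which matters because one load balancer instance serves all request threads.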