Apollo配置中心Client源码学习(二)-- 配置同步

2023-11-15

 

  上一篇文章(https://blog.csdn.net/crystonesc/article/details/106630412)我们从Apollo社区给出的DEMO开始逐步分析了Apollo客户端配置的创建过程,作为Apollo配置中心Client源码学习的第二篇文章,我们继续学习,来看看在通过ConfigFactory创建Config后,Config如何来获取配置信息的。

  我们知道Apollo的DefaultConfigFactory会调用create方法来创建默认的DefaultConfig,在DefaultConfig的构造函数中,会传入namespace(命名空间)和ConfigRepository(配置源),配置源会指定了我们客户端获取配置的位置,今天我们就来说说ConfigRepository。下面是DefaultConfigFactory中create方法的内容,可以看到对于非具体格式的配置,Apollo都会使用createLocalConfigRepository返回的LocalConfigRepository作为配置源。

@Override
  //(3) 创建Config
  public Config create(String namespace) {
    //判断namespace的文件类型
    ConfigFileFormat format = determineFileFormat(namespace);
    //如果是property兼容的文件,比如yaml,yml
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      //创建了默认的Config,并通过参数制定了namespace和Repository(仓库)
      return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    //否则创建LocalConfigRepository
    return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
  }

  一、ConfigRepository结构 

  我们首先来看下ConfigRepository,ConfigRepository是一个接口,其实现包含AbstractConfigRepository以及三个具体实现类:PropertiesCompatibleFileConfigRepository,LocalFileConfigRepository,RemoteConfigRepository。

  首先研究下ConfigRepository接口类,接口类包含几个方法,如getConfig,用于获取一个Properties类型的配置;setUpstreamRepository用于设置一个备用配置源以及addChangeListener和removeChangeListener,用于添加监听和移除监听队列,最后getSourceType用于获取配置的来源。接下来我们看下抽象实现类AbstractConfigRepository。

/**
 * (1)仓库接口类,主要表示一个配置源,Apollo的Server是一个数据源,本地缓存也算是一个数据源
 * @author Jason Song(song_s@ctrip.com)
 */
public interface ConfigRepository {
  /**
   * (2)获取配置,返回Properties类型
   * Get the config from this repository.
   * @return config
   */
  public Properties getConfig();

  /**
   *
   * (3)设置一个备用的仓库
   * Set the fallback repo for this repository.
   * @param upstreamConfigRepository the upstream repo
   */
  public void setUpstreamRepository(ConfigRepository upstreamConfigRepository);

  /**
   * (4)添加仓库变更监听
   * Add change listener.
   * @param listener the listener to observe the changes
   */
  public void addChangeListener(RepositoryChangeListener listener);

  /**
   * (5)移除仓库变更监听
   * Remove change listener.
   * @param listener the listener to remove
   */
  public void removeChangeListener(RepositoryChangeListener listener);

  /**
   * (6)返回配置是从哪个源获取的
   * Return the config's source type, i.e. where is the config loaded from
   *
   * @return the config's source type
   */
  public ConfigSourceType getSourceType();

   首先从总体上我们看到AbstractConfigRepository实现了一些ConfigRepository的方法,包括监听队列的添加和删除,监听队列事件调用,这些部分是所有配置源类的共有方法。同时AbstractConfgRepository还包括trySync和Sync方法,trySync实际调用的是sync抽象方法,用于从配置源获取数据,看到这里了我们可以继续查看其实现类的代码,我们先看下LocalFileConfigRepository的代码。

/**
 * @author Jason Song(song_s@ctrip.com)
 */
//(1)配置仓库抽象类
public abstract class AbstractConfigRepository implements ConfigRepository {
  private static final Logger logger = LoggerFactory.getLogger(AbstractConfigRepository.class);
  //(2)配置仓库变更监听队列
  private List<RepositoryChangeListener> m_listeners = Lists.newCopyOnWriteArrayList();

  //(3) 实际调用抽象的sync方法
  protected boolean trySync() {
    try {
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
  }

  //(4) 需要每个实现类实现的同步方法
  protected abstract void sync();

  @Override
  //(5) 向监听队列添加监听器
  public void addChangeListener(RepositoryChangeListener listener) {
    if (!m_listeners.contains(listener)) {
      m_listeners.add(listener);
    }
  }

  @Override
  //(6) 从队列移除监听器
  public void removeChangeListener(RepositoryChangeListener listener) {
    m_listeners.remove(listener);
  }

  //(7) 触发监听器修改事件,遍历监听器队列中的监听者,调用器处理方法
  protected void fireRepositoryChange(String namespace, Properties newProperties) {
    for (RepositoryChangeListener listener : m_listeners) {
      try {
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        Tracer.logError(ex);
        logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
      }
    }
  }
}

 二、LocalFileConfigRepository源码分析

 LocalFileConfigRepository是一个用于表示本地文件配置源的类,我们知道Apollo从服务端同步的数据都会保留在本地缓存目录,以防止在不能够连接到远程配置源的情况下使用本地配置源,那么所以对于默认的配置,实际上是Config是一个拥有带远程配置源(RemoteConfigRepository)的LocalFileConfigRepository,这一点我们可以从createLocalConfigRepository方法中得到印证。如下图所示:

//(5) 创建LocalFileConfigRepository,如果Apollo是以本地模式运行,则创建没有upstream的LocalFileConfigRepository,否则
  //创建一个带有远程仓库RemoteConfigRepository的创建LocalFileConfigRepository
  LocalFileConfigRepository createLocalConfigRepository(String namespace) {
    if (m_configUtil.isInLocalMode()) {
      logger.warn(
          "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
          namespace);
      return new LocalFileConfigRepository(namespace);
    }
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }

  接下来我们仔细阅读LocalFileConfigRepository的源码,在LocalFileConfigRepository构造的时候就会进行一次trySync,用于获取配置数据,并且还会指定一个Upstream配置源(远程配置),从上面的分析我们可以知道,trySync会调用sync方法,而这里的sync方法会在LocalFileConfigRepository中被实现,我们继续研究sync方法(7),在sync方法中它先调用trySyncFromUpStream方法尝试从Upstream仓库获取配置,如果能够获取到配置,则直接返回,如果未能同步成功,则会调用loadFromLoaclCacheFile从本地加载配置。(具体分析建代码)

public class LocalFileConfigRepository extends AbstractConfigRepository
    implements RepositoryChangeListener {
  private static final Logger logger = LoggerFactory.getLogger(LocalFileConfigRepository.class);
  private static final String CONFIG_DIR = "/config-cache";
  private final String m_namespace;
  private File m_baseDir;
  private final ConfigUtil m_configUtil;
  private volatile Properties m_fileProperties;
  private volatile ConfigRepository m_upstream;

  private volatile ConfigSourceType m_sourceType = ConfigSourceType.LOCAL;

  /**
   * Constructor.
   *
   * @param namespace the namespace
   */
  //(1) 本地配置仓库构造函数(不指定upstream),实际调用LocalFileConfigRepository,
  public LocalFileConfigRepository(String namespace) {
    this(namespace, null);
  }

  //(2) 带upstream的构造函数,构造的同时就会调用trySync来同步配置
  public LocalFileConfigRepository(String namespace, ConfigRepository upstream) {
    m_namespace = namespace;
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    this.setLocalCacheDir(findLocalCacheDir(), false);
    this.setUpstreamRepository(upstream);
    this.trySync();
  }

  //(3) 设置本地配置缓存目录,如果syncImmediately为true, 则会立即同步配置
  void setLocalCacheDir(File baseDir, boolean syncImmediately) {
    m_baseDir = baseDir;
    this.checkLocalConfigCacheDir(m_baseDir);
    if (syncImmediately) {
      this.trySync();
    }
  }

  //(4) 定位本地缓存目录,若不存在则创建目录,若存在文件则返回File类型对象
  private File findLocalCacheDir() {
    try {
      String defaultCacheDir = m_configUtil.getDefaultLocalCacheDir();
      Path path = Paths.get(defaultCacheDir);
      if (!Files.exists(path)) {
        Files.createDirectories(path);
      }
      if (Files.exists(path) && Files.isWritable(path)) {
        return new File(defaultCacheDir, CONFIG_DIR);
      }
    } catch (Throwable ex) {
      //ignore
    }

    return new File(ClassLoaderUtil.getClassPath(), CONFIG_DIR);
  }

  @Override
  //(5) 获取配置,如果不存在就进行同步
  public Properties getConfig() {
    if (m_fileProperties == null) {
      sync();
    }
    Properties result = new Properties();
    result.putAll(m_fileProperties);
    return result;
  }

  @Override
  //(6) 设置fallback 配置源
  public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {
    if (upstreamConfigRepository == null) {
      return;
    }
    //clear previous listener
    if (m_upstream != null) {
      m_upstream.removeChangeListener(this);
    }
    m_upstream = upstreamConfigRepository;
    trySyncFromUpstream();
    upstreamConfigRepository.addChangeListener(this);
  }

  @Override
  public ConfigSourceType getSourceType() {
    return m_sourceType;
  }

  @Override
  public void onRepositoryChange(String namespace, Properties newProperties) {
    if (newProperties.equals(m_fileProperties)) {
      return;
    }
    Properties newFileProperties = new Properties();
    newFileProperties.putAll(newProperties);
    updateFileProperties(newFileProperties, m_upstream.getSourceType());
    this.fireRepositoryChange(namespace, newProperties);
  }

  @Override
  //(7) 同步配置
  protected void sync() {
    //sync with upstream immediately
    //首先从Upstream获取配置
    boolean syncFromUpstreamResultSuccess = trySyncFromUpstream();

    //如果从upstream源同步成功则直接返回
    if (syncFromUpstreamResultSuccess) {
      return;
    }

    //如果未能从upstream同步成功,则通过本地缓存文件加载
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncLocalConfig");
    Throwable exception = null;
    try {
      transaction.addData("Basedir", m_baseDir.getAbsolutePath());
      m_fileProperties = this.loadFromLocalCacheFile(m_baseDir, m_namespace);
      m_sourceType = ConfigSourceType.LOCAL;
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      transaction.setStatus(ex);
      exception = ex;
      //ignore
    } finally {
      transaction.complete();
    }

    if (m_fileProperties == null) {
      m_sourceType = ConfigSourceType.NONE;
      throw new ApolloConfigException(
          "Load config from local config failed!", exception);
    }
  }

  //(8) 从Upstream中获取配置
  private boolean trySyncFromUpstream() {
    if (m_upstream == null) {
      return false;
    }
    try {
      updateFileProperties(m_upstream.getConfig(), m_upstream.getSourceType());
      return true;
    } catch (Throwable ex) {
      Tracer.logError(ex);
      logger
          .warn("Sync config from upstream repository {} failed, reason: {}", m_upstream.getClass(),
              ExceptionUtil.getDetailMessage(ex));
    }
    return false;
  }

  //(9) 把新配置更新到本地缓存中
  private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType) {
    this.m_sourceType = sourceType;
    if (newProperties.equals(m_fileProperties)) {
      return;
    }
    this.m_fileProperties = newProperties;
    persistLocalCacheFile(m_baseDir, m_namespace);
  }

  //(10) 从本地缓存加载配置文件
  private Properties loadFromLocalCacheFile(File baseDir, String namespace) throws IOException {
    Preconditions.checkNotNull(baseDir, "Basedir cannot be null");

    File file = assembleLocalCacheFile(baseDir, namespace);
    Properties properties = null;

    if (file.isFile() && file.canRead()) {
      InputStream in = null;

      try {
        in = new FileInputStream(file);

        properties = new Properties();
        properties.load(in);
        logger.debug("Loading local config file {} successfully!", file.getAbsolutePath());
      } catch (IOException ex) {
        Tracer.logError(ex);
        throw new ApolloConfigException(String
            .format("Loading config from local cache file %s failed", file.getAbsolutePath()), ex);
      } finally {
        try {
          if (in != null) {
            in.close();
          }
        } catch (IOException ex) {
          // ignore
        }
      }
    } else {
      throw new ApolloConfigException(
          String.format("Cannot read from local cache file %s", file.getAbsolutePath()));
    }

    return properties;
  }

  //(11) 持久化到本地文件实际操作
  void persistLocalCacheFile(File baseDir, String namespace) {
    if (baseDir == null) {
      return;
    }
    File file = assembleLocalCacheFile(baseDir, namespace);

    OutputStream out = null;

    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "persistLocalConfigFile");
    transaction.addData("LocalConfigFile", file.getAbsolutePath());
    try {
      out = new FileOutputStream(file);
      m_fileProperties.store(out, "Persisted by DefaultConfig");
      transaction.setStatus(Transaction.SUCCESS);
    } catch (IOException ex) {
      ApolloConfigException exception =
          new ApolloConfigException(
              String.format("Persist local cache file %s failed", file.getAbsolutePath()), ex);
      Tracer.logError(exception);
      transaction.setStatus(exception);
      logger.warn("Persist local cache file {} failed, reason: {}.", file.getAbsolutePath(),
          ExceptionUtil.getDetailMessage(ex));
    } finally {
      if (out != null) {
        try {
          out.close();
        } catch (IOException ex) {
          //ignore
        }
      }
      transaction.complete();
    }
  }

  //(12) 检查缓存目录是否存在,若不存在则建立目录
  private void checkLocalConfigCacheDir(File baseDir) {
    if (baseDir.exists()) {
      return;
    }
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "createLocalConfigDir");
    transaction.addData("BaseDir", baseDir.getAbsolutePath());
    try {
      Files.createDirectory(baseDir.toPath());
      transaction.setStatus(Transaction.SUCCESS);
    } catch (IOException ex) {
      ApolloConfigException exception =
          new ApolloConfigException(
              String.format("Create local config directory %s failed", baseDir.getAbsolutePath()),
              ex);
      Tracer.logError(exception);
      transaction.setStatus(exception);
      logger.warn(
          "Unable to create local config cache directory {}, reason: {}. Will not able to cache config file.",
          baseDir.getAbsolutePath(), ExceptionUtil.getDetailMessage(ex));
    } finally {
      transaction.complete();
    }
  }

  File assembleLocalCacheFile(File baseDir, String namespace) {
    String fileName =
        String.format("%s.properties", Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
            .join(m_configUtil.getAppId(), m_configUtil.getCluster(), namespace));
    return new File(baseDir, fileName);
  }
}

  三、RemoteConfigRespository源码分析

  接下来我们来分析RemoteConfigRespository,从上面我们可以看到RemoteConfigRespository代表的是远程配置源,在其构造函数中(1),就会调用trySync方法来进行同步,同样trySync方法也会调用sync方法来实际触发同步(5),在sync方法中我们可以看到,先从缓存中获取(ApolloConfig previous = m_configCache.get())之前获取的配置,然后通过loadApolloConfig方法从远程配置源获取配置,接下来通过判断previous和current来判断是否相同,这里它直接使用(previous != current)来做判断,实际上是对previous和current进行对象比较,看到这里可能大家比较疑惑,从loadApolloConfig的配置为什么可以直接与缓存中后去的previous进行比较,原来在loadApolloConfig方法中,从远程配置源获取配置的时候,如果配置源返回HTTP CODE 304的时候,loadApolloConfig就会将缓存中的配置直接返回给current,从而sync中可以通过判断对象是否相等的方法来决定是否更新缓存配置。如果previous!=current,则会将当前的配置设置到缓存当中,并且触发监听者更新回调。

   接着我们就来研究下loadApolloConfig方法(7),这个方法的内容比较多,首先为了防止客户端频繁获取远程配置造成远程配置源压力,这里Apollo使用了一个限速器,保证在每次更新操作间隔5秒,接下来通过调用getConfigServices()方法来获取configservice地址,在getConfigServices 中会使用ConfigServiceLoactor来获取configservice的地址(具体怎么获取,我们之后来分析),拿到configServices地址后,随机从configServices地址中获取一个地址并获取配置,如果获取失败,Apollo还会又重试机制,最终获取到的结果会进行返回。除了在构造的时候去拉去配置,Apollo还会定期通过单独的线程去获取配置,代码见(4)schedulePeriodicRefresh。

/**
 * @author Jason Song(song_s@ctrip.com)
 */
public class RemoteConfigRepository extends AbstractConfigRepository {
  private static final Logger logger = LoggerFactory.getLogger(RemoteConfigRepository.class);
  private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
  private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
  private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
  private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();

  private final ConfigServiceLocator m_serviceLocator;
  private final HttpUtil m_httpUtil;
  private final ConfigUtil m_configUtil;
  private final RemoteConfigLongPollService remoteConfigLongPollService;
  private volatile AtomicReference<ApolloConfig> m_configCache;
  private final String m_namespace;
  private final static ScheduledExecutorService m_executorService;
  private final AtomicReference<ServiceDTO> m_longPollServiceDto;
  private final AtomicReference<ApolloNotificationMessages> m_remoteMessages;
  private final RateLimiter m_loadConfigRateLimiter;
  private final AtomicBoolean m_configNeedForceRefresh;
  private final SchedulePolicy m_loadConfigFailSchedulePolicy;
  private final Gson gson;

  static {
    m_executorService = Executors.newScheduledThreadPool(1,
        ApolloThreadFactory.create("RemoteConfigRepository", true));
  }

  /**
   * Constructor.
   *
   * @param namespace the namespace
   */
  //(1) 远程配置源构造函数
  public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    //配置缓存
    m_configCache = new AtomicReference<>();
    //通过AoplloInjector注入configUtil工具类、httpUitl、serviceLocator、remoteConfigLongPollService
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    gson = new Gson();
    this.trySync();
    this.schedulePeriodicRefresh();
    this.scheduleLongPollingRefresh();
  }

  @Override
  //(2) 获取配置方法实现
  public Properties getConfig() {
    if (m_configCache.get() == null) {
      this.sync();
    }
    //返回的时候转换为Properties类型的格式
    return transformApolloConfigToProperties(m_configCache.get());
  }

  @Override
  //(3) 设置备用Upstream配置源,因为本生是远程仓库,所以这里并未实现
  public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {
    //remote config doesn't need upstream
  }

  @Override
  public ConfigSourceType getSourceType() {
    return ConfigSourceType.REMOTE;
  }

  //(4) 设置定期获取配置得线程,定期从远程配置源获取配置
  private void schedulePeriodicRefresh() {
    logger.debug("Schedule periodic refresh with interval: {} {}",
        m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
    m_executorService.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
            logger.debug("refresh config for namespace: {}", m_namespace);
            trySync();
            Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
          }
        }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
        m_configUtil.getRefreshIntervalTimeUnit());
  }

  @Override
  //(5) 从远程配置源获取配置,实际调用方法loadApolloConfig,如果远程配置源更新后,触发监听更新
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

    try {
      ApolloConfig previous = m_configCache.get();
      ApolloConfig current = loadApolloConfig();

      //reference equals means HTTP 304
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        m_configCache.set(current);
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }

      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
            current.getReleaseKey());
      }

      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }

  //(6) 将Apollo配置转换为Properties类型,并且返回,apolloConfig中的configurations是一个MAP类型
  private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig) {
    Properties result = new Properties();
    result.putAll(apolloConfig.getConfigurations());
    return result;
  }

  //(7) 实际获取配置方法
  private ApolloConfig loadApolloConfig() {
    //获取限速器的允许,得到允许后才能进行配置获取
    if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      //wait at most 5 seconds
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
    }
    String appId = m_configUtil.getAppId();
    String cluster = m_configUtil.getCluster();
    String dataCenter = m_configUtil.getDataCenter();
    Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
    //配置获取重试次数
    int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
    long onErrorSleepTime = 0; // 0 means no sleep
    Throwable exception = null;

    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    for (int i = 0; i < maxRetries; i++) {
      //随机的获取一个configService
      List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
      Collections.shuffle(randomConfigServices);
      //Access the server which notifies the client first
      if (m_longPollServiceDto.get() != null) {
        randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
      }

      for (ServiceDTO configService : randomConfigServices) {
        if (onErrorSleepTime > 0) {
          logger.warn(
              "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
              onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

          try {
            m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
          } catch (InterruptedException e) {
            //ignore
          }
        }

        url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
                dataCenter, m_remoteMessages.get(), m_configCache.get());

        logger.debug("Loading config from {}", url);
        HttpRequest request = new HttpRequest(url);

        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
        transaction.addData("Url", url);
        try {

          //实际获取配置的HTTP调用
          HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
          m_configNeedForceRefresh.set(false);
          m_loadConfigFailSchedulePolicy.success();

          transaction.addData("StatusCode", response.getStatusCode());
          transaction.setStatus(Transaction.SUCCESS);

          if (response.getStatusCode() == 304) {
            logger.debug("Config server responds with 304 HTTP status code.");
            return m_configCache.get();
          }

          ApolloConfig result = response.getBody();

          logger.debug("Loaded config for {}: {}", m_namespace, result);

          return result;
        } catch (ApolloConfigStatusCodeException ex) {
          ApolloConfigStatusCodeException statusCodeException = ex;
          //config not found
          if (ex.getStatusCode() == 404) {
            String message = String.format(
                "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                    "please check whether the configs are released in Apollo!",
                appId, cluster, m_namespace);
            statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
                message);
          }
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
          transaction.setStatus(statusCodeException);
          exception = statusCodeException;
        } catch (Throwable ex) {
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
          transaction.setStatus(ex);
          exception = ex;
        } finally {
          transaction.complete();
        }

        // if force refresh, do normal sleep, if normal config load, do exponential sleep
        onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
            m_loadConfigFailSchedulePolicy.fail();
      }

    }
    String message = String.format(
        "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
        appId, cluster, m_namespace, url);
    throw new ApolloConfigException(message, exception);
  }

  String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
                                String dataCenter, ApolloNotificationMessages remoteMessages, ApolloConfig previousConfig) {

    String path = "configs/%s/%s/%s";
    List<String> pathParams =
        Lists.newArrayList(pathEscaper.escape(appId), pathEscaper.escape(cluster),
            pathEscaper.escape(namespace));
    Map<String, String> queryParams = Maps.newHashMap();

    if (previousConfig != null) {
      queryParams.put("releaseKey", queryParamEscaper.escape(previousConfig.getReleaseKey()));
    }

    if (!Strings.isNullOrEmpty(dataCenter)) {
      queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
    }

    String localIp = m_configUtil.getLocalIp();
    if (!Strings.isNullOrEmpty(localIp)) {
      queryParams.put("ip", queryParamEscaper.escape(localIp));
    }

    if (remoteMessages != null) {
      queryParams.put("messages", queryParamEscaper.escape(gson.toJson(remoteMessages)));
    }

    String pathExpanded = String.format(path, pathParams.toArray());

    if (!queryParams.isEmpty()) {
      pathExpanded += "?" + MAP_JOINER.join(queryParams);
    }
    if (!uri.endsWith("/")) {
      uri += "/";
    }
    return uri + pathExpanded;
  }

  private void scheduleLongPollingRefresh() {
    remoteConfigLongPollService.submit(m_namespace, this);
  }

  public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
    m_longPollServiceDto.set(longPollNotifiedServiceDto);
    m_remoteMessages.set(remoteMessages);
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        m_configNeedForceRefresh.set(true);
        trySync();
      }
    });
  }

  private List<ServiceDTO> getConfigServices() {
    List<ServiceDTO> services = m_serviceLocator.getConfigServices();
    if (services.size() == 0) {
      throw new ApolloConfigException("No available config service");
    }

    return services;
  }
}

四、总结

   本小结我们从ConfigRespository入手,分析了Apollo客户端在创建了DefaultConfig后如何来获取配置,我们主要分析了ConfigRespository的主要两个实现类LocalFileConfigRepository和RemoteConfigRespository,接下来我们用一幅图来描述下本结描述的内容,方便理解:

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apollo配置中心Client源码学习(二)-- 配置同步 的相关文章

  • FreeRTOS源码学习_01-任务调度器-2021-10-28

    FreeRTOS源码学习 01 任务调度器 一 写在前面二 源码分析1 开始任务调度 xff1a void vTaskStartScheduler 2 创建软件定时器任务 xff1a 3 检查链表队列是否有效 xff1a prvCheckF
  • Eureka注册中心

    3 Eureka注册中心 假如我们的服务提供者user service部署了多个实例 xff0c 如图 xff1a 大家思考几个问题 xff1a order service在发起远程调用的时候 xff0c 该如何得知user service
  • Docker comsul(注册中心)

    文章目录 Docker consul xff08 注册中心 xff09 什么是consul xff1f consul干什么的 xff1f 一 consul的使用场景二 consul集群三 consul部署1 建立consul服务2 设置代理
  • 【Android 源码学习】 init启动

    目录 Android 源码学习 init启动从main cpp开始init cpp 部分逻辑init启动zygote属性服务总结 Android 源码学习 init启动 Android 11 init 启动流程学习 主要是学习刘望舒腾讯课堂
  • px4源码学习(local position estimator)

    前言 xff1a 之前学习的是px4源码中的attitude estimator q xff0c 可以说是学习的相当仔细和深入 于是借着这股劲继续学习位置估计中的local position estimator 另外需要说明的是 xff0c
  • SQLite3源码学习(31) WAL日志的锁机制

    1 锁的原理 先来回顾一下回滚日志的文件锁 xff0c 之前的锁是针对数据库文件加锁的 xff0c 有4种类型 xff0c 分别是shared reserverd pending和exclusive 在WAL日志模式下不再使用原来的锁 xf
  • 注册中心Nacos

    注册中心 nacosNacos注册中心与配置中心nocos优点与缺点启动docker镜像测试linux windows下单机启动应用启动个报错nacos如何修改用户名密码 nacos Nacos注册中心与配置中心 nacos官方文档 Nac
  • docker注册中心

    docker hub镜像上传 span class token comment 1 修改镜像镜像加速器 span span class token punctuation span root 64 sunrui span class tok
  • 《深入剖析tomcat》读书笔记3--servlet容器

    主要是 深入剖析tomcat 的第五章和第十一章 个人觉得如下3点是关键 1 pipeline相关概念及其执行valve的顺序 2 standardwrapper的接受http请求时候的调用序列 3 standardwrapper基础阀加载
  • Nacos快速入门(三):Spring Cloud Alibaba Nacos实现服务注册与发现

    1 前言 Spring Cloud Alibaba 是阿里巴巴提供的微服务开发一站式解决方案 目前已经加入Spring Cloud项目 跟随Spring Cloud一起维护 集成Nacos需要使用Spring Cloud Alibaba N
  • node-formidable源码:原生javascript解析前端传输的FormData

    本系列文章是本人学习相关知识时所积累的笔记 以记录自己的学习历程 也为了方便回顾知识 故文章内容较为随意简练 抱着学习目的来的同学务必转移他处 以免我误人子弟 参考资料 酷勤网 在Koa和Express中 已经通过node formidab
  • Spring Cloud Nacos源码讲解(五)- Nacos服务端健康检查

    Nacos服务端健康检查 长连接 概念 长连接 指在一个连接上可以连续发送多个数据包 在连接保持期间 如果没有数据包发送 需要双方发链路检测包 注册中心客户端2 0之后使用gRPC代替http 会与服务端建立长连接 但仍然保留了对旧http
  • nacos注册中心面试总结

    1注册中心演变及其设计思想 2 Nacos注册中心架构 3 核心功能 服务注册 Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务 提供自身的元数据 比如ip地址 端口等信息 Nacos Serve
  • 【Spring Boot 源码学习】@SpringBootApplication 注解

    Spring Boot 源码学习系列 SpringBootApplication 注解 引言 主要内容 1 创建 Spring Boot 项目 2 Spring Boot 入口类 3 SpringBootApplication 介绍 总结
  • 【Spring Boot 源码学习】深入 FilteringSpringBootCondition

    Spring Boot 源码学习系列 深入 FilteringSpringBootCondition 引言 往期内容 主要内容 1 match 方法 2 ClassNameFilter 枚举类 3 filter 方法 总结 引言 前两篇博文
  • (超详细、带图带源码)Nacos注册中心的搭建与测试

    前言 本系列是从头开始进行学习Nacos的相关知识 从相关概念到业务开发等等 本篇是第三篇 主要知道为什么需要注册中心 为什么选择Nacos作为注册中心和Nacos作为注册中心的快速搭建 入门篇 阿里Nacos系列 为什么要选择Nacos和
  • Nacos快速入门(二):Nacos集群安装部署

    1 集群部署架构图 官方提供了三种部署架构 http ip1 port openAPI 直连ip模式 机器挂则需要修改ip才可以使用 http VIP port openAPI 挂载VIP模式 直连vip即可 下面挂server真实ip 可
  • Apollo配置中心Client源码学习(二)-- 配置同步

    上一篇文章 https blog csdn net crystonesc article details 106630412 我们从Apollo社区给出的DEMO开始逐步分析了Apollo客户端配置的创建过程 作为Apollo配置中心Cli
  • 深入理解MyBatis一级缓存和二级缓存【超详细源码解析】

    视频地址 https www bilibili com video BV1nP411A7Gu MyBatis的缓存是一个常见的面试题 一级缓存的作用域为何是 sqlSession 二级缓存的作用域为何是 mapper 怎么理解 一 二级缓存
  • Nacos 1.4.1注册中心源码深度解析-服务下线

    服务下线比较简单 入口在com alibaba nacos naming controllers InstanceController deregister gt serviceManager removeInstance gt remov

随机推荐

  • Vscode——报错解决:Import “torch“ could not be resolved

    一 原因 当前解释器环境中 没安装torch库 二 解决办法 前提 已经安装PyTorch环境 1 键盘上按快捷键 Ctrl shift P 2 输入 Python Select Interpreter 3 选择PyTorch解释器
  • active directory域服务

    active directory域服务 一 Windows 网络环境 工作组workgroup 域 二 windows域 1 集中管理 2 分域控制器和成员服务器 3 账户保存在域当中 文件名为 ntds dit 4 账户可在整个域当中登陆
  • 直播预告

    12月26日 RTSCon2021开发者沙龙将在线上举办 拍乐云Pano受邀出席 服务端专家沈伟锋将在活动中带来关于 拍乐云融合语音通话技术实践 的主题演讲 RTSCon的前身是FreeSWITCH开发者沙龙 而RTS的全称是Real Ti
  • (十三):图

    1 图的基本介绍 1 1为什么要有图 前面我们学到了线性表和树 线性表局限于直接前驱和一个直接后继结点的关系 树也只能有一个直接前驱也就是父节点 当我们需要多对多关系时候 就需要图 1 2图的举例说明 图是一种数据结构 其中结点可以具有零个
  • SpringBoot优质开源项目分享

    Spring Boot 算是目前 Java 领域最火的技术栈了 也是Java开发人员不得不掌握的技术 今天给大家整理了13个优质 Spring Boot 开源项目给大家参考 希望能够帮助到正在学习 Spring Boot 的小伙伴 小伙伴简
  • RK3568资料汇总

    文档资料 野火 https doc embedfire com products link zh latest linux ebf lubancat html 正点原子 http 47 111 11 73 docs boards arm l
  • python的判断与循环语句

    一 判断语句 1 判断 在程序中如果某些条件满足 才能做某件事情 而不满足时不允许做 这就是所谓的判断 2 if语句的使用格式 if 要判断的条件 条件成立时 要做的事情 如 判断年纪 如果 age 大于 18 输入成年 age 18 if
  • Centos7 交叉编译QT5.9.9源码 AArch64架构

    环境准备 centos7 镜像 下载地址 http mirrors aliyun com centos 7 9 2009 isos x86 64 aarch64交叉编译链 下载地址 https releases linaro org com
  • Java与代码检查与自动化测试

    Java是一种面向对象的编程语言 具有简单 快速 安全 可靠等特点 在Java编程中 代码质量的好坏直接关系到最终软件的质量 为了确保代码质量 我们需要进行代码检查和自动化测试 本文将详细介绍Java代码检查和自动化测试的概念 原则 工具及
  • 【DRAM存储器五】DRAM存储器的架构演进-part2

    个人主页 highman110 作者简介 一名硬件工程师 持续学习 不断记录 保持思考 输出干货内容 参考书籍 Memory Systems Cache DRAM Disk 目录
  • TCP的粘包问题

    TCP transport control protocol 传输控制协议 是面向连接的 面向流的 提供高可靠性服务 收发两端 客户端和服务器端 都要有一一成对的socket 因此 发送端为了将多个发往接收端的包 更有效的发到对方 使用了优
  • python pyinstaller打包参数介绍(转)

    pyinstaller相关参数 F onefile 打包一个单个文件 如果你的代码都写在一个 py文件的话 可以用这个 如果是多个 py文件就别用 D onedir 打包多个文件 在dist中生成很多依赖文件 适合以框架形式编写工具代码 我
  • 如何使用yum安装最新软件包

    分享请标明来自 https www css3 io how yum install release soft html 背景 使用yum安装软件时 会经常遇到安装的软件是老版本的 被非自己期望的release版本 这到底是怎么回事儿 得从r
  • 【2023】java通过modbus4j实现modus TCP通讯

    Modbus通信协议 主要分为三个子协议 RTU ASCII TCP Modbus RTU 传输的是字节数组 bit 通信 读写 输出 可以读写 输入 只能读 存储区 输出线圈 输入线圈 输出寄存器 输入寄存器 线圈 代表一个布尔量 最小单
  • Docker环境安装

    Docker环境安装 Docker简介 Docker工作原理 Docker的应用场景 Docker 的优点 CentOS Docker 安装与配置 Docker 安装 Docker 配置 Docker容器概念 Docker容器操作 拉取镜像
  • Java 实现文件复制及文件夹复制

    在Java中 有多种方法可以实现文件的复制 以下是几种常用的方式 使用字节流进行复制 通过FileInputStream和FileOutputStream分别创建源文件和目标文件的输入输出流 然后通过循环读取源文件内容 并将数据写入目标文件
  • mysql相加并输出_用shell把执行的两条sql语句相加并输出数据库名和相加的值以tab健隔开...

    SELECT SUM PresentSum FROM Lg ConsumeDetail0 WHERE ConsumeDate lt 2016 11 01 AND OpId 300 SELECT SUM PresentSum FROM Lg
  • vcpu和physical cpu 绑定

    参考libvirt中xml的写法可以将vpu和物理cpu 绑定 https libvirt org formatdomain html elementsCPUTuning
  • Unity鼠标光标使用学习

    Unity下的鼠标光标程序相关的就一个类下的2 3个方法 首先 光标导入图片的设置需要将类型设置为Cursor 设置鼠标光标的方法就一个 SetCursor 第一个参数是图片 第二个参数是点击点的偏移量 第三个参数是类型 public Te
  • Apollo配置中心Client源码学习(二)-- 配置同步

    上一篇文章 https blog csdn net crystonesc article details 106630412 我们从Apollo社区给出的DEMO开始逐步分析了Apollo客户端配置的创建过程 作为Apollo配置中心Cli