1. 概述
老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》 ,特别是 《Java 客户端使用指南》 。
本文接 《Apollo 源码解析 —— 客户端 API 配置(一)之一览》 一文,分享 Config 接口,及其子类,如下图:
2. Config
在 《Apollo 源码解析 —— 客户端 API 配置(一)之一览》 的 「3.1 Config」 中,有详细分享。
3. AbstractConfig
com.ctrip.framework.apollo.internals.AbstractConfig
,实现 Config 接口,Config 抽象类,实现了1)缓存读取属性值、2)异步通知监听器、3)计算属性变化等等特性。
3.1 构造方法
private static final Logger logger = LoggerFactory.getLogger(AbstractConfig.class);
/**
* ExecutorService 对象,用于配置变化时,异步通知 ConfigChangeListener 监听器们
*
* 静态属性,所有 Config 共享该线程池。
*/
private static ExecutorService m_executorService;
/**
* ConfigChangeListener 集合
*/
private List<ConfigChangeListener> m_listeners = Lists.newCopyOnWriteArrayList();
private ConfigUtil m_configUtil;
private volatile Cache<String, Integer> m_integerCache;
private volatile Cache<String, Long> m_longCache;
private volatile Cache<String, Short> m_shortCache;
private volatile Cache<String, Float> m_floatCache;
private volatile Cache<String, Double> m_doubleCache;
private volatile Cache<String, Byte> m_byteCache;
private volatile Cache<String, Boolean> m_booleanCache;
private volatile Cache<String, Date> m_dateCache;
private volatile Cache<String, Long> m_durationCache;
/**
* 数组属性 Cache Map
*
* KEY:分隔符
* KEY2:属性建
*/
private Map<String, Cache<String, String[]>> m_arrayCache; // 并发 Map
/**
* 上述 Cache 对象集合
*/
private List<Cache> allCaches;
/**
* 缓存版本号,用于解决更新缓存可能存在的并发问题。详细见 {@link #getValueAndStoreToCache(String, Function, Cache, Object)} 方法
*/
private AtomicLong m_configVersion; //indicate config version
static {
m_executorService = Executors.newCachedThreadPool(ApolloThreadFactory.create("Config", true));
}
public AbstractConfig() {
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_configVersion = new AtomicLong();
m_arrayCache = Maps.newConcurrentMap();
allCaches = Lists.newArrayList();
}
private <T> Cache<String, T> newCache() {
// 创建 Cache 对象
Cache<String, T> cache = CacheBuilder.newBuilder()
.maximumSize(m_configUtil.getMaxConfigCacheSize()) // 500
.expireAfterAccess(m_configUtil.getConfigCacheExpireTime(), // 1 分钟
m_configUtil.getConfigCacheExpireTimeUnit())
.build();
// 添加到 Cache 集合
allCaches.add(cache);
return cache;
}
-
allCaches
字段,上述 Cache 对象集合,用于 #clearConfigCache()
方法,清空缓存。代码如下:
protected void clearConfigCache() {
synchronized (this) {
// 过期缓存
for (Cache c : allCaches) {
if (c != null) {
c.invalidateAll();
}
}
// 新增版本号
m_configVersion.incrementAndGet();
}
}
-
-
synchronized ,用于和
#getValueAndStoreToCache(...)
方法,在更新缓存时的互斥,避免并发。
- 每次过期完所有的缓存后,版本号 + 1 。
3.2 获得属性值
AbstractConfig 实现了所有的获得属性值的方法,除了 #getProperty(key, defaultValue)
方法。我们以 #getIntProperty(key, defaultValue)
方法,举例子。代码如下:
@Override
public Integer getIntProperty(String key, Integer defaultValue) {
try {
// 初始化缓存
if (m_integerCache == null) {
synchronized (this) {
if (m_integerCache == null) {
m_integerCache = newCache();
}
}
}
// 从缓存中,读取属性值
return getValueFromCache(key, Functions.TO_INT_FUNCTION, m_integerCache, defaultValue);
} catch (Throwable ex) {
Tracer.logError(new ApolloConfigException(
String.format("getIntProperty for %s failed, return default value %d", key,
defaultValue), ex));
}
// 默认值
return defaultValue;
}
- 调用
#getValueFromCache(key, Function, cache, defaultValue)
方法,从缓存中,读取属性值。比较特殊的是 Function 方法参数,我们下面详细解析。
#getValueFromCache(key, Function, cache, defaultValue)
方法,代码如下:
private <T> T getValueFromCache(String key, Function<String, T> parser, Cache<String, T> cache, T defaultValue) {
// 获得属性值
T result = cache.getIfPresent(key);
// 若存在,则返回
if (result != null) {
return result;
}
// 获得值,并更新到缓存
return getValueAndStoreToCache(key, parser, cache, defaultValue);
}
- 优先,从缓存中,获得属性值。若获取不到,调用
#getValueAndStoreToCache(key, Function, cache, defaultValue)
方法,获得值,并更新到缓存。
#getValueAndStoreToCache(key, Function, cache, defaultValue)
方法,代码如下:
private <T> T getValueAndStoreToCache(String key, Function<String, T> parser, Cache<String, T> cache, T defaultValue) {
// 获得当前版本号
long currentConfigVersion = m_configVersion.get();
// 获得属性值
String value = getProperty(key, null);
// 若获得到属性,返回该属性值
if (value != null) {
// 解析属性值
T result = parser.apply(value);
// 若解析成功
if (result != null) {
// 若版本号未变化,则更新到缓存,从而解决并发的问题。
synchronized (this) {
if (m_configVersion.get() == currentConfigVersion) {
cache.put(key, result);
}
}
// 返回属性值
return result;
}
}
// 获得不到属性值,返回默认值
return defaultValue;
}
3.2.1 Functions
com.ctrip.framework.apollo.util.function.Functions
,枚举了所有解析字符串成对应数据类型的 Function 。代码如下:
public interface Functions {
Function<String, Integer> TO_INT_FUNCTION = new Function<String, Integer>() {
@Override
public Integer apply(String input) {
return Integer.parseInt(input);
}
};
Function<String, Long> TO_LONG_FUNCTION = new Function<String, Long>() {
@Override
public Long apply(String input) {
return Long.parseLong(input);
}
};
Function<String, Short> TO_SHORT_FUNCTION = new Function<String, Short>() {
@Override
public Short apply(String input) {
return Short.parseShort(input);
}
};
Function<String, Float> TO_FLOAT_FUNCTION = new Function<String, Float>() {
@Override
public Float apply(String input) {
return Float.parseFloat(input);
}
};
Function<String, Double> TO_DOUBLE_FUNCTION = new Function<String, Double>() {
@Override
public Double apply(String input) {
return Double.parseDouble(input);
}
};
Function<String, Byte> TO_BYTE_FUNCTION = new Function<String, Byte>() {
@Override
public Byte apply(String input) {
return Byte.parseByte(input);
}
};
Function<String, Boolean> TO_BOOLEAN_FUNCTION = new Function<String, Boolean>() {
@Override
public Boolean apply(String input) {
return Boolean.parseBoolean(input);
}
};
Function<String, Date> TO_DATE_FUNCTION = new Function<String, Date>() {
@Override
public Date apply(String input) {
try {
return Parsers.forDate().parse(input);
} catch (ParserException ex) {
throw new ApolloConfigException("Parse date failed", ex);
}
}
};
Function<String, Long> TO_DURATION_FUNCTION = new Function<String, Long>() {
@Override
public Long apply(String input) {
try {
return Parsers.forDuration().parseToMillis(input);
} catch (ParserException ex) {
throw new ApolloConfigException("Parse duration failed", ex);
}
}
};
}
- 因为 Function 在 JDK 1.8 才支持,所以此处使用的是
com.google.common.base.Function
。
-
TO_DATE_FUNCTION
和 TO_DURATION_FUNCTION
,具体的解析,使用 com.ctrip.framework.apollo.util.parser.Parsers
。(⊙v⊙)嗯,还是感兴趣的胖友,自己去查看。
3.3 计算配置变更集合
List<ConfigChange> calcPropertyChanges(String namespace, Properties previous, Properties current) {
if (previous == null) {
previous = new Properties();
}
if (current == null) {
current = new Properties();
}
Set<String> previousKeys = previous.stringPropertyNames();
Set<String> currentKeys = current.stringPropertyNames();
Set<String> commonKeys = Sets.intersection(previousKeys, currentKeys); // 交集
Set<String> newKeys = Sets.difference(currentKeys, commonKeys); // 新集合 - 交集 = 新增
Set<String> removedKeys = Sets.difference(previousKeys, commonKeys); // 老集合 - 交集 = 移除
List<ConfigChange> changes = Lists.newArrayList();
// 计算新增的
for (String newKey : newKeys) {
changes.add(new ConfigChange(namespace, newKey, null, current.getProperty(newKey), PropertyChangeType.ADDED));
}
// 计算移除的
for (String removedKey : removedKeys) {
changes.add(new ConfigChange(namespace, removedKey, previous.getProperty(removedKey), null, PropertyChangeType.DELETED));
}
// 计算修改的
for (String commonKey : commonKeys) {
String previousValue = previous.getProperty(commonKey);
String currentValue = current.getProperty(commonKey);
if (Objects.equal(previousValue, currentValue)) {
continue;
}
changes.add(new ConfigChange(namespace, commonKey, previousValue, currentValue, PropertyChangeType.MODIFIED));
}
return changes;
}
3.4 添加配置变更监听器
@Override
public void addChangeListener(ConfigChangeListener listener) {
if (!m_listeners.contains(listener)) {
m_listeners.add(listener);
}
}
3.5 触发配置变更监听器们
protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
// 缓存 ConfigChangeListener 数组
for (final ConfigChangeListener listener : m_listeners) {
m_executorService.submit(new Runnable() {
@Override
public void run() {
String listenerName = listener.getClass().getName();
Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
try {
// 通知监听器
listener.onChange(changeEvent);
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
Tracer.logError(ex);
logger.error("Failed to invoke config change listener {}", listenerName, ex);
} finally {
transaction.complete();
}
}
});
}
}
- 提交到线程池中,异步并发通知监听器们,从而避免有些监听器执行时间过长。
4. DefaultConfig
com.ctrip.framework.apollo.internals.DefaultConfig
,实现 RepositoryChangeListener 接口,继承 AbstractConfig 抽象类,默认 Config 实现类。
4.1 构造方法
private static final Logger logger = LoggerFactory.getLogger(DefaultConfig.class);
/**
* Namespace 的名字
*/
private final String m_namespace;
/**
* 配置 Properties 的缓存引用
*/
private AtomicReference<Properties> m_configProperties;
/**
* 配置 Repository
*/
private ConfigRepository m_configRepository;
/**
* 项目下,Namespace 对应的配置文件的 Properties
*/
private Properties m_resourceProperties;
/**
* 答应告警限流器。当读取不到属性值,会打印告警日志。通过该限流器,避免打印过多日志。
*/
private RateLimiter m_warnLogRateLimiter;
/**
* Constructor.
*
* @param namespace the namespace of this config instance
* @param configRepository the config repository for this config instance
*/
public DefaultConfig(String namespace, ConfigRepository configRepository) {
m_namespace = namespace;
m_resourceProperties = loadFromResource(m_namespace);
m_configRepository = configRepository;
m_configProperties = new AtomicReference<>();
m_warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
// 初始化
initialize();
}
-
m_namespace
字段,Namespace 的名字。
-
m_configProperties
字段,配置 Properties 的缓存引用。
-
m_configRepository
字段,配置 Repository 。DefaultConfig 会从 ConfigRepository 中,加载配置 Properties ,并更新到 m_configProperties
中。
-
为什么 DefaultConfig 实现 RepositoryChangeListener 接口?ConfigRepository 的一个实现类 RemoteConfigRepository ,会从远程 Config Service 加载配置。但是 Config Service 的配置不是一成不变,可以在 Portal 进行修改。所以 RemoteConfigRepository 会在配置变更时,从 Admin Service 重新加载配置。为了实现 Config 监听配置的变更,所以需要将 DefaultConfig 注册为 ConfigRepository 的监听器。因此,DefaultConfig 需要实现 RepositoryChangeListener 接口。详细解析,见 《Apollo 源码解析 —— Client 轮询配置》 。
-
#initialize()
方法,初始拉取 ConfigRepository 的配置,更新到 m_configProperties
中,并注册自己到 ConfigRepository 为监听器。代码如下:
private void initialize() {
try {
// 初始化 m_configProperties
m_configProperties.set(m_configRepository.getConfig());
} catch (Throwable ex) {
Tracer.logError(ex);
logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.", m_namespace, ExceptionUtil.getDetailMessage(ex));
} finally {
// register the change listener no matter config repository is working or not
// so that whenever config repository is recovered, config could get changed
// 注册到 ConfigRepository 中,从而实现每次配置发生变更时,更新配置缓存 `m_configProperties` 。
m_configRepository.addChangeListener(this);
}
}
-
m_resourceProperties
字段,项目下,Namespace 对应的配置文件的 Properties 。代码如下:
private Properties loadFromResource(String namespace) {
// 生成文件名
String name = String.format("META-INF/config/%s.properties", namespace);
// 读取 Properties 文件
InputStream in = ClassLoaderUtil.getLoader().getResourceAsStream(name);
Properties properties = null;
if (in != null) {
properties = new Properties();
try {
properties.load(in);
} catch (IOException ex) {
Tracer.logError(ex);
logger.error("Load resource config for namespace {} failed", namespace, ex);
} finally {
try {
in.close();
} catch (IOException ex) {
// ignore
}
}
}
return properties;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)