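Follow-up to the previous post's TODO
Elasticsearch CCR source code analysis
TODO from the previous post:
After the HTTP request (ccr/follow) is received, the follower cluster node starts a full sync that pulls data from the leader cluster in snapshot mode. So at what point is the leader cluster disguised as a snapshot repository? In theory this should happen during Node initialization. That had not been verified at the time and was left to be filled in later.
How the CCR repository service is loaded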
1. Node startup:
Enter the constructor:
org.elasticsearch.node.Node#Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>, boolean)
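2. At node startup, CcrRepositoryManager registers a CcrRepository named "_ccr_" + remoteCluster for each remote cluster. The starting point is the plugin component creation in the Node constructor: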
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry).stream())
.collect(Collectors.toList());
The createComponents method (org.elasticsearch.xpack.ccr.Ccr#createComponents) calls
new CcrRepositoryManager(settings, clusterService, (NodeClient) client)
Loading continues with other components...
Then org.elasticsearch.xpack.ccr.CcrRepositoryManager.RemoteSettingsUpdateListener#updateRemoteCluster is called:
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
deleteRepository(repositoryName);
} else {
putRepository(repositoryName);
}
}
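The listener above boils down to "derive a repository name from the cluster alias, then register or remove it depending on whether the remote cluster still has seed addresses". The following is only a minimal sketch of that rule, not Elasticsearch code: the class CcrRepositoryRegistrySketch, its in-memory set, and the hasRepository helper are hypothetical, and the prefix value "_ccr_" is taken from the summary in this post.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the repository bookkeeping; not the real CcrRepositoryManager.
class CcrRepositoryRegistrySketch {
    // Assumed to mirror CcrRepository.NAME_PREFIX.
    static final String NAME_PREFIX = "_ccr_";

    // In-memory replacement for the put/delete repository actions.
    private final Set<String> repositories = new TreeSet<>();

    // Same decision as the listener: no addresses means the remote cluster was removed.
    void updateRemoteCluster(String clusterAlias, List<String> addresses) {
        String repositoryName = NAME_PREFIX + clusterAlias;
        if (addresses.isEmpty()) {
            repositories.remove(repositoryName);
        } else {
            repositories.add(repositoryName);
        }
    }

    boolean hasRepository(String repositoryName) {
        return repositories.contains(repositoryName);
    }

    public static void main(String[] args) {
        CcrRepositoryRegistrySketch registry = new CcrRepositoryRegistrySketch();
        registry.updateRemoteCluster("leader", Arrays.asList("127.0.0.1:9300"));
        System.out.println(registry.hasRepository("_ccr_leader"));
        registry.updateRemoteCluster("leader", Collections.<String>emptyList());
        System.out.println(registry.hasRepository("_ccr_leader"));
    }
}
```

Because the name is derived purely from the alias, adding and removing the same remote cluster always hits the same repository entry.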
After a chain of method calls, execution finally reaches org.elasticsearch.xpack.ccr.CcrRepositoryManager#putRepository
3. Load the snapshot/restore module
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), transportService,
clusterService, threadPool, xContentRegistry));
4. Enter the RepositoriesModule constructor:
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
5. Enter the getInternalRepositories implementation.
The overridden getInternalRepositories method is executed:
org.elasticsearch.xpack.ccr.Ccr#getInternalRepositories
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}
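The method above returns a one-entry map from a repository type to a factory, which the repositories module can later use to build a repository from its metadata. The sketch below illustrates that type-to-factory lookup in isolation. Everything in it is hypothetical (Repo, RepoMetadata, CcrLikeRepo, create), and the type string "_ccr_" is an assumption standing in for CcrRepository.TYPE.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of "repository type -> factory"; not the real RepositoriesModule.
class RepositoryFactorySketch {
    interface Repo {
        String describe();
    }

    // Simplified stand-in for repository metadata: just a name and a type.
    static final class RepoMetadata {
        final String name;
        final String type;

        RepoMetadata(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    static final class CcrLikeRepo implements Repo {
        private final String name;

        CcrLikeRepo(String name) {
            this.name = name;
        }

        @Override
        public String describe() {
            return "ccr repository " + name;
        }
    }

    // Plays the role of getInternalRepositories: one type, one factory.
    static Map<String, Function<RepoMetadata, Repo>> internalRepositories() {
        Function<RepoMetadata, Repo> factory = metadata -> new CcrLikeRepo(metadata.name);
        return Collections.singletonMap("_ccr_", factory); // assumed value of CcrRepository.TYPE
    }

    // Looks up the factory by the metadata's type and builds the repository.
    static Repo create(Map<String, Function<RepoMetadata, Repo>> factories, RepoMetadata metadata) {
        Function<RepoMetadata, Repo> factory = factories.get(metadata.type);
        if (factory == null) {
            throw new IllegalArgumentException("unknown repository type [" + metadata.type + "]");
        }
        return factory.apply(metadata);
    }

    public static void main(String[] args) {
        Repo repo = create(internalRepositories(), new RepoMetadata("_ccr_leader", "_ccr_"));
        System.out.println(repo.describe());
    }
}
```

Registering a factory rather than an instance lets one repository object be created per remote cluster name later on, which matches the per-cluster registration described in step 2.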
6. The org.elasticsearch.repositories.RepositoriesModule#RepositoriesModule constructor builds the repositoriesService
7. Injection of the initialized beans and services begins (comparable to Spring's singleton pool)
injector = modules.createInjector();
8. At the end of node initialization, the REST API is exposed
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
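9. Summary
During Node initialization, many handlers and services are set up (including the CCR-related services and the repository), and a CcrRepository named "_ccr_" + remoteCluster is registered for each remote cluster right away.
When a full sync is triggered (either through the API or automatically through auto-follow), the code checks whether CCR is enabled and whether this is a CCR restore. If so, the flow goes to CcrRepository; otherwise it goes through a regular repository. In the CCR case, execution reaches org.elasticsearch.xpack.ccr.repository.CcrRepository and the restore begins: the leader cluster is treated as a repository, the leader's data is treated as a snapshot named "_latest_", and both metadata and data are fetched from it.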
void restoreFiles() throws IOException {
ArrayList<FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
// Core method that fetches the data
restore(snapshotFiles);
}
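The restoreFiles snippet above wraps every file reported by the leader shard into a file list that is labelled as a single snapshot named "_latest_". A minimal sketch of that packaging step, with the Elasticsearch types replaced by hypothetical ones (SourceFile, SnapshotFileList, fromLeader, totalBytes), might look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "leader files presented as one snapshot"; not the real CcrRepository.
class LeaderSnapshotSketch {
    // Snapshot name used for the leader's data, as described in this post.
    static final String LATEST = "_latest_";

    // Simplified stand-in for per-file store metadata: name and length only.
    static final class SourceFile {
        final String name;
        final long length;

        SourceFile(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    // Simplified stand-in for the snapshot file list handed to restore.
    static final class SnapshotFileList {
        final String snapshotName;
        final List<SourceFile> files;

        SnapshotFileList(String snapshotName, List<SourceFile> files) {
            this.snapshotName = snapshotName;
            this.files = files;
        }

        // Total number of bytes that a restore would have to fetch.
        long totalBytes() {
            long total = 0;
            for (SourceFile file : files) {
                total += file.length;
            }
            return total;
        }
    }

    // Every file on the leader shard becomes an entry of the "_latest_" snapshot.
    static SnapshotFileList fromLeader(List<SourceFile> sourceMetaData) {
        return new SnapshotFileList(LATEST, new ArrayList<>(sourceMetaData));
    }

    public static void main(String[] args) {
        SnapshotFileList snapshot = fromLeader(Arrays.asList(
            new SourceFile("segments_1", 10L),
            new SourceFile("_0.cfs", 90L)));
        System.out.println(snapshot.snapshotName + " " + snapshot.totalBytes());
    }
}
```

The sketch stops at building the list on purpose: how the file contents are actually transferred is exactly the open question raised in the TODO.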
TODO
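At this point the overall CCR logic is clear end to end, but the details of how CCR reuses the snapshot logic for restore are still unclear. For example, when the leader cluster is treated as a repository and its data as a snapshot, how does recovery actually proceed? Are the files copied directly to the follower cluster's local disk, with the follower's shards then recovering one by one? During incremental replication, are the leader's operations (translog) read directly and replayed from memory (bulk-written to the follower's shards), or is some other logic used?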