当 Archiver Appliance 开始运行后发生了什么:
mgmt服务:
config\DefaultConfigService.java
initialize() {
this.mgmtRuntime = new MgmtRuntimeState(this);
startupExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Startup executor");
return t;
}
});
MgmtPostStartup mgmtPostStartup = new MgmtPostStartup(this);
ScheduledFuture<?> postStartupFuture = startupExecutor.scheduleAtFixedRate(mgmtPostStartup, 10, 20, TimeUnit.SECONDS);
mgmtPostStartup.setCancellingFuture(postStartupFuture);
}
创建 mgmt 状态机 —— MgmtRuntimeState;
定期执行 mgmt 后处理 —— MgmtPostStartup。
mgmt\MgmtPostStartup.java
public MgmtPostStartup(ConfigService configService) {
this.configService = configService;
}
@Override
public void run() {
if(this.configService.isStartupComplete()) {
this.checkIfAllComponentsHaveStartedUp();
if(this.configService.getMgmtRuntimeState().haveChildComponentsStartedUp()) {
cancellingFuture.cancel(false);
}
} else {
configService.postStartup();
}
}
checkIfAllComponentsHaveStartedUp() {
if(retrievalStartupState == ConfigService.STARTUP_SEQUENCE.STARTUP_COMPLETE) {
configService.getMgmtRuntimeState().componentStartedUp(WAR_FILE.RETRIEVAL);
}
...
}
定期执行的 mgmt 后处理包括:
若 configService 未完成,则执行 configService.postStartup() ;
若 configService 已完成,检查 mgmt、engine、etl、retrieval 四个服务是否都启动成功;
每个服务启动成功时,都会更新 mgmt 状态机中已启动服务列表;
若四个服务都成功启动,则开始存储 PV 值 —— startArchivePVRequests()。
config\DefaultConfigService.java
postStartup() {
Config config = new XmlConfigBuilder().build();
...
hzinstance = Hazelcast.newHazelcastInstance(config);
pubSub = hzinstance.getTopic("pubSub");
initializePersistenceLayer();
loadTypeInfosFromPersistence();
loadAliasesFromPersistence();
loadArchiveRequestsFromPersistence();
loadExternalServersFromPersistence();
registerForNewExternalServers(hzinstance.getMap("channelArchiverDataServers"));
eventBus.register(this);
pubSub.addMessageListener(new MessageListener<PubSubEvent>() {
@Override
public void onMessage(Message<PubSubEvent> pubSubEventMsg) {
PubSubEvent pubSubEvent = pubSubEventMsg.getMessageObject();
if(pubSubEvent.getDestination().equals("ALL")
|| (pubSubEvent.getDestination().startsWith(myIdentity) && pubSubEvent.getDestination().endsWith(DefaultConfigService.this.warFile.toString()))
) {
pubSubEvent.markSourceAsCluster();
eventBus.post(pubSubEvent);
}
});
this.startupState = STARTUP_SEQUENCE.STARTUP_COMPLETE;
}
将数据库中存储的 PV 信息加载到当前环境;
建立集群信息、事务通知机制:
集群中某个appliance发送消息后,先通过 PubSubEvent 推送到集群,该消息的目的appliance收到并发现是发给自己的某个服务的消息后,再通过 eventBus 推送到本地。
initializePersistenceLayer() throws ConfigException {
persistanceLayer = new MySQLPersistence();
}
mgmt\MgmtRuntimeState.java
private static int threadNumber = 1;
ScheduledExecutorService archivePVWorkflow = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("MgmtArchivePVWorkflow" + threadNumber++);
return t;
}
});
public MgmtRuntimeState(final ConfigService configService) {
this.configService = configService;
myIdentity = this.configService.getMyApplianceInfo().getIdentity();
...
configService.getEventBus().register(this);
}
componentStartedUp(WAR_FILE component) {
componentsThatHaveCompletedStartup.add(component);
if(this.haveChildComponentsStartedUp()) {
this.startArchivePVRequests();
}
}
private void startArchivePVRequests() {
for(String pvNameFromPersistence : configService.getArchiveRequestsCurrentlyInWorkflow()) {
this.startPVWorkflow(pvNameFromPersistence);
}
startArchivePVWorkflow(initialDelayInSeconds);
}
startPVWorkflow() {
ArchivePVState pvState = new ArchivePVState(pvName, configService);
currentPVRequests.put(pvName, pvState);
}
private void startArchivePVWorkflow() {
theArchiveWorkflow = archivePVWorkflow.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
LinkedList<ArchivePVState> archivePVStates = new LinkedList<ArchivePVState>(currentPVRequests.values());
int totRequests = archivePVStates.size();
int maxRequestsToProcess = Math.min(archivePVWorkflowBatchSize, totRequests);
int pvCount = 0;
while(pvCount < maxRequestsToProcess) {
ArchivePVState runWorkFlowForPV = archivePVStates.pop();
runWorkFlowForPV.nextStep();
pvCount++;
}
}, initialDelayInSeconds, archivePVWorkflowTickSeconds, TimeUnit.SECONDS);
}
在四个服务都成功启动、开始存储 PV 值后,执行:
对数据库 ArchivePVRequests 表中的 PV 开始存储流程;
定期对 currentPVRequests 中最多前 archivePVWorkflowBatchSize 个 PV 的存储状态机执行状态更新操作。
当在web端点击”Archive PV”后发生了什么:
mgmt\BPLServlet.java:
doPost() {
BasicDispatcher.dispatch(req, resp, configService, postActions);
}
common\BasicDispatcher.java:
dispatch() {
action = actionClass.getConstructor().newInstance();
action.execute(req, resp, configService);
}
mgmt\bpl\ArchivePVAction.java
execute() {
archivePV(out, pv, samplingPeriodSpecified, samplingMethod, samplingPeriod, controllingPV, policyName, null, skipCapacityPlanning, configService, fieldsAsPartOfStream);
}
archivePV() {
configService.addToArchiveRequests(actualPVName, userSpecifiedSamplingParams);
configService.getMgmtRuntimeState().startPVWorkflow(pvName);
}
config\DefaultConfigService.java
archivePVRequests = hzinstance.getMap("archivePVRequests");
addToArchiveRequests() {
archivePVRequests.put(pvName, userSpecifiedSamplingParams);
persistanceLayer.putArchivePVRequest(pvName, userSpecifiedSamplingParams);
}
mgmt\MgmtRuntimeState.java
startPVWorkflow() {
ArchivePVState pvState = new ArchivePVState(pvName, configService);
currentPVRequests.put(pvName, pvState);
}
mgmt\archivepv\ArchivePVState.java
private ArchivePVStateMachine currentState = ArchivePVStateMachine.START;
public ArchivePVState(String pvName, ConfigService configService) {
this.myIdentity = this.configService.getMyApplianceInfo().getIdentity();
this.fieldsArchivedAsPartOfStream = configService.getFieldsArchivedAsPartOfStream();
}
public synchronized void nextStep() {
switch(currentState) {
case START: {
PubSubEvent pubSubEvent = new PubSubEvent("ComputeMetaInfo", myIdentity + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
...
pubSubEvent.setEventData(encoder.encode(userSpec).toJSONString());
configService.getEventBus().post(pubSubEvent);
currentState = ArchivePVStateMachine.METAINFO_REQUESTED;
return;
}
case METAINFO_REQUESTED: {
logger.debug("A request to gather metainfo has been published for the PV " + pvName);
return;
}
case METAINFO_GATHERING: {
logger.debug("Metainfo has been requested and is being gathered for " + pvName);
return;
}
case METAINFO_OBTAINED: {
PolicyConfig thePolicy = configService.computePolicyForPV(pvName, metaInfo, userSpec);
...
PVTypeInfo typeInfo = new PVTypeInfo(pvName, metaInfo.getArchDBRTypes(), !metaInfo.isVector(), metaInfo.getCount());
...
ApplianceInfo applianceInfoForPV = null;
...
configService.registerPVToAppliance(pvName, applianceInfoForPV);
typeInfo.setApplianceIdentity(applianceIdentityAfterCapacityPlanning);
configService.updateTypeInfoForPV(pvName, typeInfo);
currentState = ArchivePVStateMachine.POLICY_COMPUTED;
return;
}
case POLICY_COMPUTED: {
PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
if(typeInfo.getApplianceIdentity().equals(applianceIdentityAfterCapacityPlanning)) {
currentState = ArchivePVStateMachine.TYPEINFO_STABLE;
}
return;
}
case TYPEINFO_STABLE: {
PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
ArchivePVState.startArchivingPV(pvName, configService, configService.getAppliance(typeInfo.getApplianceIdentity()));
registerAliasesIfAny(typeInfo);
currentState = ArchivePVStateMachine.ARCHIVE_REQUEST_SUBMITTED;
return;
}
case ARCHIVE_REQUEST_SUBMITTED:
return;
case ARCHIVING: {
configService.archiveRequestWorkflowCompleted(pvName);
configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
currentState = ArchivePVStateMachine.FINISHED;
return;
}
case ABORTED: {
configService.archiveRequestWorkflowCompleted(pvName);
configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
currentState = ArchivePVStateMachine.FINISHED;
return;
}
case FINISHED: {
logger.error("Archive state for PV " + this.pvName + " is finished.");
return;
}
}
}
public void metaInfoRequestAcknowledged() {
metaInfoRequestedSubmitted = TimeUtils.now();
this.currentState = ArchivePVStateMachine.METAINFO_GATHERING;
}
public void metaInfoObtained(MetaInfo metaInfo) {
this.metaInfo = metaInfo;
this.currentState = ArchivePVStateMachine.METAINFO_OBTAINED;
}
public static void startArchivingPV() throws IOException {
PubSubEvent pubSubEvent = new PubSubEvent("StartArchivingPV", applianceInfoForPV.getIdentity() + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
configService.getEventBus().post(pubSubEvent);
}
public void confirmedStartedArchivingPV() {
this.currentState = ArchivePVStateMachine.ARCHIVING;
}
对单个 PV 的存储流程主要分两大部分:
创建 PV 通道,元数据信息获取;
开始执行数据存储。
TYPEINFO_STABLE 和 ARCHIVE_REQUEST_SUBMITTED 这两个状态,是这两大部分的分界点。
若 ”Archive PV” 不成功,或者成功后出现 PV unconnected 等错误情况,应首先定位问题属于这两大部分的哪一部分,再去具体定位问题。
关于 applianceInfoForPV 的设置过程,详见 Archiver Appliance 建立集群时可能出现的问题 。
engine\pv\EngineContext.java
@Subscribe public void computeMetaInfo(PubSubEvent pubSubEvent) {
if(pubSubEvent.getType().equals("ComputeMetaInfo")) {
ArchiveEngine.getArchiveInfo(pvName, configService, extraFields, userSpec.isUsePVAccess(), new ArchivePVMetaCompletedListener(pvName, configService, myIdentity));
PubSubEvent confirmationEvent = new PubSubEvent("MetaInfoRequested", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
configService.getEventBus().post(confirmationEvent);
} else if(pubSubEvent.getType().equals("StartArchivingPV")) {
String pvName = pubSubEvent.getPvName();
this.startArchivingPV(pvName);
PubSubEvent confirmationEvent = new PubSubEvent("StartedArchivingPV", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
configService.getEventBus().post(confirmationEvent);
} else if(pubSubEvent.getType().equals("AbortComputeMetaInfo")) {
String pvName = pubSubEvent.getPvName();
this.abortComputeMetaInfo(pvName);
}
}
private void startArchivingPV(String pvName) {
PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
...
ArchiveEngine.archivePV(pvName, samplingPeriod, samplingMethod, secondsToBuffer, firstDest, configService, dbrType, lastKnownTimeStamp, controllingPV, archiveFields, typeInfo.getHostName(), typeInfo.isUsePVAccess(), typeInfo.isUseDBEProperties());
}
engine\ArchiveEngine.java
public static void getArchiveInfo(final String pvName,
final ConfigService configservice, final String metadatafields[], boolean usePVAccess,
final MetaCompletedListener metaListener) throws Exception {
MetaGet metaget = new MetaGet(pvName, configservice, metadatafields, usePVAccess, metaListener);
metaget.initpv();
}
public static void archivePV() {
ArchiveEngine.createChannels4PVWithMetaField(pvName, samplingPeriod, mode, secondstoBuffer, writer,
configservice, archdbrtype, lastKnownEventTimeStamp, start, null, metaFieldNames, iocHostName, usePVAccess, useDBEProperties);
}
private static void createChannels4PVWithMetaField() {
ArchiveChannel channel = ArchiveEngine.addChannel(pvName, writer, Enablement.Enabling, scan_mode2,
lastKnownEventTimeStamp,configservice, archdbrtype, controlPVname, iocHostName, usePVAccess);
channel.initializeMetaFieldPVS(metaFields, configservice, usePVAccess, useDBEProperties);
}
private static ArchiveChannel addChannel() {
...
int JCACommandThreadID = engineContext.assignJCACommandThread(name, iocHostName);
if (sample_mode.isMonitor()) {
if (sample_mode.getDelta() > 0) {
channel = new DeltaArchiveChannel(name, writer, enablement, buffer_capacity, last_sampleTimestamp, pvSamplingPeriod, sample_mode.getDelta(),configservice, archdbrtype, controlPVname, JCACommandThreadID, usePVAccess);
} else {
channel = new MonitoredArchiveChannel(name, writer, enablement, buffer_capacity, last_sampleTimestamp, pvSamplingPeriod, configservice, archdbrtype, controlPVname, JCACommandThreadID, usePVAccess);
}
} else {
channel = new ScannedArchiveChannel(name, writer, enablement, buffer_capacity, last_sampleTimestamp, pvSamplingPeriod, configservice, archdbrtype, controlPVname, JCACommandThreadID, usePVAccess);
}
configservice.getEngineContext().getChannelList().put(channel.getName(), channel);
engineContext.getWriteThead().addChannel(channel);
return channel;
}
engine\metadata\MetaGet.java
initpv() throws Exception {
int jcaCommandThreadId = configservice.getEngineContext().assignJCACommandThread(pvName, null);
PV pv = PVFactory.createPV(pvName, configservice, jcaCommandThreadId, usePVAccess);
pv.addListener(new PVListener() {
@Override
public void pvValueUpdate(PV pv, DBRTimeEvent ev) {}
@Override
public void pvDisconnected(PV pv) {}
@Override
public void pvConnected(PV pv) {
...
}
@Override
public void pvConnectionRequestMade(PV pv) {}
@Override
public void sampleDroppedTypeChange(PV pv, ArchDBRTypes newDBRtype) {}
});
pvList.put("main", pv);
pv.start();
class FieldListener implements PVListener {
...
}
}
@Override
public void run() {
...
metaListener.completed(mainMeta);
}
创建 PV 通道,添加事件监听,获取 PV 元数据信息。
engine\pv\EngineContext.java
static class ArchivePVMetaCompletedListener implements MetaCompletedListener {
@Override
public void completed(MetaInfo metaInfo) {
PubSubEvent confirmationEvent = new PubSubEvent("MetaInfoFinished", myIdentity + "_" + ConfigService.WAR_FILE.MGMT, pvName);
confirmationEvent.setEventData(JSONValue.toJSONString(metaInfoObj));
configService.getEventBus().post(confirmationEvent);
}
}
mgmt\MgmtRuntimeState.java
@Subscribe public void computeMetaInfo(PubSubEvent pubSubEvent) {
if(pubSubEvent.getType().equals("MetaInfoRequested")) {
String pvName = pubSubEvent.getPvName();
ArchivePVState pvState = currentPVRequests.get(pvName);
pvState.metaInfoRequestAcknowledged();
} else if (pubSubEvent.getType().equals("MetaInfoFinished")) {
String pvName = pubSubEvent.getPvName();
ArchivePVState pvState = currentPVRequests.get(pvName);
MetaInfo metaInfo = new MetaInfo();
decoder.decode(metaInfoObj, metaInfo);
pvState.metaInfoObtained(metaInfo);
} else if (pubSubEvent.getType().equals("StartedArchivingPV")) {
String pvName = pubSubEvent.getPvName();
pvState.confirmedStartedArchivingPV();
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)