本文深入探讨一下变量监测的用法和原理。
一 累积传输
先前写的关于变量监测的文章,都是设置一个采样时间,然后有变化了就通知一下,有时我们希望变化累积一段时间再一起传给client,这时如何设置呢?
直接上代码,如下,是client端,
#include <stdlib.h>
#include <signal.h>
#include "open62541.h"
UA_Boolean running = true;
UA_NodeId gTargetId = UA_NODEID_STRING(1, (char*)"the.answer");
void stopHandler(int sign) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
running = false;
}
static void handler_DataChanged(UA_Client *client, UA_UInt32 subId,
void *subContext, UA_UInt32 monId,
void *monContext, UA_DataValue *value)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received Notification");
UA_NodeId *ptr = (UA_NodeId*)monContext;
if (UA_NodeId_equal(ptr, &gTargetId))
{
UA_Int32 currentValue = *(UA_Int32*)(value->value.data);
UA_DateTimeStruct dts = UA_DateTime_toStruct(value->sourceTimestamp);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "SubId:%u, MonId:%u, Current Value: %d, date is: %u-%u-%u %u:%u:%u.%03u\n",
subId, monId, currentValue,
dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
}
}
void addMonitoredItemToVariable(UA_Client *client)
{
UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
request.requestedPublishingInterval = 10000;
UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
NULL, NULL, NULL);
UA_UInt32 subId = response.subscriptionId;
if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Create subscription succeeded, id %u\n", subId);
}
UA_MonitoredItemCreateRequest monRequest =
UA_MonitoredItemCreateRequest_default(gTargetId);
monRequest.requestedParameters.samplingInterval = 500;
monRequest.requestedParameters.queueSize = 50;
UA_MonitoredItemCreateResult monResponse =
UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
UA_TIMESTAMPSTORETURN_BOTH,
monRequest, &gTargetId,
handler_DataChanged, NULL);
if(monResponse.statusCode == UA_STATUSCODE_GOOD)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Monitoring 'the.answer', id %u\n",
monResponse.monitoredItemId);
}
}
int main(void)
{
signal(SIGINT, stopHandler);
signal(SIGTERM, stopHandler);
UA_Client *client = UA_Client_new();
UA_ClientConfig_setDefault(UA_Client_getConfig(client));
UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
if(retval != UA_STATUSCODE_GOOD)
{
UA_Client_delete(client);
return (int)retval;
}
addMonitoredItemToVariable(client);
while(running)
{
UA_Client_run_iterate(client, 1000);
}
UA_Client_delete(client);
return EXIT_SUCCESS;
}
关键是函数addMonitoredItemToVariable(),里面先把发布的时间间隔设置为10s,然后把采样时间间隔设置为500ms,队列设置为50,也就是最多存放50个元素。如果超过50个,就会把旧的数据删除掉,这是依据UA_MonitoredItemCreateRequest_default()给的默认设置,如下,
static UA_INLINE UA_MonitoredItemCreateRequest
UA_MonitoredItemCreateRequest_default(UA_NodeId nodeId) {
UA_MonitoredItemCreateRequest request;
UA_MonitoredItemCreateRequest_init(&request);
request.itemToMonitor.nodeId = nodeId;
request.itemToMonitor.attributeId = UA_ATTRIBUTEID_VALUE;
request.monitoringMode = UA_MONITORINGMODE_REPORTING;
request.requestedParameters.samplingInterval = 250;
request.requestedParameters.discardOldest = true;
request.requestedParameters.queueSize = 1;
return request;
}
discardOldest是true
下面是Server代码,
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include "open62541.h"
UA_Boolean running = true;
void stopHandler(int sign) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
running = false;
}
UA_NodeId addTheAnswerVariable(UA_Server *server)
{
UA_VariableAttributes attr = UA_VariableAttributes_default;
UA_Int32 myInteger = 1;
UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
attr.description = UA_LOCALIZEDTEXT((char*)"en-US", (char*)"the answer");
attr.displayName = UA_LOCALIZEDTEXT((char*)"en-US", (char*)"the answer");
attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
UA_NodeId theAnswerNodeId = UA_NODEID_STRING(1, (char*)"the.answer");
UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, (char*)"the answer");
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_Server_addVariableNode(server, theAnswerNodeId, parentNodeId,
parentReferenceNodeId, myIntegerName,
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
return theAnswerNodeId;
}
UA_NodeId addTheAnswer2Variable(UA_Server *server)
{
UA_VariableAttributes attr = UA_VariableAttributes_default;
UA_Int32 myInteger = 1;
UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
attr.description = UA_LOCALIZEDTEXT((char*)"en-US", (char*)"the answer2");
attr.displayName = UA_LOCALIZEDTEXT((char*)"en-US", (char*)"the answer2");
attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
UA_NodeId theAnswerNodeId = UA_NODEID_STRING(1, (char*)"the.answer2");
UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, (char*)"the answer2");
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_Server_addVariableNode(server, theAnswerNodeId, parentNodeId,
parentReferenceNodeId, myIntegerName,
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
return theAnswerNodeId;
}
void cycleCallback(UA_Server *server, void *data)
{
static UA_Int32 update = 2;
static UA_Int32 update2 = 8;
UA_NodeId * idArray = (UA_NodeId*)data;
UA_Variant myVar;
UA_Variant_init(&myVar);
UA_Variant_setScalar(&myVar, &update, &UA_TYPES[UA_TYPES_INT32]);
UA_Server_writeValue(server, idArray[0], myVar);
UA_Variant_init(&myVar);
UA_Variant_setScalar(&myVar, &update2, &UA_TYPES[UA_TYPES_INT32]);
UA_Server_writeValue(server, idArray[1], myVar);
update++;
update2++;
if (update == 100)
{
update = 2;
}
if (update2 == 200)
{
update = 8;
}
}
int main(void)
{
signal(SIGINT, stopHandler);
signal(SIGTERM, stopHandler);
UA_Server *server = UA_Server_new();
UA_ServerConfig_setDefault(UA_Server_getConfig(server));
UA_NodeId targetNodeId = addTheAnswerVariable(server);
UA_NodeId target2NodeId = addTheAnswer2Variable(server);
UA_NodeId idArr[2] = {targetNodeId, target2NodeId};
UA_UInt64 callbackId = 0;
UA_Server_addRepeatedCallback(server, cycleCallback, idArr, 1000, &callbackId);
UA_StatusCode retval = UA_Server_run(server, &running);
UA_Server_delete(server);
return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
server中添加一个变量,然后每隔1s去把其值加1。
编译运行后,可以在client这边看到打印信息如下,
可以看出每隔10s会发送多个变化值过来,在client的变化回调里打印其值,同时打印值变化的时间,可以看到变化的时间间隔是1s,和Server端的设置一样。注意,每个变化值都会调用一次Client的回调函数。
二 设置有变化就通知
有时希望被监测的变量一有变化就通知,不希望等到发布时间到了再发送过来,如何做呢?可以把采样时间设置为0,同时把发布时间也设置为0,这样就可以达到这个目的了。
源码分析
创建监测项时会调用UA_MonitoredItem_registerSampling(),这里会根据采样时间分别进行处理,如下,
UA_StatusCode
UA_MonitoredItem_registerSampling(UA_Server *server, UA_MonitoredItem *mon) {
UA_LOCK_ASSERT(&server->serviceMutex, 1);
if(mon->sampleCallbackIsRegistered)
return UA_STATUSCODE_GOOD;
UA_assert(mon->next == (UA_MonitoredItem*)~0);
UA_StatusCode res;
if(mon->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER ||
mon->parameters.samplingInterval == 0.0) {
UA_Subscription *sub = mon->subscription;
UA_Session *session = &server->adminSession;
if(sub)
session = sub->session;
res = UA_Server_editNode(server, session, &mon->itemToMonitor.nodeId,
addMonitoredItemBackpointer, mon);
} else {
res = addRepeatedCallback(server,
(UA_ServerCallback)UA_MonitoredItem_sampleCallback,
mon, mon->parameters.samplingInterval,
&mon->sampleCallbackId);
}
if(res == UA_STATUSCODE_GOOD)
mon->sampleCallbackIsRegistered = true;
return res;
}
如果采样时间不为0,就会添加一个定时任务去做采样;如果是0,那么就会把这个监测项添加到node里的一个单链表里,该链表存放所有对该node创建的检测项(因为代码里可能会对同一个变量添加多个监测项),一旦有变化就会逐个进行通知,此时所使用的函数如下,
#ifdef UA_ENABLE_SUBSCRIPTIONS
static void
triggerImmediateDataChange(UA_Server *server, UA_Session *session,
UA_Node *node, const UA_WriteValue *wvalue) {
for(UA_MonitoredItem *mon = node->head.monitoredItems; mon != NULL; mon = mon->next) {
if(mon->itemToMonitor.attributeId != wvalue->attributeId)
continue;
UA_DataValue value;
UA_DataValue_init(&value);
ReadWithNode(node, server, session, mon->timestampsToReturn,
&mon->itemToMonitor, &value);
UA_Subscription *sub = mon->subscription;
UA_StatusCode res = sampleCallbackWithValue(server, sub, mon, &value);
if(res != UA_STATUSCODE_GOOD) {
UA_DataValue_clear(&value);
UA_LOG_WARNING_SUBSCRIPTION(&server->config.logger, sub,
"MonitoredItem %" PRIi32 " | "
"Sampling returned the statuscode %s",
mon->monitoredItemId,
UA_StatusCode_name(res));
}
}
}
#endif
该函数会遍历这个单链表,然后执行回调,注意这里是server端,其回调执行中会把内容发给client端。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)