Siddhi

2023-11-20

1, Siddhi是什么?

Siddhi是一个开源的流处理和复杂事件处理引擎,由WSO2公司开发。它提供了一个强大而灵活的框架,用于处理实时流数据和复杂事件。官网

2, Siddhi特点和功能

Siddhi具有以下特点和功能:
1),实时流处理:Siddhi可以处理连续的实时数据流,并在数据上执行计算、转换和聚合操作。
2) 复杂事件处理:Siddhi允许您基于特定的模式或条件在数据流中检测和关联感兴趣的事件。
3) 时间处理:Siddhi支持事件时间处理,可以处理事件的无序到达并在事件数据上执行基于时间的操作。
4) 查询语言:Siddhi提供了一种类似SQL的查询语言,称为Siddhi Query Language(SiddhiQL),可以以简洁直观的方式表达复杂事件处理逻辑。
5) 窗口操作:Siddhi支持滑动窗口、滚动窗口和基于时间的窗口等多种窗口操作机制,可以在特定的时间间隔内分析数据。
6) 可扩展性:Siddhi可以通过自定义函数、数据源、数据接收器和传输机制进行扩展,可以与各种数据源、处理引擎和输出系统集成。

3, Siddhi缺点

1)学习曲线较陡峭:Siddhi的查询语言相对复杂,需要一定的学习和理解成本,特别是对于没有经验的用户来说。
2)配置和部署复杂性:Siddhi的配置和部署可能需要一些复杂的步骤,特别是在分布式环境中使用时。
3)可视化和监控工具的缺乏:Siddhi缺乏一些成熟的可视化和监控工具,使得在开发和调试过程中可能需要额外的努力。
4)对硬件资源的要求较高:由于Siddhi需要处理大规模的实时数据流,对硬件资源(如内存和处理能力)的要求较高。

4, 架构

±---------------------------------+
| Siddhi Engine |
±---------------------------------+
| |
| ±----------------------------+ |
| | Input Sources | |
| ±----------------------------+ |
| |
| ±----------------------------+ |
| | Siddhi Queries | |
| ±----------------------------+ |
| |
| ±----------------------------+ |
| | Output Sinks | |
| ±----------------------------+ |
| |
±---------------------------------+
在这个架构中,Siddhi Engine是整个平台的核心组件,负责处理实时数据流和执行复杂事件处理逻辑。
1)输入源(Input Sources)是数据流的来源,可以是各种数据源,如消息队列、传感器、数据库等。Siddhi Engine从这些输入源中接收数据流,并将其传递给Siddhi Queries进行处理。
2)Siddhi Queries是用户定义的查询逻辑,使用Siddhi Query Language(SiddhiQL)编写。这些查询可以包括事件关联、窗口操作、过滤条件等,用于处理和分析数据流。
3)输出接收器(Output Sinks)是数据流的目标,可以是消息队列、数据库、实时仪表板等。Siddhi Engine将处理后的数据流发送到这些输出接收器,供后续的处理和展示使用。
4)整个架构的核心是Siddhi Engine,它负责协调输入源、Siddhi Queries和输出接收器之间的数据流动和处理。

5, 应用场景

1)物联网(IoT):Siddhi可以用于处理大规模的传感器数据流,进行实时监测、分析和决策。例如,监测智能家居设备的状态、分析工厂生产线的传感器数据等。
2)金融交易监控:Siddhi可以用于实时监控金融交易数据,检测异常交易、欺诈行为和市场波动。它可以帮助金融机构及时发现和应对风险。
3)实时分析和智能监控:Siddhi可以用于实时分析大规模数据流,提取有价值的信息和洞察,并进行实时决策。例如,实时分析社交媒体数据、监控网络流量等。
4)实时报警和通知:Siddhi可以用于实时监测事件流,并根据预定义的规则和条件触发报警和通知。例如,监测系统日志、网络安全事件等。
5)实时仪表板和可视化:Siddhi可以将处理后的数据流发送到实时仪表板和可视化工具,用于实时展示和监控。例如,实时展示销售数据、监控设备状态等。
6)数据流集成和处理:Siddhi可以用于将不同数据源的数据流进行集成和处理,进行数据清洗、转换和聚合。例如,将传感器数据与外部数据源进行关联和分析。

6, 与flink区别

1)编程模型:Siddhi使用Siddhi Query Language(SiddhiQL)作为查询语言,类似于SQL,用于定义复杂事件处理逻辑。而Flink使用基于Java或Scala的编程API,允许开发人员以编程方式定义数据流处理逻辑。
2) 数据处理模型:Siddhi是基于事件的处理模型,它将连续的事件流作为输入,并在事件上执行计算和操作。Flink是基于数据流的处理模型,它将连续的数据流作为输入,并在数据上执行计算和操作。
3) 窗口操作:Siddhi提供了丰富的窗口操作机制,如滑动窗口、滚动窗口和基于时间的窗口,用于在特定的时间间隔内分析数据。Flink也提供了窗口操作,但它的窗口操作更加灵活和可定制。
4) 扩展性和生态系统:Flink具有更广泛的生态系统和更强大的扩展性。它支持各种数据源和数据接收器,并提供了丰富的库和工具,用于处理不同类型的数据和场景。Siddhi的生态系统相对较小,但也提供了一些扩展点和可定制性。
5) 集群部署和容错性:Flink具有强大的集群部署和容错性能力,可以在分布式环境中处理大规模的数据流。它提供了故障恢复机制和容错保证。Siddhi也支持分布式部署,但在这方面相对较弱。

7,SiddhiQL

一 简单操作

1)定义输入流
语法

define stream <stream_name> (<attribute1> <type1>, <attribute2> <type2>, ...);

举例

define stream SensorStream (sensorId string, temperature double, humidity float);

2)定义输出流

语法

define stream <stream_name> (<attribute1> <type1>, <attribute2> <type2>, ...);

举例

define stream AlertStream (sensorId string, temperature double, humidity float);

3)插入事件到输入流
语法

insert into <stream_name> values (<value1>, <value2>, ...);

举例

insert into SensorStream values ('sensor001', 25.5, 60.2);

4)查询事件
语法

from <stream_name> [<condition>]
select <attribute1>, <attribute2>, ...
insert into <output_stream_name>;

示例

from SensorStream[temperature > 30]
select sensorId, temperature, humidity
insert into AlertStream;

5) 定义窗口

语法

define window <window_name> <window_type> [parameters];

示例

define window TemperatureWindow length(10);

6) 使用窗口查询

语法

from <stream_name>
[<condition>]
insert into <window_name>;


示例

from SensorStream
select *
insert into TemperatureWindow;

7)定义查询的输出频率
语法

@info(name = 'query_name', outputRateLimit = '<rate_limit>')
from <stream_name>
select ...
insert into ...



示例

@info(name = 'AlertQuery', outputRateLimit = '10 events per second')
from SensorStream[temperature > 30]
select sensorId, temperature, humidity
insert into AlertStream;


二注解

1)@source注解:
@source注解用于定义输入数据源。
通过type属性指定数据源的类型,例如inMemory、kafka、mqtt等。
可以使用其他属性来配置特定类型的数据源,例如topic、bootstrap.servers等。
通过@map注解可以指定数据源的映射类型,例如passThrough、json、xml等。
示例:@source(type=‘inMemory’, topic=‘SensorEventStream’, @map(type=‘passThrough’))定义了一个名为SensorEventStream的输入流,使用inMemory类型的数据源,并使用passThrough映射类型。

@source(type='<source_type>', <source_properties>)

其中,<source_type>是数据源的类型,可以是内置的数据源类型(如kafka、http、tcp等),也可以是自定义的数据源类型。
<source_properties>是数据源的属性,用于配置数据源的详细信息。属性的具体内容取决于数据源的类型。以下是一些常见的属性示例:
topic:指定数据源的主题或队列名称。
bootstrap.servers:指定Kafka或其他消息队列的引导服务器地址。
host:指定数据源的主机名。
port:指定数据源的端口号。
username和password:指定用于身份验证的用户名和密码。
@map:指定数据源的映射类型,用于将输入数据映射到Siddhi流中的属性。

2)@sink注解:
@sink注解用于定义输出数据接收器。
通过type属性指定数据接收器的类型,例如log、http、kafka等。
可以使用其他属性来配置特定类型的数据接收器,例如topic、url等。
通过@map注解可以指定数据接收器的映射类型,例如passThrough、json、xml等。
示例:@sink(type=‘log’)定义了一个名为OutputStream的输出流,使用log类型的数据接收器。

3)@App:name注解:
@App:name注解用于为Siddhi应用程序指定一个名称。
该名称在Siddhi应用程序中必须是唯一的。
示例:@App:name(‘MySiddhiApp’)为Siddhi应用程序指定了名称为MySiddhiApp。

4)@info注解:
@info注解用于提供有关查询的信息,例如查询的名称或描述。
这些信息可以在Siddhi应用程序的运行时环境中使用,例如在日志中显示。
示例:@info(name=‘query1’)为查询指定了名称为query1的信息。

5)@map注解:
@map注解用于定义数据源和数据接收器的映射类型。
映射类型指定了如何将输入数据转换为Siddhi事件或将Siddhi事件转换为输出数据。
Siddhi提供了多种映射类型,例如passThrough、json、xml等。
示例:@map(type=‘json’)指定了使用JSON映射类型。

6)@function注解:
@function注解用于定义自定义函数。
通过name属性指定函数的名称。
可以使用其他属性来配置函数的参数和返回类型。
示例:@function(name=‘customFunction’)定义了一个名为customFunction的自定义函数。

7)@trigger注解:
@trigger注解用于定义触发器。
通过cron属性指定触发器的时间表达式。
可以使用其他属性来配置触发器的行为,例如start、end等。
示例:@trigger(cron=‘0/5 * * * * ?’)定义了一个每5秒触发一次的触发器。

8)@distribution注解:
@distribution注解用于定义分布式部署的相关信息。
可以使用strategy属性指定分布式部署的策略,例如partitioned、replicated等。
示例:@distribution(strategy = ‘partitioned’)定义了一个分布式部署策略为partitioned。

三 函数

1) 普通函数
a)数学函数(Math Functions):

  • math:sin(x):计算参数x的正弦值。
  • math:cos(x):计算参数x的余弦值。
  • math:tan(x):计算参数x的正切值。
  • math:abs(x):计算参数x的绝对值。
  • math:round(x):将参数x四舍五入为最接近的整数。
    b)字符串函数(String Functions):
  • str:length(s):计算字符串s的长度。
  • str:concat(s1, s2):将字符串s1和s2连接起来。
  • str:substring(s, startIndex, endIndex):从字符串s中提取子字符串,从startIndex到endIndex。
    c)时间函数(Time Functions):
  • time:currentTimeMillis():返回当前时间的毫秒数。
  • time:timestamp():返回当前时间的时间戳。
    d)类型转换函数(Type Conversion Functions):
  • cast(x, targetType):将参数x转换为目标类型。

2)聚合函数
a)数值聚合函数(Numeric Aggregate Functions):

  • avg(attribute):计算指定属性的平均值。
  • sum(attribute):计算指定属性的总和。
  • min(attribute):计算指定属性的最小值。
  • max(attribute):计算指定属性的最大值。
  • count(attribute):计算指定属性的计数。
    b)字符串聚合函数(String Aggregate Functions):
  • str:concat(attribute):将指定属性的所有值连接成一个字符串。
  • str:collect(attribute):将指定属性的所有值收集到一个列表中。
    c)时间聚合函数(Time Aggregate Functions):
  • time:timestampInMilliseconds(attribute):计算指定属性的时间戳的毫秒数。
  • time:timestampInSeconds(attribute):计算指定属性的时间戳的秒数。
    d)自定义聚合函数(Custom Aggregate Functions):
  • Siddhi还允许您定义自定义的聚合函数,以满足特定的需求。您可以使用Java或脚本语言编写自定义聚合函数的逻辑,并在Siddhi查询中使用它们。

Java中实现自定义聚合函数
i1)创建一个类来实现Siddhi的org.wso2.siddhi.core.aggregation.AggregationProcessor接口。这个接口定义了自定义聚合函数的方法。

import org.wso2.siddhi.core.aggregation.AggregationProcessor;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventFactory;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.StreamEventPoolManager;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.CompiledSelection;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.query.api.aggregation.TimePeriod;

public class CustomAggregationFunction implements AggregationProcessor {
    // 实现聚合函数的逻辑

    @Override
    public CompiledCondition compileCondition(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder,
            SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list,
            Map<String, Table> map, String s) {
        // 实现编译条件的逻辑
        return null;
    }

    @Override
    public CompiledSelection compileSelection(Expression expression, MatchingMetaInfoHolder matchingMetaInfoHolder,
            SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> list,
            Map<String, Table> map, String s) {
        // 实现编译选择的逻辑
        return null;
    }

    @Override
    public void start() {
        // 实现聚合函数的启动逻辑
    }

    @Override
    public void stop() {
        // 实现聚合函数的停止逻辑
    }

    @Override
    public Map<String, Object> currentState() {
        // 实现聚合函数的当前状态逻辑
        return null;
    }

    @Override
    public void restoreState(Map<String, Object> map) {
        // 实现聚合函数的状态恢复逻辑
    }
}

i2)在自定义聚合函数类中实现聚合函数的逻辑。您可以根据需要自定义聚合函数的计算逻辑、初始化逻辑、状态管理等。
i3) 在Siddhi查询中使用自定义聚合函数。在查询中,使用aggregate关键字指定自定义聚合函数,并将其与输入流和输出流关联起来。

String siddhiApp = "@App:name('CustomAggregationExample')\n" +
        "define stream Input (value double);\n" +
        "\n" +
        "@info(name = 'query1')\n" +
        "from Input#window.time(10 sec)\n" +
        "select custom:myAggregation(value) as result\n" +
        "insert into Output;";

在上述示例中,我们定义了一个输入流Input,包含一个属性value。然后,我们使用自定义聚合函数custom:myAggregation()计算输入流中每个窗口的聚合结果,并将结果插入到输出流Output中。
i4)在Siddhi应用程序中注册自定义聚合函数。
怎么注册?
o1)创建一个类来实现Siddhi的org.wso2.siddhi.core.extension.EternalReferencedHolder接口。这个接口定义了自定义聚合函数的注册方法。

import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.extension.EternalReferencedHolder;
import org.wso2.siddhi.core.util.config.ConfigReader;

public class CustomAggregationExtensionHolder implements EternalReferencedHolder {
    @Override
    public void init(SiddhiAppContext siddhiAppContext, ConfigReader configReader) {
        // 在这里初始化自定义聚合函数
        siddhiAppContext.getSiddhiContext().getSiddhiExtensions().addExtension("custom:myAggregation",
                CustomAggregationFunction.class);
    }

    @Override
    public void destroy() {
        // 在这里销毁自定义聚合函数
    }
}

o2)在自定义聚合函数注册类中实现init()方法。在该方法中,使用addExtension()方法将自定义聚合函数注册到Siddhi上下文中。
o3)在Siddhi应用程序中使用自定义聚合函数注册类。在Siddhi应用程序的配置文件中,使用@Extension注解将自定义聚合函数注册类与应用程序关联起来。

@App:name("CustomAggregationExample")
@Extension(name = "custom:myAggregation", namespace = "custom",
        description = "Custom Aggregation Function")
define stream Input (value double);

在上述示例中,我们使用@Extension注解将自定义聚合函数注册类CustomAggregationExtensionHolder与Siddhi应用程序关联起来。这样,当Siddhi应用程序启动时,自定义聚合函数将被注册到Siddhi上下文中。

3) 窗口函数:
length():返回窗口中事件的数量。
timeBatch():将事件按时间批量处理。
time():将事件按时间滑动窗口处理。

   from SensorStream#window.timeBatch(10 sec)
   select count() as eventCount
   insert into BatchCountStream;

4) 序列函数
序列函数用于检测事件的模式和序列

o1)sequence函数:
sequence函数用于定义事件的顺序。它接受多个参数,每个参数表示一个事件类型。
该函数返回一个布尔值,指示事件是否按照指定的顺序出现。
示例:sequence(A, B, C)表示事件A、B和C必须按照给定的顺序出现。
o2)lengthBatch函数:
lengthBatch函数用于定义事件的批处理操作。它接受一个整数参数,表示每个批次中的事件数量。
该函数返回一个布尔值,指示是否达到了指定数量的事件批次。
示例:lengthBatch(3)表示每当有3个事件时,将触发批处理操作。
o3)every函数:
every函数用于定义事件的连续出现。它接受一个时间间隔参数,表示事件之间的最大时间间隔。
该函数返回一个布尔值,指示事件是否连续出现在指定的时间间隔内。
示例:every(10 sec)表示事件必须在指定的时间间隔内连续出现。
o4)within函数:
within函数用于定义事件的时间限制。它接受一个时间间隔参数,表示事件必须在指定的时间范围内出现。
该函数返回一个布尔值,指示事件是否在指定的时间范围内出现。
示例:within(30 sec)表示事件必须在指定的时间范围内出现。

   from every (e1=SensorStream[sensorId == 'sensor001'] -> e2=SensorStream[sensorId == 'sensor002'])
   within 10 sec
   select e1.sensorId as sensor1, e2.sensorId as sensor2
   insert into PatternMatchStream;

序列函数复杂实践
假设我们有一个事件流SensorEventStream,其中包含传感器数据的事件。我们希望检测以下模式:事件A后跟事件B,然后是事件C,并且事件B和C之间的时间间隔不超过5秒。
首先,我们需要创建一个Siddhi应用程序,并定义输入流和输出流:

import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.EventPrinter;

public class SiddhiApp {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String siddhiApp = "@App:name('SequenceFunctionExample')" +
                "@source(type='inMemory', topic='SensorEventStream', @map(type='passThrough'))" +
                "define stream SensorEventStream (sensorId int, value double, timestamp long);" +
                "@sink(type='log')" +
                "define stream OutputStream (sensorId int, value double, timestamp long);" +
                "@info(name='query1')" +
                "from SensorEventStream#window.time(10 sec) " +
                "sequence(sensorId == 1) -> " +
                "every(1 sec) " +
                "sequence(sensorId == 2) -> " +
                "every(1 sec) " +
                "sequence(sensorId == 3) " +
                "select sensorId, value, timestamp " +
                "insert into OutputStream;";

        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
        siddhiAppRuntime.start();

        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");
        inputHandler.send(new Object[]{1, 10.5, System.currentTimeMillis()});
        inputHandler.send(new Object[]{2, 20.3, System.currentTimeMillis()});
        inputHandler.send(new Object[]{3, 30.2, System.currentTimeMillis()});

        Thread.sleep(1000);

        siddhiAppRuntime.shutdown();
        siddhiManager.shutdown();
    }
}

在上述示例中,我们定义了一个名为SensorEventStream的输入流,其中包含传感器数据的事件。然后,我们定义了一个名为OutputStream的输出流,用于输出满足我们定义的模式的事件。

在Siddhi查询中,我们使用了sequence函数和every函数来定义事件的顺序和时间限制。具体来说,我们使用了三个sequence函数来定义事件A、B和C的顺序,并使用了两个every函数来定义事件B和C之间的时间间隔。

最后,我们通过创建SiddhiAppRuntime并发送事件到输入流来执行Siddhi应用程序。满足定义的模式的事件将被输出到OutputStream流中。

这只是一个复杂实践的示例,您可以根据具体需求调整和扩展查询语句和事件流定义。

5)表格函数:
lookup():从外部表格中检索数据。
insertInto():将事件插入到外部表格中。
deleteFrom():从外部表格中删除数据。

   from SensorStream
   select sensorId, temperature, humidity
   insert into SensorTable;
   
   from SensorStream
   select sensorId, temperature, humidity
   insertInto SensorTable;
   
   from SensorStream
   delete from SensorTable
   on SensorStream.sensorId == SensorTable.sensorId;

6)脚本函数
o1)创建一个脚本文件,例如customScript.js,并在其中编写自定义函数的逻辑。例如,以下是一个使用JavaScript编写的脚本函数示例:

function customScriptFunction(input) {
    // 在这里编写自定义函数的逻辑
    var result = input * 2;
    return result;
}

o2)在Java代码中使用脚本函数。使用Siddhi的ScriptFunction类来执行脚本函数的逻辑。

import org.wso2.siddhi.core.function.Script;

public class ScriptFunctionExample {
    public static void main(String[] args) {
        // 创建ScriptFunction对象
        ScriptFunction scriptFunction = new ScriptFunction("customScript.js");

        // 调用脚本函数
        Object result = scriptFunction.call("customScriptFunction", 10);

        // 打印结果
        System.out.println("Result: " + result);
    }
}

在上述示例中,我们创建了一个ScriptFunction对象,并指定了脚本文件的路径。然后,我们使用call()方法调用脚本函数,并传递参数。最后,我们打印出脚本函数的结果。

三 复杂操作

1)聚合计算
计算每个传感器的平均温度和湿度。

   from SensorStream
   select sensorId, avg(temperature) as avgTemperature, avg(humidity) as avgHumidity
   group by sensorId
   insert into AggregatedStream;

2) 模式匹配和复杂事件处理
检测连续三个温度超过30度的事件。

   from every (e1=SensorStream[temperature > 30] -> e2=SensorStream[temperature > 30] -> e3=SensorStream[temperature > 30])
   select e1.sensorId, e1.temperature, e2.temperature, e3.temperature
   insert into HighTemperaturePatternStream;

3) 多流处理和联接操作
将传感器数据与天气数据联接,根据城市和时间进行联接。

   from SensorStream#window.time(1 min) as s join WeatherStream#window.time(1 min) as w
   on s.city == w.city and s.timestamp == w.timestamp
   select s.sensorId, s.temperature, w.weatherCondition
   insert into JoinedStream;

4)复杂条件和过滤操作
过滤出温度大于30度且湿度小于50%的事件

   from SensorStream[temperature > 30 and humidity < 50]
   select sensorId, temperature, humidity
   insert into FilteredStream;

5)时间窗口和滑动窗口操作:
计算最近10秒内的温度平均值。

   from SensorStream#window.time(10 sec)
   select avg(temperature) as avgTemperature
   insert into AverageTemperatureStream;

6)检测每小时的最高温度,并将结果插入到MaxTemperatureStream中

@info(name='query2', description='Detect temperature spikes')
from TemperatureStream#window.time(1 hour)
select max(temperature) as maxTemperature
insert into MaxTemperatureStream;
@info(name='query1', description='Calculate average temperature per hour')
from TemperatureStream#window.time(1 hour)
select avg(temperature) as avgTemperature
insert into AverageTemperatureStream;

8,举例

假设您有一个实时数据流,包含用户的登录信息,您希望使用Siddhi框架对数据进行处理和分析,以检测异常登录行为
1)添加Siddhi依赖项:在您的Java项目的构建配置文件(如pom.xml)中,添加Siddhi的依赖项。确保使用与Siddhi框架版本兼容的依赖项版本。

<dependency>
    <groupId>io.siddhi</groupId>
    <artifactId>siddhi-core</artifactId>
    <version>5.1.2</version>
</dependency>

2)编写Siddhi查询:使用SiddhiQL编写查询逻辑。以下是一个示例查询,用于检测异常登录行为:

String query = "@info(name = 'query') " +
        "from LoginStream#window.time(5 min) " +
        "select userId, count(*) as loginCount " +
        "group by userId " +
        "having loginCount > 10 " +
        "insert into SuspiciousLoginStream;";

这个查询逻辑包括以下步骤:

  • 从名为LoginStream的输入流中获取数据。
  • 使用时间窗口函数window.time(5 min)将数据流限制在过去5分钟的时间范围内。
  • 根据userId分组,并计算每个用户的登录次数。
  • 使用having子句过滤出登录次数大于10的用户。
  • 将结果插入名为SuspiciousLoginStream的输出流中。

3)创建Siddhi运行时:在Java代码中,创建SiddhiManager对象来管理Siddhi运行时环境。

SiddhiManager siddhiManager = new SiddhiManager();

4)部署查询:使用SiddhiManager对象,创建ExecutionPlanRuntime并部署查询。

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(query);
executionPlanRuntime.start();

5)处理输入和输出:使用ExecutionPlanRuntime对象,获取输入处理器和输出处理器,并使用它们来处理输入数据流和获取查询结果。

InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginStream");
inputHandler.send(new Object[]{"user1", "2022-01-01T10:00:00"});

OutputHandler outputHandler = executionPlanRuntime.getOutputHandler("SuspiciousLoginStream");
outputHandler.register(new OutputCallback() {
    @Override
    public void send(Event[] events) {
        for (Event event : events) {
            System.out.println("Suspicious Login Detected: " + event.getData()[0]);
        }
    }
});

6)停止和清理:在您的应用程序完成后,确保停止ExecutionPlanRuntime并进行清理。

executionPlanRuntime.shutdown();
siddhiManager.shutdown();

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Siddhi 的相关文章

  • 尝试将大 Excel 文件读入 DataTable 时出现 OutOfMemoryException

    我正在使用 SSIS 包清理 Xlsx 文件中的数据并将其加载到 SQL Server 表中 我还必须突出显示 Xlsx 文件中包含错误数据的单元格 为此我必须根据列名称和行 ID 我在数据电子表格中拥有 获取列和行索引 为此 我比较第一个
  • 我应该何时为导出到 BigQuery 的 Firebase Analytics 数据运行每日 ETL 作业?

    我们使用 Firebase Analytics 从我们的应用收集事件 我们已启用将事件导出到 BigQuery 我们每天都会运行一些 ETL 作业 以便在 BigQuery 中创建更友好的分析表 例如会话 购买 问题是我们什么时候应该运行这
  • ssis生成json文件删除返回

    我正在使用脚本任务从 sql 查询生成 json 文件 脚本任务中的c 代码 public void Main TODO Add your code here ConnectionManager cm string sqlString Sy
  • 如何将 40 亿条记录从 MySQL 一次性加载到 SQL Server

    我们需要对一个拥有超过 4 亿条记录的表从源 MySQL 5 5 进行初始数据复制到目标 SQL Server 2014 该表相当宽 有 55 列 但没有一个是 LOB 我正在寻找以最有效的方式复制这些数据的选项 我们尝试通过 Attuni
  • 将 SharePoint 列表数据提取到单独的 SQL Server 表的最简单方法?

    Edited What is the easiest way to scrape extract SharePoint list data to a separate SQL Server table One condition you r
  • 使用 Interop 从 Excel 获取最后一个非空列和行索引

    我正在尝试使用互操作库从 Excel 文件中删除所有多余的空白行和列 我关注了这个问题使用 Interop 从 Excel 文件中删除空行和空列的最快方法 https stackoverflow com questions 40574084
  • 数据分析任务 - 自定义分析请求

    是否有任何选项可以为 SSIS 数据分析任务创建自定义配置文件请求 目前 SSIS 数据分析任务下有 5 个标准分析请求 列空比率配置文件请求 列统计资料请求 列长度 分布配置文件请求 列值分布配置文件请求 候选人关键资料请求 我需要添加另
  • 如何使用OrientDB ETL仅创建边

    我有两个 CSV 文件 首先包含 500M 记录 格式如下 id name10000023432 汤姆用户13943423235 胡说八道 第二个包含 1 5B 好友关系 格式如下 从 ID 到 ID10000023432 13943423
  • 集成服务目录文件夹权限已更改

    问 SSISDB 或 MSDB 中的任何 SQL Server 系统表是否包含可让我发现哪些用户正在对 Integration Services 目录中的文件夹权限进行更改的信息 背景 我发现 SQL 代理作业失败 错误描述如下 无法访问该
  • SSIS 中的 OData 源组件未连接

    这是上一个问题的后续问题 SSIS 中的 OData 源组件挂起 https stackoverflow com questions 48026984 odata source component in ssis hanging Setup
  • 开源 ETL 框架 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 如何使用我在后续任务中添加的记录的自动生成 ID?

    我目前正在使用以下命令向表中添加一些记录OLE DB Destination 每条记录都有一个自动生成的Id场地 我想使用这个生成的Id字段作为某些子记录中的外键 我以为我能够从一个数据流线OLE DB Destination组件到另一个组
  • 当设置为 CRLF 时,SSIS 将 LF 读取为终止符

    使用 SSIS 2012 我的平面文件连接管理器我有一个分隔文件 其中行分隔符设置为CRLF 但是当它处理文件时 我有一个文本列 其中有一个LF在里面 这导致它将其读取为行终止符 从而导致失败 有任何想法吗 我没有 SSIS 经验 但作为一
  • OLE DB 目标:转换规范的字符值无效

    我的表来源 num facture TYPE actif date 1 1 1 2010 01 31 00 00 00 000 2 2 1 2011 01 31 00 00 00 000 3 3 2 2012 01 31 00 00 00
  • Luigi:如何将不同的参数传递给叶任务?

    这是我第二次尝试了解如何在 Luigi 中将参数传递给依赖项 第一个是here https stackoverflow com questions 64837259 luigi how to pass arguments to depend
  • 从事务性平面数据库填充事实表和维度表的最佳实践

    我想在 SSIS SSAS 中填充星型模式 多维数据集 我准备了所有维度表和事实表 主键等 源是一个 平面 项目级别 表 我现在的问题是如何拆分它 并将其从一个放入相应的表中 我做了一些谷歌搜索 但找不到令人满意的解决方案 人们会认为这是
  • 释放对执行进程任务中使用的变量的锁定SSIS

    我有一个包裹Foreach容器 and 执行流程任务 inside 对于每个容器 在执行流程任务中出现一些错误时 它会重定向到OnError事件处理程序对于每个容器 我正在使用 exe 捕获错误标准误差变量任务的属性并在脚本任务中使用它On
  • ETL 工具...它们到底做什么?请通俗地说[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我最近接触了一些 ETL 工具 例如 Talend 和 Apatar 我想知道这些工具通俗地说的用途 主要目标到底是什么 谁主要使用它
  • 如何在 SSIS 变量中存储“完全限定”和“仅名称”文件名

    我有一个 SSIS 包 其中有一个 Foreach 循环容器 加载静态文件夹中的所有 txt 文件 我将完全限定的文件名作为在连接字符串中使用的变量传递 我现在只需将文件名传递给一个变量以用于执行存储过程 问题是如果我将 Foreach 循
  • 将多个平面文件导入到多个 SQL 表

    这是我的文件夹设置 这是文件设置 这个想法是遍历文件夹并将文件内容放入数据库上的表 File dbo 还有 FileB FileC 等 所有文件夹的文件名结构都相同 我有这个 ssis 包 我在其中使用 foreachloop gt 数据流

随机推荐

  • KEIL经常出现 Encountered an improper argument 弹窗

    关于 keil5 使用在线调试时 经常出现 Encountered an improper argument 弹窗 经实测 可有如下方法 方法1 下载UV4 exe 替换本机电脑 Keil UV4目录下的UV4 exe 更换后 如果不能编译
  • 7-14 解一元一次方程 (17 分)

    请编写程序 解一元一次方程 ax b 0 一元一次方程求解公式为 x ab 求解要求 a 0 方程有唯一解 输出解 a 0 b 0 方程无解 输出no solution a 0 b 0 则方程有无穷多解 输出Infinitely solut
  • The absolute uri: http://java.sun.com/jsp/jstl/fmt cannot be resolved in either web.xml or the jar

    错误提示 org apache jasper JasperException file H netbeans workspace netbeans 6 9 ShoppingSystemOnline build web system fron
  • [LDUoj 倍增] 题解

    星星之火 可以燎原 细节的地方慢慢补充 欢迎提出问题 私聊 留言均可 A 跳跳棋 较难 B 聚会 板子题 C 祖孙询问 板子题 D Dis 板子题 E 次小生成树 严格次小生成树 难 F 异象石 难度适中 G 暗的连锁 难度适中 H 点的距
  • 3D游戏编程实践——Priests and Devils

    编程实践 Priests and Devils github链接 https github com ctlchild SYSU unity3d learning tree master hw2 Priests and Devils is a
  • 给Protobuf中的repeated类型变量添加子项

    Protobuf为repeated类型变量生成的自动代码 不提供通常的类似add item item 的添加子项的成员函数 Protobuf的做法是 UserDocChangesResp changes DocChangeInfo chan
  • Linux shell 编程之 - 合并两个文件

    两个文件a1 b1 内容分别如下 a1 1 2 3 b1 a b c 如何把它们合在一起内容如下的 1 a 2 b 3 c paste d a1 a2 SUN的Solaris只能合并12个文件 sco5 5下ksh只能合并6个文件 在aix
  • Allegro PCB封装焊盘介绍(一)

    PCB封装焊盘结构 焊盘结构如图 1所示 图 1焊盘结构 锡膏层 SMT刷锡膏贴片用 一般贴片焊盘要选 跟焊盘等大 阻焊层 把焊盘裸露出来 不开的话 焊盘会被油墨盖住 这样无法焊接哦 一般比焊盘大0 1mm 顶层 底层焊盘 实际焊盘大小 电
  • tensorRT 与 torchserve-GPU性能对比

    实验对比 前端时间搭建了TensorRT Torchserve GPU 最近抽时间将这两种方案做一个简单的实验对比 实验数据 Cuda11 0 Xeon 6242 3 1 80 RTX3090 24G Resnet50 TensorRT T
  • nosql练习

    1 string类型数据的命令操作 1 设置键值 2 读取键值 3 数值类型自增1 4 数值类型自减1 5 查看值的长度 2 list类型数据的命令操作 1 对列表city插入元素 Shanghai Suzhou Hangzhou 2 将列
  • Qt中代码添加背景图

    第一步 选择一张背景图下到本地 第二步 在qt中点击添加新文件选择图中位置 随便起个名字 点击下一步 这时项目中多出一个目录 选择打开资源编辑器 底部添加前缀 注意该前缀是在内部使用图的路径 点击添加 gt 添加前缀 我这里直接使用的 作为
  • STM32F4实现SD卡读写

    更多交流欢迎关注作者抖音号 81849645041 目的 熟悉SD卡和SDIO工作原理 掌握SD卡的读写 原理 大多单片机系统都需要大容量存储设备 以存储数据 目前常用的有 U 盘 FLASH 芯片 SD 卡等 他们各有优点 综合比较 最适
  • 2020网易笔试编程题(一)

    题目 在一次聚会中 教授们被要求写下自己认可哪位教授的学术成果 也可以写自己 且可能出现重复 已知 如果教授A认可教授B 且教授B认可教授C 那么即可视为教授A也认可教授C 现在我们想知道多少对教授是两两互相认可的 输入举例 输入教授人数
  • oracle中replace怎么用,oraclereplace函数怎么用

    1 REPLACE函数怎么用 REPLACE 参数1 参数2 参数3 参数4 参数1 是要替换其部分字符的文本 参数2 是要用参数4替换的参数1中字符的起始位置 参数3 是希望REPLACE用参数4替换参数1中从参数2开始算起的字符个数 参
  • js 将一维数组转为二维数组并分组

    let arr a W b W01 a W b W02 a WC b WC01 a WC b WC02 a WC b WC02 a WC b WC02 let map arr forEach item gt if map item a ma
  • 理解Spring定时任务@Scheduled的两个属性fixedRate和fixedDelay

    fixedRate和fixedDelay都是表示任务执行的间隔时间 fixedRate和fixedDelay的区别 fixedDelay非常好理解 它的间隔时间是根据上次的任务结束的时候开始计时的 比如一个方法上设置了fixedDelay
  • js 手机、邮箱、身份证格式验证

  • 使用Transformer与无监督学习,OpenAI提出可迁移至多种NLP任务的通用模型

    OpenAI 最近通过一个与任务无关的可扩展系统在一系列语言任务中获得了当前最优的性能 目前他们已经发布了该系统 OpenAI 表示他们的方法主要结合了两个已存的研究 即 Transformer 和无监督预训练 实验结果提供了非常令人信服的
  • 不相交集类(并查集)

    并查集 就是只有合并和 查找操作的一种数据结构 很简单 主要判断一个元素是否在一个集合里 主要应用在最小生成树 Kruskal算法 看到图的时候会将实现代码贴上 package chapter8 类名 DisjSets 说明 实现并查集 按
  • Siddhi

    1 Siddhi是什么 Siddhi是一个开源的流处理和复杂事件处理引擎 由WSO2公司开发 它提供了一个强大而灵活的框架 用于处理实时流数据和复杂事件 官网 2 Siddhi特点和功能 Siddhi具有以下特点和功能 1 实时流处理 Si