使用 Kinesis Analytics 构建实时会话

2024-03-18

是否有某个地方的示例,或者有人可以解释如何使用 Kinesis Analytics 构建实时会话。 (即会话化)

这里提到这可能:https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/ https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/在自定义窗口的讨论中但没有给出示例。

通常,这是使用 LAG 函数在 SQL 中完成的,因此您可以计算连续行之间的时间差。这个帖子:https://blog.modeanalytics.com/finding-user-sessions-sql/ https://blog.modeanalytics.com/finding-user-sessions-sql/描述如何使用传统(非流式)SQL 来完成此操作。但是,我在 Kinesis Analytics 中没有看到对 LAG 功能的支持。

我特别喜欢两个例子。假设两者都采用由 user_id 和时间戳组成的流作为输入。将会话定义为来自同一用户、间隔不超过 5 分钟的一系列事件

1) 第一个输出具有附加列 event_count session_start_timestamp 的流。每次出现事件时,都应该输出带有这两个附加列的事件。

2) 第二个示例是一个流,一旦会话结束(即 5 分钟过去,没有来自用户的数据),每个会话输出一个事件。该事件将包含 userId、start_timestamp、end_timestamp 和 event_count

Kinesis Analytics 可以实现这一点吗?

以下是使用 Apache Spark 执行此操作的示例:https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html

但我很乐意使用一个(或两个)Kinesis Analytics 流来完成此操作。


您可以使用 Drools 创建以下逻辑来完成此操作:

Types:

package com.test;

import java.util.List;

declare EventA
    @role( event )
    userId:String;
    seen:boolean;
end

declare SessionStart
    userId: String;
    timestamp: long;
    events: List;
end

declare SessionEnd
    userId: String;
    timestamp: long;
    numOfEvents: int;
end

declare SessionNotification
    userId: String;
    currentNumOfEvents: int;
end

Rules:

package com.test;

import java.util.List;
import java.util.ArrayList;

rule "Start session"
when
    // for any EventA
    $a : EventA() from entry-point events
    // check session is not started for this userId
    not (exists(SessionStart(userId == $a.userId)))
then
    modify($a){setSeen(true);}
    List events = new ArrayList();
    events.add($a);
    insert(new SessionStart($a.getUserId(), System.currentTimeMillis(), events));
end

rule "join session"
when
    // for every new EventA
    $a : EventA(seen == false) from entry-point events
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    $session.getEvents().add($a);
    insertLogical(new SessionNotification($a.getUserId(), $session.getEvents().size()));
    modify($a) {setSeen(true);}

end

rule "End session"
// if session timed out, clean up first
salience 5
when
    // for any EventA
    $a : EventA() from entry-point events
    // check there is no following EventA with same userId within 30 seconds
    not (exists(EventA(this != $a, userId == $a.userId, this after[0, 30s] $a)
        from entry-point events))
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    insertLogical(new SessionEnd($a.getUserId(), System.currentTimeMillis(),
        $session.getEvents().size()));

    // cleanup
    for (Object $x : $session.getEvents())
        delete($x);
    delete($session);
end

您可以使用 Drools Kinesis Analytics 创作这项服务 https://aws.amazon.com/marketplace/pp/B075MDR5DC

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Kinesis Analytics 构建实时会话 的相关文章

  • 外部混洗:从内存中混洗大量数据

    我正在寻找一种方法来整理内存不适合的大量数据 大约 40GB 我有大约 3000 万个可变长度的条目 存储在一个大文件中 我知道该文件中每个条目的开始和结束位置 我需要对内存中不适合的数据进行洗牌 我想到的唯一解决方案是对包含以下数字的数组
  • Neo4j 关系索引 - 搜索关系属性

    我有一个具有以下结构的 neo4j 图 账户 交易 账户 Transaction是neo4j关系 Account是节点 每笔交易都设置了各种属性 例如交易 ID 金额 日期和各种其他银行信息 我可以按帐户 ID 运行搜索 结果返回正常 但是
  • 为什么布尔字段在 Hive 中不起作用?

    我的配置单元表中有一个数据类型为布尔值的列 当我尝试从 csv 导入数据时 它存储为 NULL 这是我的示例表 CREATE tABLE if not exists Engineanalysis EngineModel String Eng
  • 层次聚类大稀疏距离矩阵 R

    我试图在非常大的距离上执行 fastclust 但遇到了问题 我有一个非常大的 csv 文件 大约 9100 万行 因此 for 循环在 R 中花费太长时间 其中包含关键字 大约 50 000 个唯一关键字 之间的相似性 当我读入 data
  • 如何在 Spark Scala 中将 null NAN 或 Infinite 值替换为默认值

    我正在将 csv 读入 Spark 并将架构设置为所有 DecimalType 10 0 列 当我查询数据时 出现以下错误 NumberFormatException Infinite or NaN 如果我的数据框中有 NaN null i
  • AWS Kinesis 中的分区键是什么?

    我正在读关于AWS Kinesis 在下面的程序中 我将数据写入名为的流中TestStream 我运行这段代码 10 次 将 10 条记录插入到流中 var params Data More Sample data into the tes
  • 如何使用R将年度数据转换为月度数据?

    我有2000年至2015年15年的逐年GDP数据 我想将这些数据转换为月度数据 其中只有月份和年份 我只想将当年的值复制到所有月份 我怎样才能在 R 中做到这一点2010 年的值是 1708 我想为 2010 年的所有月份复制相同的值 我的
  • 如何构建和使用flink-connector-kinesis?

    我正在尝试将 Apache Flink 与 AWS kinesis 结合使用 这document https ci apache org projects flink flink docs release 1 7 dev connector
  • 读取 Amazon Kinesis Firehose 流写入 s3 的数据

    我正在将记录写入 Kinesis Firehose 流 该流最终由 Amazon Kinesis Firehose 写入 S3 文件 我的记录对象看起来像 ItemPurchase String personId String itemId
  • 如何从 HIVE 中的日期减去月份

    我正在寻找一种方法来帮助我从 HIVE 中的日期中减去月份 我有个约会2015 02 01 现在我需要从这个日期减去 2 个月 这样结果应该是2014 12 01 你们能帮我一下吗 select add months 2015 02 01
  • 大稀疏矩阵到三角矩阵 R

    我在 R 中有一个非常大的 大约 9100 万个非零条目 sparseMatrix 如下所示 gt myMatrix a b c a 1 2 b 1 c 2 我想将其转换为三角矩阵 上或下 但是当我尝试 myMatrix myMatrix
  • 如何在Hadoop中设置数据块大小?改变它有好处吗?

    如果我们可以更改 Hadoop 中的数据块大小 请告诉我如何操作 更改块大小是否有利 如果是 请告诉我为什么以及如何更改 如果没有 请告诉我为什么以及如何 您可以随时更改块大小 除非dfs blocksize参数在 hdfs site xm
  • 全新安装时的 HDFS 空间使用情况

    我刚刚安装了 HDFS 并启动了该服务 并且已使用空间已经超过800MB 它代表什么 hdfs dfs df h Filesystem Size Used Available Use hdfs quickstart cloudera 802
  • 分段读取 CSV 文件的策略?

    我的计算机上有一个中等大小的文件 4GB CSV 但没有足够的 RAM 来读取该文件 64 位 Windows 上为 8GB 在过去 我只是将其加载到集群节点上并将其读入 但我的新集群似乎任意将进程限制为 4GB RAM 尽管每台机器的硬件
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • 如何将非分区表转换为分区表

    如何使用 StandardSQL 或 LegacySQL 重命名 BigQuery 中的表 以便对之前未分区的表进行分区 我正在尝试使用 StandardSQL 但出现以下错误 重命名表dataset old table name TO d
  • 如何从 Kinesis Analytics (SQL) 中的格式化为字符串的 json 中选择数据

    我有一个运动数据流 以这种格式提供数据 创建时间 时间戳 有效负载 varchar 6000 一个简化的例子payload元素 version 2 0 data whatever someString observations obs id
  • 为什么 Spark 在字数统计时速度很快? [复制]

    这个问题在这里已经有答案了 测试用例 Spark 在 20 秒以上对 6G 数据进行字数统计 我明白映射减少 FP and stream编程模型 但无法弄清楚字数统计的速度如此惊人 我认为这种情况下是I O密集型计算 不可能在20秒以上扫描
  • Sqoop mysql错误-通信链路故障

    尝试运行以下命令 sqoop import connect jdbc mysql 3306 home credit risk table bureau target dir home sqoop username root password
  • (R 错误)错误:cons 内存耗尽(达到限制?)

    我正在处理大数据 并且有一个 70GB 的 JSON 文件 我正在使用 jsonlite 库将文件加载到内存中 我尝试过 AWS EC2 x1 16large 机器 976 GB RAM 来执行此负载 但 R 因错误而中断 Error co

随机推荐

  • 迁移 NDB 模型属性的正确方法

    我目前在 NDB 中有一个模型 我想更改属性名称而不必接触 NBD 假设我有以下内容 from google appengine ext import ndb class User ndb Model company ndb KeyProp
  • Neo4j 索引创建失败

    我目前正在了解 Neo4j 和图形数据库的功能 我还做了neo4j附带的电影教程 现在我必须处理一个大型数据库 135 GB 500 个 mio nodes 950 个 mio relationships 并且想要为特殊类型的节点添加索引
  • 为什么 this 不能作为成员函数中的默认参数传递?

    我试图将当前的 le ngth 值作为默认参数作为函数参数传递 但编译器显示错误 在此上下文中不得使用 this 谁能告诉我我犯了什么错误 class A private int length public A void display i
  • 在node.js中,全局对象(或头对象)有名称吗?

    如果环境是浏览器 那么我们可以使用 测试this关键字指向window对象 也是全局对象 或者有些书称其为 头对象 如果在node js中 有没有类似这样的名字呢 window 以便 console log this globalObjec
  • 确定线斜率变化的位置(算法)

    如果你绘制下面的数字 你会得到一个 波动性微笑 数字遵循一个线性斜率 左斜率 然后更改为 遵循另一个线性斜率 右斜率 我有几组这样的数据 想知道斜率在哪里 变化 笔记 斜率变化通常发生在点之间 不知道有多少个点有左斜率 有多少个有 正确的斜
  • java 如何打开另一个目录中的文件?

    如何打开不在当前目录中但在另一个目录中的文件 例如 我有一个文件夹 F test 我的文件位于 F test test2 doit txt 和 D test3 doit2 txt 创建文件对象时在参数路径中输入的内容如下 File f ne
  • XSLT - 将前同级元素与当前节点元素进行比较

    我有这个 XML 文件
  • JavaFX 如何滚动 ScrollPane 以使节点位于视口中间?

    我必须创建显示当前时间的时间线 我正在使用 AnchorPane 并将添加的行放置在 ScrollPane 中 我需要模拟1天 滚动窗格宽度为 2880px 每60px为一小时 我的限制是 前一天 12 小时 第二天 12 小时 移动我的线
  • 如何为BackgroundService传递参数?

    我阅读了有关 ASP net core 2 2 的内容 并找到了有关通用主机的参考资料 我尝试在示例下使用 backgroundService 创建控制台应用程序 https github com aspnet AspNetCore Doc
  • 一个值在数组中存在多少次[重复]

    这个问题在这里已经有答案了 我想做的第一件事是找出我在输入中输入的数字是否存在于数组中 这有效 我想做的第二件事是找出它存在的次数 我在这里走的路正确吗
  • 如何将 Flot 与 AngularJS 集成?

    我对 Angular 和 Flot 很陌生 但对 Jquery 和 Javascript 很熟悉 我对如何将 Flot 图表绑定到 Angular 数据模型有点困惑 因为 Flot 是一个 JQuery 插件 我四处搜寻 但未能找到示例 我
  • 库存管理数据库设计

    我正在为我的公司创建一个内部网 我们希望在其中进行库存管理 我们销售和租赁报警系统 我们希望很好地了解哪些产品仍在我们的办公室中 哪些产品已出租或出售 何时出租或出售等 目前我想到了这个数据库设计 每次我们创建新合同时 该合同都与地点或商品
  • 如何扩展Generator类?

    我尝试过滤生成器 并期望这种通用功能必须在 JavaScript 中的任何位置定义 因为它是为数组定义的 但我找不到它 所以我试图定义它 但我无法扩展内置发电机 我有一个示例生成器 function make nums let nums n
  • NSFetchRequest 未捕获属性已更改的对象

    我在 Mac Os X 10 6 上使用 SQL 存储时遇到了 Core Data 的奇怪问题 我有一个NSManagedObject子类称为Family有属性name和一段关系personList连接到另一个NSManagedObject
  • 必须从 UI 线程调用 getText() 方法

    必须从 UI 线程调用 getText 方法 请帮忙 我是 android studio 的初学者 在网上找到了这些代码 但无法弄清楚 我真的很感激 public class MainActivity extends AppCompatAc
  • Django password_reset 支持 html 电子邮件模板吗?

    在我看来 django 仅支持开箱即用的密码重置电子邮件的纯文本消息 我如何使用 html 模板来实现此目的 以下是如何进行覆盖 urls py url r user password reset YOUR APP views passwo
  • node.js - 代码保护?

    我想在下一个项目中使用node js 但我的老板不喜欢我们的竞争对手可以阅读源代码 有没有办法保护 JavaScript 代码 您可以使用 Node 的 NativeExtension 来完成此操作 你会有一个boostrap js为 js
  • 我如何能够在 C++ 中声明一个在运行时确定的可变长度数组?

    请检查这段代码 它编译并运行得非常好 问题是 当我开始学习 c turbo c 时 我从来无法将任何类型的数组声明为 datatype var variable set at runtime 我想当然地认为这在最新的 gcc 编译器中是不可
  • 如何根据优先级和关联性来解析(复杂)声明?

    符号 如 等 两者都使用表达式 and 声明 这是两个不同的概念 In 表达式 符号为运营商 为此我们有一个明确定义的优先级和结合性表 当表达比较复杂 我们可以利用这张表进行分解分析 例如 a b c Question In 声明 这些符号
  • 使用 Kinesis Analytics 构建实时会话

    是否有某个地方的示例 或者有人可以解释如何使用 Kinesis Analytics 构建实时会话 即会话化 这里提到这可能 https aws amazon com blogs aws amazon kinesis analytics pr