Kinesis lambda DynamoDB

2024-03-02

我正在学习 AWS 服务的一个用例。在浏览完文档后,我想出了一个简单的流程。我想使用 Streams API 和 KPL 将数据提取到 Kinesis 流中。我使用示例 putRecord 方法将数据提取到流中。我正在将此 JSON 摄取到流中 -

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}

数据被摄取后,我会在 putRecordResult 中得到以下响应 -

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}

现在,我编写一个 Lambda 函数来获取这些数据并将其推送到 DynamoDB 表中。这是我的 Lambda 函数 -

console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();

exports.handler = (event, context, callback) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        // Kinesis data is base64 encoded so decode here
        const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
        var userid = event.userid;
        var username = event.username;
        var firstname = event.firstname;
        console.log(userid + "," + username +","+ firstname);

        var item = {
            "userid" : userid,
            "username" : username,
            "firstname" : firstname
        };

        var params = {
            TableName : tableName,
            Item : item
        };
        console.log(params);

        db.putItem(params, function(err, data){
            if(err) console.log(err);
            else console.log(data);
        });

    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

不知何故,我无法在 lambda 函数执行中看到 console.logs。我在流页面中看到,已将 putRecord 写入流并获取,但不知何故,我在 Lambdafunction 页面和 DynamoDB 表中看不到任何内容。

我有一个针对 Java 代码的 IAM 策略,用于将数据提取到 Kinesis 中,另一个针对 Lambda 函数(即 lambda-kinesis-execution-role),还有一个针对 DynamoDB 的策略,用于将数据提取到表中。

有没有任何教程可以展示如何以正确的方式完成它?我感觉我在这个过程中遗漏了很多要点,例如如何链接所有这些 IAM 策略并使它们同步,以便当数据放入流中时由 Lambda 处理并最终进入 Dynamo?

非常感谢任何指示和帮助。


如果您上面的代码是您正在使用的代码的直接副本,那么您正在引用event.userid但你应该使用payload.userid。您已将 Kinesis 记录解码到有效负载变量中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kinesis lambda DynamoDB 的相关文章

随机推荐

  • 在 rich:dataTable 中使用 a4j:repeat 或 ui:repeat 无法正确呈现单选按钮

    使用时
  • 使复合小部件可拖动时出现问题

    我对 gwt n dnd 相对较新 我创建了一个复合小部件 当我尝试使复合小部件的对象可拖动时 它会抛出异常 dragHandle 必须实现 HasMouseDownHandlers HasMouseUpHandlers HasMouseM
  • Expression.ToString() 有效吗?

    我有一个生成的 lambda 但是当我想观看它时 它就像一个普通的 lambda 它只是不显示任何内容 当我打电话时expr Body ToString 我得到以下信息 var compareA 但表达式的 DebugView 工作正常 L
  • 如何使用两个不同的节点版本运行两个不同的nodejs应用程序[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我们有两个 Node js 应用程序 这两个应用程序不能在同一节点版本中运行 一个应用程序只能在node4 8 3中运行 另一个应用程
  • MapFragment 样式为 Dialog 导致 TextView 透明

    这是设置 我正在构建和显示样式为居中对话框的活动 这是为了显示不应在设备上全屏显示的分层内容 一种类型的内容是地图 所以我已经成功地将 MapFragment 加载到对话框样式的 FragmentActivity 中 这确实有效 问题是当我
  • 在 Python、NumPy 和 R 中创建相同的随机数序列

    Python NumPy 和 R 都使用相同的算法 Mersenne Twister 来生成随机数序列 因此 从理论上讲 设置相同的种子应该会在所有 3 个中产生相同的随机数序列 但事实并非如此 我认为这 3 个实现使用不同的参数导致了这种
  • 使用 Cocoa 和 Core Data 自动保存

    我正在开发一个非基于文档的核心数据应用程序 我希望更改发生时能够保存 这是用户对此类应用程序的期望 这也是苹果公司实施的iPhoto or iTunes 一种强力方法是设置一个计时器来频繁保存 然后 由保存触发的方法将吞掉所有验证错误 以免
  • Emacs 使用 Git-Rebase 将缓冲区恢复到奇怪的先前状态

    我在 OS X 上使用 Emacs 23 3 1 我从终端发出 git 命令 而不使用任何 Emacs 的 VC 功能 我将 Emacs 设置为在修改文件时刷新 这是通过我的 emacs 文件中的这些行启用的 custom set vari
  • 使用 javascript 处理文本区域上的 Enter 键

    我在页面上有 5 个文本区域 我希望在第一个文本区域上按 Enter 键时发生特定事件 在其他文本区域上按 Enter 键时发生不同的事件 您能否建议如何实现这一目标
  • 替代

    我正在尝试修复为我编写的旧脚本 我需要它运行而无需 我想从脚本内部运行该函数 而无需像该命令那样的内联代码 抱歉 我不是 JS 专家 但是我该怎么做呢 或者 如果您使用的是 jQuery function Your code here
  • BITS 多域传输文件

    如何在不同域的服务器之间传输文件 i e PS C Users Desktop gt Import Module bitstransfer PS C Users Desktop gt c get credential PS C Users
  • 在R中进行线性回归时,如何有条件地删除因子的NA观察?

    我正在尝试在 R 中建立一个简单的线性回归模型 模型中有三个因子变量 模型是 lm Exercise Econ Job Position 其中 锻炼 是数字因变量 即锻炼的时间量 经济 工作 职位 都是因子变量 经济 是指一个人是否有工作
  • 需要 T SQL 合并示例来帮助理解

    下列 MERGE dbo commissions history AS target USING SELECT amount requestID AS source amount request ON target request sour
  • 如何对齐SpriteBatch.DrawString绘制的文本?

    有没有一种简单的方法可以将文本向右和居中对齐 而不是默认的左对齐 我使用这段代码 Flags public enum Alignment Center 0 Left 1 Right 2 Top 4 Bottom 8 public void
  • C# double 未按预期工作[重复]

    这个问题在这里已经有答案了 我知道双精度数是小数 在下面的程序中 输出是 1 尽管我认为它会重复 1 05 static void Main string args double d 19 18 Console WriteLine d Co
  • 从结构数组中选择 Spark DataFrames 中的特定列

    我有一个 Spark 数据框df具有以下架构 root k integer nullable false v array nullable true element struct containsNull true a integer nu
  • 使用 VS Code 远程 Docker 容器网络挂起 5 秒

    我有 3 个服务 共享同一网络的 webapp app 数据库 db 和 redis rd 随机地 两个服务 数据库和 Redis 的连接都会挂起大约 5 秒 不仅当我运行连接到数据库的网络服务器时 甚至当我浏览网页上的链接时也是如此 有时
  • 如何在 git 中列出版本控制的文件?

    我想列出 git 存储库根目录中的版本控制文件 要在集市中执行相同的操作 您可以运行 bzr ls versioned non recursive 我如何在 git 中执行此操作 如果您准确描述要显示的列表 将会更有帮助 从 bzr 文档猜
  • SQL Server 查询处理器耗尽内部资源

    Query update mytable set mycol null where id in 583048 583049 50000 more Message 查询处理器耗尽了内部资源并且无法 生成查询计划 这是一个罕见的事件 仅预计 极
  • Kinesis lambda DynamoDB

    我正在学习 AWS 服务的一个用例 在浏览完文档后 我想出了一个简单的流程 我想使用 Streams API 和 KPL 将数据提取到 Kinesis 流中 我使用示例 putRecord 方法将数据提取到流中 我正在将此 JSON 摄取到