尝试从 python 写入 cassandra 时 CQL 查询中出现语法错误

2023-12-29

因此,我正在用 python 构建一个应用程序,该应用程序从 twitter 获取数据,然后将其保存到 cassandra。我当前的问题在于一个从kafka读取数据并尝试将其写入cassandra的脚本,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer




class Consumer(multiprocessing.Process):
   def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
         self.stop_event.set()

    def run(self):
       consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])



    while not self.stop_event.is_set():
        for message in consumer:
            # session.execute(
            #     """
            #     INSERT INTO mensaje_73 (tweet)
            #     VALUES (message)
            #     """
            # )
            print(message)
            cluster = Cluster()
            session = cluster.connect('twitter')
            session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

            # if self.stop_event.is_set():
            #     break

    consumer.close()


   def main():

    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()



if __name__ == "__main__":
     logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:% 
   (levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

我尝试将测试消息插入表 twitter.mensaje_73 并且它运行良好,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

任何帮助将不胜感激:)


所以这里的问题是你的message变量在 CQL 中被视为文字,如果没有单引号,它将无法工作。因此,出现错误。

为了解决这个问题,我将使用准备好的语句,然后绑定message to it:

session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
        """
        INSERT INTO mensaje_73 (tweet)
        VALUES (?)
        """
    )
session.execute(preparedTweetInsert,[message])

尝试一下,看看是否有帮助。

此外,这似乎是一个简单的数据模型。但要问自己的一件事是,您将如何查询这些数据?这不会起作用,除非tweet是你唯一的主键。这也意味着查询单个推文的唯一方法是通过消息的确切文本。需要考虑一些事情,但按天分区可能是一个更好的选择,因为它会很好地分布并提供更好的查询模型。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

尝试从 python 写入 cassandra 时 CQL 查询中出现语法错误 的相关文章

  • Python range() 和 zip() 对象类型

    我了解功能如何range and zip 可以在 for 循环中使用 然而我期望range 输出一个列表 很像seq在 Unix shell 中 如果我运行以下代码 a range 10 print a 输出是range 10 表明它不是一
  • 使用 asyncio 时应该如何创建属性?

    在创建使用 asyncio 的类时 我发现自己处于属性 getter 需要进行 io 操作的情况 因此该函数应该是一个协程 然而 等待房产的感觉却很不寻常 这是我的意思的一个最小的例子 该代码有效并且可以运行 import asyncio
  • 使用 InlineKeyboardButton python telegram bot 发送命令

    在Python电报机器人中 是否有可能InlineKeyboardButton发送类似命令 cancel当它被按下时 例如 当用户按下取消按钮时 他们将自动发送 cancel 命令 然后由机器人处理 从这里的例子来看 https githu
  • 无法使用 datastax java 驱动程序通过 UDT 密钥从 cassandra 检索

    我正在尝试使用用户定义的类型作为分区键将对象存储在 cassandra 中 我正在使用 datastax java 驱动程序进行对象映射 虽然我能够插入到数据库中 但无法检索该对象 如果我更改分区键以使用非 udt 例如文本 我就能够保存和
  • 如何在 Python 中根据日期列绘制分类变量

    我有这样的数据 Date Fruit 2017 01 01 Orange 2017 01 01 Apple 2017 01 08 Orange 2017 01 09 Orange 2017 01 09 Apple 我想在一个图中按日期绘制橙
  • Django Web 应用程序中的 SMTP 问题

    我被要求向使用 Django Python 框架实现的现有程序添加一个功能 此功能将允许用户单击一个按钮 该按钮将显示一个小对话框 表单以输入值 我确实编写了一些代码 显示电子邮件已发送的消息 但实际上 它没有发送 My code from
  • 更改用作函数全局作用域的字典

    我想做一个 purePython 的装饰器 其中一部分是能够有选择地禁止访问函数的全局范围 有没有一种方法可以以编程方式更改哪个字典事物充当函数的全局 外部作用域 因此 例如在下面我希望能够拦截对f in h并抛出错误 但我想允许访问g因为
  • pip3:错误的解释器:没有这样的文件或目录

    我正在尝试使用安装依赖项pip3 command 当前场景 Dev which python Users Dev anaconda bin python Dev which python3 usr local bin python3 Dev
  • 使用一次递归调用实现递归

    给定一个函数如下 f n f n 1 f n 3 f n 4 f 0 1 f 1 2 f 2 3 f 3 4 我知道使用递归来实现它 并在一个函数内进行三个递归调用 但我想在函数内仅使用一次递归调用来完成此操作 怎样才能做到呢 要实现使用
  • 为什么 __instancecheck__ 没有被调用?

    我有以下 python3 代码 class BaseTypeClass type def new cls name bases namespace kwd result type new cls name bases namespace p
  • 使用 imblearn 管道进行交叉验证之前或之后是否发生过采样?

    在对训练数据进行交叉验证以验证我的超参数之前 我已将数据分为训练 测试 我有一个不平衡的数据集 并且想要在每次迭代中执行 SMOTE 过采样 因此我使用以下方法建立了一个管道imblearn 我的理解是 将数据分成k折后应该进行过采样 以防
  • ssl.SSLEOFError: EOF 发生违反协议 (_ssl.c:1129)

    我正在尝试使用 GOOGLE Drive Api 从电脑上传多个文件到云端硬盘 from pydrive auth import GoogleAuth from pydrive drive import GoogleDrive import
  • Python 2 的 `exceptions` 模块在 Python3 中丢失了,它的内容到哪里去了?

    一位朋友提到 对于 Python 2 假设您在命令行上的路径环境变量中有它 pydoc exceptions 非常有用 知道它应该可以为他每周节省几分钟的网络查找时间 我自己每周都会用谷歌搜索一次例外层次结构 所以这对我来说也是一个有用的提
  • 如何使用 PyCharm 运行 Pylint

    我想将 Pylint 配置为我正在处理的 Python 项目的整个项目目录中的外部工具 我尝试将存储库用作模块 init py没有的话 这两种方式都不起作用 我在设置 Pylint 与 PyCharm 一起运行时遇到困难 我知道我应该将它作
  • 如何修复 TypeError: G 必须是 'd' 矩阵?

    目标 尝试通过优化过程运行玩具数据集 我遇到以下错误 TypeError Traceback most recent call last
  • 使用 pybtex 将 bibtex 转换为格式化的 HTML 参考书目,例如哈佛风格

    我正在使用 Django 并将 bibtex 存储在我的模型中 并且希望能够以格式化 HTML 字符串的形式向我的视图传递引用 使其看起来像哈佛引用样式 使用中描述的方法Pybtex 无法识别 bibtex 条目 https stackov
  • bs4 `next_sibling` VS `find_next_sibling`

    我在使用时遇到困难next sibling 并且类似地与next element 如果用作属性 我不会得到任何返回 但如果用作find next sibling or find next 然后就可以了 来自doc https www cru
  • 带表格格式的 Matplotlib 条形图

    我在图的底部添加了一个表格 但它存在许多问题 右边的内边距太多了 左边的填充太少 底部没有填充物 单元格对于其中的文本来说太小 该表距离图的底部太近 属于行名称的单元格的颜色未与条形图的颜色相匹配 我要发疯了 去摆弄这个 有人可以帮我解决这
  • QDataWidgetMapper;将 TableWidget 映射到模型

    我没有找到任何文档显示 QDataWidgetMapper 实际上适用于哪些小部件 也没有找到任何使用 QTableWidget 进行映射的实现 它绝对适用于 QLineEdit 和 QComboBoxes 它们是输入小部件 但是是否可以映
  • Cassandra CQL v3.0 和复合类型

    我正在浏览以下文档CQLv3 0 http www datastax com docs 1 1 references cql index 我们是否应该在更新中指定复合键并选择 a b 1 以防万一comparator or key vali

随机推荐

  • Laravel sainttum 更改 csrf cookie 路由

    我怎样才能将 laravel sainttum csrf cookie 路由更改为 api sanctum csrf cookie 我尝试将其添加到 api php 路由中 use Laravel Sanctum Http Controll
  • 使用 Java 中的 Lambda 函数进行 AWS S3 事件通知

    我正在尝试使用 Lambda 函数进行 S3 Put 事件通知 当我在 S3 存储桶中放置 添加任何新的 JSON 文件时 应该调用我的 Lambda 函数 我面临的挑战是没有足够的文档来用 Java 实现这样的 Lambda 函数 我找到
  • Angular 2 - 获取 Observable 中已更改的 FormControl 的值

    我有一个简单的表单FormBuilder this contactForm formBuilder group name email phone 我想观察每个控件的更改 并在发生这种情况时使用更新后的值运行函数 getContacts va
  • 如何在 Visual Studio 2010 中添加 ASP.NET MVC 3 Web 应用程序?

    我的VS 2010如下 微软视觉工作室 2010 版本 10 0 30319 1 RTMRel Microsoft NET Framework 版本 4 0 30319 RTMRel 安装版本 旗舰版 ASP NET MVC 3 Web 应
  • 如何从 IntelliJ IDEA 内部重命名本地 Git 分支?

    您可以使用 IntelliJ IDEA 的 Git 插件做很多事情 但我还没有找到重命名分支的方法 有吗 我知道我总是可以打开终端并执行git branch m source target 但我也希望找到一个 GUI 解决方案 此功能有几个
  • Spring Boot计划任务不适用于docker容器

    我的 Spring Boot 项目在 docker 容器上运行时遇到问题 如果我以恶魔化方式运行容器 docker run d 当我在后台运行非图像时 一切正常 不幸的是 我必须将其作为妖魔化来运行 并且我不知道如何解决该问题 感谢您提供任
  • 使用“this->”的性能损失?

    考虑 C 类中两个类似的 C 成员函数的示例 void C function Foo new f f new f and void C function Foo new f this gt f new f 这些函数的编译方式是否相同 使用是
  • 释放内存的重要性? [复制]

    这个问题在这里已经有答案了 可能的重复 当 malloc 之后不释放时 到底会发生什么 https stackoverflow com questions 654754 what really happens when you dont f
  • MASM:在 .data 声明中使用当前位置计数器 ($)

    我遇到了有关 MASM 中当前位置计数器的问题 这是我的汇编代码 我使用 Visual Studio 2013 Express 进行汇编 386 model flat stdcall stack 8192 ExitProcess proto
  • 使用 JavaScript 读取 CSS 值

    这有效 div style width 100 div 这确实not work div div 我也尝试过将 css 样
  • 如何避免重复将大文件加载到Python脚本中?

    我编写了一个 python 脚本来获取一个大文件 一个矩阵 50k 行 X 500 列 并将其用作数据集来训练随机森林模型 我的脚本有两个函数 一个用于加载数据集 另一个用于使用所述数据训练随机森林模型 这些都工作得很好 但文件上传大约需要
  • 使用 Node.js 设置 SSL

    我在 GoDaddy 购买了 SSL 证书 并使用以下 node js 服务器尝试设置它 var https require https module for https fs require fs required to read cer
  • 使用 Oracle 客户端 64 位和 Visual Studio 2010 时出现 BadImageFormatException!

    我们的一名开发团队成员遇到了错误 尝试加载 Oracle 客户端库抛出 BadImageFormatException 它似乎 当在 64 位模式下运行并安装了 32 位 Oracle 客户端组件时 会出现此问题 但配置系统的是我 以下是规
  • 点击事件被列表视图父项捕获

    我正在编写一个在 Firemonkey 中使用的自定义开关对象TListView每个项目的控制 除了一个奇怪的故障之外 一切都按预期进行 当用户单击其中一项而不是特定的开关对象时 它无论如何都会切换开关 我假设MouseDown当用户单击列
  • R 数据帧聚合列表

    我确实有 53 个数据框 purchase01 到purchase53 的列表 按日期排序 有 18 个变量和不同的行数 已尝试 但无法在下面粘贴示例 我想要总计的每个不同的数据帧通过其重复值 V9 因子 与列 V2 数字相加 我还没找到答
  • AFHTTPRequestOperationManager 返回块中的数据

    我在我的应用程序中创建了一个 APIController 它有几个方法可以调用特定的 api url 并返回一个用 api 调用结果填充的模型对象 该 API 使用 json 到目前为止我的代码如下所示 Definition MyModel
  • 自定义单元格:致命错误:在展开可选值时意外发现 nil

    我有一个带有创建为 xib 的自定义单元格的表格视图 我没有使用故事板 我有一个问题 我无法用来自网络服务结果的数据填充我的表 另外 我在自定义单元格中有 4 个标签 在我的自定义单元类中 当我尝试为每个项目设置标签时 它给了我如上所述的致
  • Django从apache获取环境变量

    我似乎无法让 Django 读取我从环境变量中配置的设置 我遵循了一些在线指南 并发现了一些其他问题 因此尝试配置如下 阿帕奇配置 WSGIScriptAlias v4 usr local myproject4 myproject4 wsg
  • 如何在 ASP.NET DataRepeater 控件中执行条件逻辑?

    我将 DataRepeater 控件绑定到具有许多列的表 我只想显示其中的一个子集 具体取决于填充的内容 我应该如何 在哪里进行数据中继器中的条件测试 这是我的 itemtemplate 中的代码 我得到的错误是 CS0103 名称 容器
  • 尝试从 python 写入 cassandra 时 CQL 查询中出现语法错误

    因此 我正在用 python 构建一个应用程序 该应用程序从 twitter 获取数据 然后将其保存到 cassandra 我当前的问题在于一个从kafka读取数据并尝试将其写入cassandra的脚本 如下所示 import thread