从 celery 执行插入时 mysql 命令不同步

2024-03-18

当使用自定义数据库库和 celery 时,我遇到了可怕的 MySQL 命令不同步的问题。

库如下:

import pymysql
import pymysql.cursors
from furl import furl

from flask import current_app

class LegacyDB:
    """Db

    Legacy Database connectivity library

    """

    def __init__(self,app):
        with app.app_context():
            self.rc = current_app.config['RAVEN']
            self.logger = current_app.logger
            self.data = {}
            # setup Mysql
            try:
                uri = furl(current_app.config['DBCX'])
                self.dbcx = pymysql.connect(
                    host=uri.host,
                    user=uri.username,
                    passwd=uri.password,
                    db=str(uri.path.segments[0]),
                    port=int(uri.port),
                    cursorclass=pymysql.cursors.DictCursor
                    )
            except:
                self.rc.captureException()

    def query(self, sql, params = None, TTL=36):
        # INPUT 1 : SQL query
        # INPUT 2 : Parameters
        # INPUT 3 : Time To Live
        # OUTPUT  : Array of result

        # check that we're still connected to the
        # database before we fire off the query
        try:
            db_cursor = self.dbcx.cursor()
            if params:
              self.logger.debug("%s : %s" % (sql, params))
              db_cursor.execute(sql,params)
              self.dbcx.commit()
            else:
              self.logger.debug("%s" % sql)
              db_cursor.execute(sql)
            self.data = db_cursor.fetchall()
            if self.data == None:
              self.data = {}
            db_cursor.close()
        except Exception as ex:
            if ex[0] == "2006":
                db_cursor.close()
                self.connect()
                db_cursor = self.dbcx.cursor()
                if params:
                  db_cursor.execute(sql,params)
                  self.dbcx.commit()
                else:
                  db_cursor.execute(sql)
                self.data = db_cursor.fetchall()
                db_cursor.close()
            else:
                self.rc.captureException()

        return self.data

该库的目的是在我将遗留数据库模式从基于 C++ 的系统迁移到基于 Python 的系统时与 SQLAlchemy 一起工作。

所有配置都是通过 Flask 应用程序完成的,app.config['DBCX'] 值读取与 SQLAlchemy 字符串(“mysql://user:pass@host:port/dbname”)相同,使我可以轻松切换未来。

我有许多通过 celery 运行“INSERT”语句的任务,所有这些任务都利用这个库。正如您可以想象的那样,运行 Celery 的主要原因是这样我可以增加该应用程序的吞吐量,但是我似乎在我的库或应用程序中遇到了线程问题,因为一段时间后(大约 500 条已处理的消息)我请参阅日志中的以下内容:

Stacktrace (most recent call last):

  File "legacy/legacydb.py", line 49, in query
    self.dbcx.commit()
  File "pymysql/connections.py", line 662, in commit
    self._read_ok_packet()
  File "pymysql/connections.py", line 643, in _read_ok_packet
    raise OperationalError(2014, "Command Out of Sync")

显然我做错了什么导致了这个错误,但是MySQL是否启用/禁用自动提交或者我在哪里放置connection.commit()调用似乎并不重要。

如果我省略了connection.commit(),那么我不会将任何内容插入到数据库中。

我最近从 mysqldb 迁移到 pymysql,出现次数似乎较低,但是考虑到这些是简单的“插入”命令而不是复杂的选择(该数据库甚至没有任何外键约束!)努力找出问题所在。

就目前情况而言,我无法使用executemany,因为我无法提前准备语句(我正在从“firehose”消息队列中提取数据并将其存储在本地以供以后处理)。


首先,请确保celerythingamajig 使用自己的连接,因为

>>> pymysql.threadsafety
1

意思是:“线程可以共享模块,但不能共享连接” https://www.python.org/dev/peps/pep-0249/#threadsafety.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 celery 执行插入时 mysql 命令不同步 的相关文章

  • 引发 RuntimeError(f"目录 '{directory}' 不存在") RuntimeError: 导入 fitz 时目录 'static/' 不存在

    当我运行 extract img py 文件时出现此错误 RuntimeError f 目录 directory 不存在 运行时错误 导入 fitz 时不存在目录 static 我不明白为什么这会给我发回此错误消息 我之前看到过关于这个话题
  • 如何在Python中的BeautifulSoup4中使用.next_sibling时忽略空行

    由于我想删除 html 网站中重复的占位符 因此我使用 BeautifulSoup 的 next sibling 运算符 只要重复项位于同一行 就可以正常工作 参见数据 但有时它们之间有一个空行 所以我希望 next sibling 忽略它
  • 将 matplotlib png 转换为 base64 以在 html 模板中查看

    背景 你好 我正在尝试制作一个简单的网络应用程序 按照教程计算阻尼振动方程 并将结果的 png 返回到 html 页面 然后将其转换为 Base64 字符串 Problem 该应用程序运行正常 只是在计算结果时返回损坏的图像图标 可能是因为
  • 使用字母而不是数字进行顺序计数[重复]

    这个问题在这里已经有答案了 我需要一种方法 将字符串 递增 到 z 然后将 aa 递增到 az 然后将 ba 递增到 bz 依此类推 就像 Excel 工作表中的列一样 我将向该方法提供前一个字符串 它应该增加到下一个字母 PSEUDO C
  • 在 EXISTS 查询中使用 LIMIT 有什么意义吗?

    添加一个是否有任何性能优势LIMIT to an EXISTS查询 或者 MySQL 会自行应用限制吗 Example IF EXISTS SELECT 1 FROM my table LIMIT 1 can this improve pe
  • 理解@property装饰器和继承[重复]

    这个问题在这里已经有答案了 这里是 Python 3 以防万一它很重要 我试图正确理解如何实现继承 property使用 我已经搜索了 StackOverflow 并阅读了大约 20 个类似的问题 但无济于事 因为他们试图解决的问题略有不同
  • Selenium Webdriver - Python - leboncoin - pb 选择带重音的按钮

    我正在尝试在以下网站上自动填写表格 https www leboncoin fr https www leboncoin fr 我用 Selenium IDE 录制了一个脚本 我有一个通过单击 Se 连接器 按钮并填写我的密码和用户名来自动
  • Pandas Pivot_Table :非数字值的行计算百分比

    这是我在数据框 df 中的数据 Document Name Time SPS2315511 A 1 HOUR SPS2315512 B 1 2 HOUR SPS2315513 C 2 3 HOUR SPS2315514 C 1 HOUR S
  • 如何使用 msgpack 进行读写?

    如何序列化 反序列化字典data with msgpack http msgpack org The Python 文档 http msgpack python readthedocs io en latest badge latest似乎
  • 管理文件字段当前 url 不正确

    在 Django 管理中 只要有 FileField 编辑页面上就会有一个 当前 框 其中包含指向当前文件的超链接 但是 此链接会附加到当前页面 url 因此会导致 404 因为不存在这样的页面 例如 http 127 0 0 1 8000
  • 如何在matplotlib中基于x轴更改直方图颜色

    我有根据 pandas 数据框计算出的直方图 我想根据 x 轴值更改颜色 例如 If the value is 0 the color should be green If the value is gt 0 the color shoul
  • 按时间戳字段中的日期过滤结果

    我已经获得了一些帮助 但不确定为什么这不起作用 我正在尝试使用表单让用户过滤他们的活动 存储在数据库中 My code GET from 01 11 2013 GET to 25 11 2013 from DateTime createFr
  • 如何将mysql数据库移动到另一个安装点

    我有一个 MySQL 数据库 它变得越来越大 我想将整个数据库移动到另一个安装点 在那里我有足够的存储空间 我希望传输当前数据 并将新数据保存到新位置 软件堆栈 在 FreeBSD 6 上运行的 MySQL 5 当然其他答案也是有效的 但如
  • 按组内顺序排序

    order by 在第二个查询中不起作用 我需要先按 DNAID 订购 然后按 DNBID 订购 首先查询其顺序为 111221 第二个查询的顺序为 112112 有关我想要完成的任务的更多信息和细节https stackoverflow
  • PHP strtotime返回Mysql UNIX_TIMESTAMP的不同值

    我在 stackoverflow 上搜索过帖子 发现了一些类似的帖子 但我认为这是一篇不同的帖子 我的 PHP 和 Mysql 服务器的时区全部设置为 UTC 在表中我使用时间戳字段 值为 2010 11 08 02 54 15 我使用这样
  • 从 Python 中编译的正则表达式中提取命名组正则表达式模式

    我有一个 Python 正则表达式 其中包含多个命名组 但是 如果先前的组已匹配 则可能会错过与一组匹配的模式 因为似乎不允许重叠 举个例子 import re myText sgasgAAAaoasgosaegnsBBBausgisego
  • 在 scrapy 中将基本 url 与结果 href 结合起来

    下面是我的蜘蛛代码 class Blurb2Spider BaseSpider name blurb2 allowed domains www domain com def start requests self yield self ma
  • Jinja2中获取请求参数

    如何检索请求参数a在 Jinja2 模板中 http foo bar a 1 我这个答案有点晚了 但其他解决方案并没有真正考虑到您对 Flask 的使用 事实上 您将 Flask 与 Jinja2 一起使用 这使得您的情况与其他框架有点不同
  • 如何动态创建 Luigi 任务

    我正在为 Luigi Tasks 构建一个包装器 但遇到了一个障碍Register http luigi readthedocs io en stable modules luigi task register html Register该
  • Shap - 颜色条不显示在摘要图中

    显示summary plot时 不显示颜色条 shap summary plot shap values X train 我尝试过改变plot size 当绘图较高时 会出现颜色条 但它非常小 看起来不应该 shap summary plo

随机推荐

  • vuejs2:我如何销毁观察者?

    我怎样才能摧毁这个观察者 当我的异步数据从父组件加载时 我的子组件中只需要一次 export default watch data function this sortBy 格雷戈尔 如果通过调用 vm watch 函数动态构造观察程序 它
  • JAVA:如何将私钥保存在具有密码保护的 pem 文件中

    我正在尝试将私钥保存在pem文件 受密码保护 问题是 pem文件已创建 我 甚至可以用 openssl 打开它 但是不需要密码 这是代码 KeyPairGenerator keygen KeyPairGenerator getInstanc
  • For Each 循环的逆序

    VB 最强大的功能之一是能够循环访问集合中的对象而不引用索引 for each loop 我发现它非常有用 只想从集合中删除对象 当从预定义对象 例如电子表格上的行 中删除对象时 如果我使用索引并从最大的对象开始并返回到第一个 则代码会更简
  • smartpackage 中的 Meteor 项目路径

    我正在寻找一种从智能包中查找流星项目路径的方法 例如 获取 meteor 文件夹所在目录的路径 我无法使用节点的 dirname 和 filename 来完成此操作 因为不知何故在流星中它们不可用 有小费吗 从 Meteor 0 6 0 开
  • 如何等待所有协程完成?

    我正在启动一个协程 我希望它在恢复执行主线程之前完成 我的代码简化如下 fun hello for i in 0 100 println hello fun main args Array
  • 多个环境中的 Azure 云服务项目配置(.csdef 和 .cscfg)

    目前我们有一个开发云服务 acme dev service 和一个生产云服务 acme prod service 我们解决方案中当前的设置有一个名为 acme application 的云服务项目 它使用 cscfg 和 csdef 文件的
  • Firebase 电话身份验证在发布时不起作用 (Android)

    当使用我的应用程序的调试版本时 电话身份验证有效 当我在手机上手动安装发布签名的 apk 时 电话身份验证也有效 但是当我将相同的工作 apk 发布到 Play 商店 Alpha 测试 时 我无法使其工作 经过一番挖掘后得到了这个 此应用无
  • Tidyr 与可选组分开

    我正在尝试使用extract来自 tidyr 包 将包含单个字符串的列拆分为 3 个单独的列 该问题的最小工作示例如下 A tibble 3 x 1 question codes
  • jquery ajax post成功返回数据

    我无法取回我的数据 这是我的代码 哪里有问题 谢谢 索引 php
  • 如何使用嵌套生成器编写 jqwik 生成器方法

    使用 jqwik net 尝试生成一个 Rule 类 其中包含一个嵌套的 RuleConfig 类 RuleConfig 类有一个嵌套的ruleProps 它是一个Map statusReturnedFromApplyingRule 方法始
  • 如何序列化包含指向原语的指针的类?

    我正在尝试使用 boost 的功能来序列化指向原语的指针 这样我就不必自己取消引用并进行深度存储 然而 当我尝试这样做时 我遇到了一堆错误 这是一个应该包含的类的简单示例save and load从文件中写入和读取类内容的方法 该程序无法编
  • 如何访问查询结果? [复制]

    这个问题在这里已经有答案了 我正在编写一个简单的测试来验证 id 列中不同值的数量与每个表的行数匹配 我期望能够访问对象的特定值 但是当我运行代码并尝试打印变量的值时 我可以看到我的对象是一个 sqlalchemy engine resul
  • jQuery 滚动对吗?

    我有以下代码 似乎可以在单击时将 div 一直滚动到左侧 我想知道是否 有一种方法可以让它一次只滚动 200px 我也可以让它向右滚动 试图查看 jQuery 文档 但找不到scrollToRight 函数 这是我的代码 leftArrow
  • 是否可以在 Firebase 上查询 !equalTo: null ?

    我使用此查询来验证我的 Firebase 上是否存在数据 使用 AngularFire2 let aux this afData list drivers query orderByChild accountHistory approved
  • Ruby on Rails“content_for:title”如何获得稍后分配的内容?

    简短的问题是 子页面如何 set the title主要布局 details 我们可以在应用程序布局中使用application html erb
  • 通过终端列出视频设备

    如何通过终端列出计算机的视频设备 有没有列出它们的命令 列出视频output设备 例如监视器 system profiler SPDisplaysDataType 这会产生如下所示的输出 Graphics Displays Intel HD
  • 动态加载 Jar 并实例化已加载类的对象

    我尝试将 jar 动态加载到我的 Java 项目中 这是类加载器的代码 public class ClassLoad public static void main String args String filePath new Strin
  • 如何强制将特定程序集加载到 MVC 2 应用程序的每个应用程序域中?

    我有一些松散耦合的代码 这些代码取决于加载到当前应用程序域中的特定程序集 Assembly assembly AppDomain CurrentDomain GetAssemblies Where candidateAssembly gt
  • 为什么我收到#Ecto.Association.NotLoaded?

    我有团队 每个团队都有用户 因此有一个连接表将用户链接到团队 因为它是多对多关系 这是我的模型 defmodule App Team do use App Web model schema teams do field owner id i
  • 从 celery 执行插入时 mysql 命令不同步

    当使用自定义数据库库和 celery 时 我遇到了可怕的 MySQL 命令不同步的问题 库如下 import pymysql import pymysql cursors from furl import furl from flask i