从 Heroku 上运行的 Celery 任务连接时出现 MySQL 查询错误

2024-02-20

在对外部 MySQL 数据库执行查询时,但仅在从 Heroku 上运行的 Celery 任务连接时,我看到错误的查询结果。同样的任务,在我自己的机器上运行时不会显示这些错误,并且错误只出现大约一半的时间(尽管当它们失败时,all任务是错误的)。

这些任务由 Celery 通过 Redis 进行管理,MySQL 数据库本身并不在 Heroku 上运行。我的本地计算机和 Heroku 都连接到同一个 MySQL 数据库。

我使用 MySQL 连接到数据库,使用 pymysql 驱动程序,使用;

DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'

engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

任务是一项一项执行的。

以下是具有不同结果的任务示例:

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}

这些都是简单而快速的任务,只需几秒钟即可完成。

任务因产生而失败small差异。例如,对于正确结果为 30111 的任务,当出现问题时,任务将产生 29811。始终是使用 db 的代码

我尝试过的:

  • 我已经通过执行使用相同的时区:

    db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
    
  • 我检查了工作日志中的错误。虽然有一些条目像

    2013 Lost connection to MySQL
    
    sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
    
    2014 commands out of sync
    

    我还没有发现 SQL 错误和错误结果之间的关联。即使没有丢失连接,也可能会出现错误的任务结果。

  • 一个非常肮脏的修复方法是对其中一个任务的预期结果进行硬编码,首先执行该结果,然后在生成的结果不正确时重新提交所有内容。

  • 这可能是我使用 SQLAlchemy 会话方式的缓存或隔离级别问题。因为我只需要使用 SELECT(不需要插入或更新),所以在运行任务之前我还尝试了不同的隔离级别设置,例如

    #db_session.close()
    #db_session.commit()
    #db_session.execute('SET TRANSACTION READ ONLY')
    

    当我在 Heroku 上运行它们时,它们会显示错误,但当我在 Windows 计算机上运行它们时,它们会工作。

    我还尝试改变连接本身'isolation_level="READ UNCOMMITTED',没有任何结果。

  • 我确信工人们不会重复使用同样的东西db_session.

  • 似乎只有使用的任务db_session在查询中可能会返回错误的结果。代码使用query属性上的Base基类(一个db_session.query_property()对象,例如Users.query)似乎没有问题。我以为这基本上是同一件事?


您正在不同工作人员的任务之间重复使用会话。创建您的会话每个 Celery 工人,甚至每个任务。

要知道任务实际上是由每个工作人员持久保存的。您可以使用它来缓存每个任务的会话,这样您就不必在每次运行任务时重新创建会话。这是最容易完成的自定义任务类 https://celery.readthedocs.io/en/latest/userguide/tasks.html#custom-task-classes;该文档使用数据库连接缓存作为示例。

要通过 SQLAlchemy 会话执行此操作,请使用:

Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

class SQLASessionTask(Task):
    _session = None

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True) 
            self._session = Session(bind=engine)
        return self._session

使用它作为:

@shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session = self.session
    # ... etc.

仅当当前任务需要时,即您访问时,这才会为当前任务创建一个 SQLAlchemy 会话self.session.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Heroku 上运行的 Celery 任务连接时出现 MySQL 查询错误 的相关文章

随机推荐