在对外部 MySQL 数据库执行查询时,但仅在从 Heroku 上运行的 Celery 任务连接时,我看到错误的查询结果。同样的任务,在我自己的机器上运行时不会显示这些错误,并且错误只出现大约一半的时间(尽管当它们失败时,all任务是错误的)。
这些任务由 Celery 通过 Redis 进行管理,MySQL 数据库本身并不在 Heroku 上运行。我的本地计算机和 Heroku 都连接到同一个 MySQL 数据库。
我使用 MySQL 连接到数据库,使用 pymysql 驱动程序,使用;
DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'
engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
任务是一项一项执行的。
以下是具有不同结果的任务示例:
@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
db_session.close()
start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
end_date = datetime.strptime(g_end_date, '%d-%m-%Y')
gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()
if gross_rev_trans_VK is None:
gross_rev_trans_VK = 0
if gross_rev_trans_Stripe is None:
gross_rev_trans_Stripe = 0
if gross_rev_trans is None:
gross_rev_trans = 0
print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)
total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans
return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}
# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
if request.method == "POST":
task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}
这些都是简单而快速的任务,只需几秒钟即可完成。
任务因产生而失败small差异。例如,对于正确结果为 30111 的任务,当出现问题时,任务将产生 29811。始终是使用 db 的代码
我尝试过的:
-
我已经通过执行使用相同的时区:
db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
-
我检查了工作日志中的错误。虽然有一些条目像
2013 Lost connection to MySQL
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
2014 commands out of sync
我还没有发现 SQL 错误和错误结果之间的关联。即使没有丢失连接,也可能会出现错误的任务结果。
一个非常肮脏的修复方法是对其中一个任务的预期结果进行硬编码,首先执行该结果,然后在生成的结果不正确时重新提交所有内容。
-
这可能是我使用 SQLAlchemy 会话方式的缓存或隔离级别问题。因为我只需要使用 SELECT(不需要插入或更新),所以在运行任务之前我还尝试了不同的隔离级别设置,例如
#db_session.close()
#db_session.commit()
#db_session.execute('SET TRANSACTION READ ONLY')
当我在 Heroku 上运行它们时,它们会显示错误,但当我在 Windows 计算机上运行它们时,它们会工作。
我还尝试改变连接本身'isolation_level="READ UNCOMMITTED'
,没有任何结果。
我确信工人们不会重复使用同样的东西db_session
.
似乎只有使用的任务db_session
在查询中可能会返回错误的结果。代码使用query
属性上的Base
基类(一个db_session.query_property()
对象,例如Users.query
)似乎没有问题。我以为这基本上是同一件事?