要将 DataFrame 上传到临时表并随后执行 UPDATE，您无需自己编写 SQL，可以让 SQLAlchemy Core 为您完成：
import pandas as pd
import sqlalchemy as sa
def update_table_columns_from_df(
    engine, df, table_name, cols_to_update, temp_table_name="temp_table"
):
    """Update columns of an existing table from the rows of a DataFrame.

    The DataFrame is uploaded to a staging table, then one ``UPDATE``
    statement built with SQLAlchemy Core copies ``cols_to_update`` into the
    rows of ``table_name`` whose primary key matches a staging row. The
    staging table is dropped afterwards, even when the update fails.

    Args:
        engine: SQLAlchemy engine connected to the target database.
        df: pandas DataFrame containing the primary-key columns of
            ``table_name`` plus every column named in ``cols_to_update``.
        table_name: Name of the existing table to update; its definition
            (including the primary key) is reflected from the database.
        cols_to_update: Names of the columns to overwrite. When empty the
            function returns without touching the database.
        temp_table_name: Name of the staging table. An existing table with
            this name is replaced and then dropped, so pick a name that is
            not in use.

    Raises:
        ValueError: If ``temp_table_name`` equals ``table_name``, or if
            ``table_name`` has no primary key to match rows on.
        KeyError: If the uploaded DataFrame lacks a primary-key column or
            one of ``cols_to_update``.
    """
    cols_to_update = list(cols_to_update)
    if not cols_to_update:
        # Nothing to copy: skip the upload instead of emitting an UPDATE
        # that has no SET clause.
        return

    if temp_table_name == table_name:
        # to_sql(if_exists="replace") would overwrite the target table and
        # the final drop would then delete it.
        raise ValueError(
            f"temp_table_name must differ from table_name ({table_name!r})"
        )

    metadata = sa.MetaData()
    main_table = sa.Table(table_name, metadata, autoload_with=engine)
    pk_columns = [col.name for col in main_table.primary_key.columns]
    if not pk_columns:
        # Without join columns the WHERE clause would be empty and every row
        # of the target table would be overwritten.
        raise ValueError(
            f"table {table_name!r} has no primary key to match rows on"
        )

    df.to_sql(temp_table_name, engine, index=False, if_exists="replace")
    temp_table = sa.Table(temp_table_name, metadata, autoload_with=engine)
    try:
        values_clause = {
            name: temp_table.columns[name] for name in cols_to_update
        }
        where_clause = sa.and_(
            *(
                main_table.columns[name] == temp_table.columns[name]
                for name in pk_columns
            )
        )
        with engine.begin() as conn:
            conn.execute(
                main_table.update().values(values_clause).where(where_clause)
            )
    finally:
        # Always remove the staging table, including on failure.
        temp_table.drop(engine)
# Demonstration: build a small table, apply updates from a DataFrame, and
# print the result. Requires a reachable PostgreSQL database.
if __name__ == "__main__":
    # NOTE(review): the URL in the original text was mangled by e-mail
    # obfuscation; user, password, and host below are placeholders to adjust.
    test_engine = sa.create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        echo=True,  # (for demonstration purposes)
    )

    # Recreate the demo table with a composite primary key and seed rows.
    with test_engine.begin() as test_conn:
        test_conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        test_conn.exec_driver_sql(
            """\
CREATE TABLE main_table (
id1 integer NOT NULL,
id2 integer NOT NULL,
txt1 varchar(50),
txt2 varchar(50),
CONSTRAINT main_table_pkey PRIMARY KEY (id1, id2)
)
"""
        )
        test_conn.exec_driver_sql(
            """\
INSERT INTO main_table (id1, id2, txt1, txt2)
VALUES (1, 1, 'foo', 'x'), (1, 2, 'bar', 'y'), (1, 3, 'baz', 'z')
"""
        )

    # New values for two of the three rows, keyed by (id1, id2).
    df_updates = pd.DataFrame(
        [
            (1, 1, "new_foo", "new_x"),
            (1, 3, "new_baz", "new_z"),
        ],
        columns=["id1", "id2", "txt1", "txt2"],
    )

    update_table_columns_from_df(
        test_engine, df_updates, "main_table", ["txt1", "txt2"]
    )
    # SQL emitted:
    #   UPDATE main_table
    #   SET txt1=temp_table.txt1, txt2=temp_table.txt2
    #   FROM temp_table
    #   WHERE main_table.id1 = temp_table.id1
    #     AND main_table.id2 = temp_table.id2

    df_result = pd.read_sql_query(
        "SELECT * FROM main_table ORDER BY id1, id2", test_engine
    )
    print(df_result)
    # Expected output:
    #    id1  id2     txt1   txt2
    # 0    1    1  new_foo  new_x
    # 1    1    2      bar      y
    # 2    1    3  new_baz  new_z