假设已有一张名为“note”的表，内容如下：
id parent_id
----------- -----------
11 NULL
22 11
33 22
44 33
55 NULL
66 55
在 PostgreSQL 中做了一些尝试后发现，下面的查询
-- Walk upward from note 44 through its parent_id links.
-- Column i counts how many hops each row is away from the starting note.
WITH RECURSIVE parent (i, id, parent_id) AS (
    -- Anchor member: the starting note, at depth 0.
    SELECT
        0,
        id,
        parent_id
    FROM note
    WHERE id = 44

    UNION ALL

    -- Recursive member: fetch the parent of the row found in the previous step.
    SELECT
        p.i + 1,
        n.id,
        n.parent_id
    FROM parent AS p
    INNER JOIN note AS n
        ON n.id = p.parent_id
    WHERE p.parent_id IS NOT NULL
)
SELECT
    i,
    id,
    parent_id
FROM parent
ORDER BY i;
返回：
i id parent_id
----------- ----------- -----------
0 44 33
1 33 22
2 22 11
3 11 NULL
因此，只需把最后一行的 SELECT 改成如下形式，就可以得到顶级父级：
-- Find the top-level ancestor of note 44: build the ancestor chain, then keep
-- only the row that is the most hops (largest i) away from the starting note.
WITH RECURSIVE parent (i, id, parent_id) AS (
    -- Anchor member: the starting note, at depth 0.
    SELECT
        0,
        id,
        parent_id
    FROM note
    WHERE id = 44

    UNION ALL

    -- Recursive member: fetch the parent of the row found in the previous step.
    SELECT
        p.i + 1,
        n.id,
        n.parent_id
    FROM parent AS p
    INNER JOIN note AS n
        ON n.id = p.parent_id
    WHERE p.parent_id IS NOT NULL
)
SELECT id
FROM parent
ORDER BY i DESC
LIMIT 1;
返回
id
-----------
11
因此，要将其转换为 SQLAlchemy (1.4)：
from sqlalchemy import (
create_engine,
Column,
Integer,
select,
literal_column,
)
from sqlalchemy.orm import declarative_base
# NOTE(review): the original URI read "postgresql://scott:[email protected]/test".
# The "[email protected]" fragment appears to be an e-mail-obfuscation artefact
# that swallowed the password and host, leaving no "@" separator and therefore
# no usable database URL. The credentials and host below are placeholders —
# replace them with the real values for your environment.
connection_uri = "postgresql://scott:tiger@localhost/test"

# echo=False keeps SQLAlchemy from logging every emitted statement.
engine = create_engine(connection_uri, echo=False)

# Declarative base class that the ORM models in this script inherit from.
Base = declarative_base()
class Note(Base):
    """ORM mapping for the existing ``note`` table.

    Each row carries its own ``id`` and an optional ``parent_id``; the sample
    data uses a NULL ``parent_id`` for top-level notes.
    """

    __tablename__ = "note"

    # Primary key of the note.
    id = Column(Integer, primary_key=True)

    # id of the parent note; nullable (the Column default, spelled out here).
    parent_id = Column(Integer, nullable=True)
def get_top_level_note_id(start_id):
    """Return the id of the top-level ancestor of the note ``start_id``.

    Builds a recursive CTE that starts at ``start_id`` and repeatedly follows
    ``parent_id`` links, tagging every row with its hop count ``i``. The row
    with the largest ``i`` is the last ancestor reached.

    :param start_id: ``note.id`` of the note to start from.
    :return: id of the last ancestor reached (``start_id`` itself when that
        note has no parent row to follow), or ``None`` when the query yields
        no rows, i.e. no note has that id.
    """
    # NOTE(review): nothing here guards against a cycle in the parent_id
    # links; with UNION ALL such data would keep the recursion going forever.
    note_tbl = Note.__table__

    # Anchor member: the starting note at depth 0. Typing the literal as
    # Integer lets the "+ 1" below be built as a normal column expression.
    parent_cte = (
        select(
            literal_column("0", type_=Integer).label("i"),
            note_tbl.c.id,
            note_tbl.c.parent_id,
        )
        .where(note_tbl.c.id == start_id)
        .cte(name="parent_cte", recursive=True)
    )

    # The recursive member must refer to the CTE and to ``note`` through
    # aliases so they are distinct from the objects used in the anchor member.
    parent_cte_alias = parent_cte.alias("parent_cte_alias")
    note_tbl_alias = note_tbl.alias()

    # Recursive member: join each row found so far to its parent note.
    # The depth is derived from the alias' own column instead of a raw SQL
    # string, so it cannot drift out of sync with the alias name. No separate
    # "parent_id IS NOT NULL" filter is needed: an equality join never matches
    # a NULL parent_id.
    parent_cte = parent_cte.union_all(
        select(
            (parent_cte_alias.c.i + 1).label("i"),
            note_tbl_alias.c.id,
            note_tbl_alias.c.parent_id,
        ).select_from(
            note_tbl_alias.join(
                parent_cte_alias,
                note_tbl_alias.c.id == parent_cte_alias.c.parent_id,
            )
        )
    )

    # Deepest row first; its id is the top-level ancestor.
    stmt = select(parent_cte.c.id).order_by(parent_cte.c.i.desc()).limit(1)

    with engine.begin() as conn:
        result = conn.execute(stmt).scalar()
    return result
if __name__ == "__main__":
    # Demo run against the sample data. Expected output:
    #   top level id for note 44 is 11
    #   top level id for note 66 is 55
    for test_id in (44, 66):
        print(
            f"top level id for note {test_id} is {get_top_level_note_id(test_id)}"
        )