PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_cte.py

from .. import config
from .. import fixtures
from ..assertions import eq_
from ..schema import Column
from ..schema import Table
from ... import ForeignKey
from ... import Integer
from ... import select
from ... import String
from ... import testing


class CTETest(fixtures.TablesTest):
    """Round-trip tests for common table expressions (CTEs).

    Each test builds a CTE over ``some_table`` restricted to the rows
    whose ``data`` is "d2", "d3" or "d4", runs a statement that uses it
    against ``config.db`` and compares the fetched rows with ``eq_``.
    """

    __backend__ = True
    __requires__ = ("ctes",)

    run_inserts = "each"
    run_deletes = "each"

    @classmethod
    def define_tables(cls, metadata):
        """Attach the two fixture tables to ``metadata``.

        ``some_table`` is self-referential (``parent_id`` is a foreign
        key to its own ``id``); ``some_other_table`` has the same column
        names but a plain integer ``parent_id``.
        """
        self_referential_columns = (
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
            Column("parent_id", ForeignKey("some_table.id")),
        )
        Table("some_table", metadata, *self_referential_columns)

        plain_columns = (
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
            Column("parent_id", Integer),
        )
        Table("some_other_table", metadata, *plain_columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert the five seed rows into ``some_table``."""
        # (id, data, parent_id): row 1 is the root, 2 and 3 are its
        # children, 4 and 5 are children of 3.
        seed = (
            (1, "d1", None),
            (2, "d2", 1),
            (3, "d3", 1),
            (4, "d4", 3),
            (5, "d5", 3),
        )
        rows = [
            {"id": pk, "data": label, "parent_id": parent}
            for pk, label, parent in seed
        ]
        connection.execute(cls.tables.some_table.insert(), rows)

    def test_select_nonrecursive_round_trip(self):
        """SELECT from a plain CTE, filtered again on the outside."""
        src = self.tables.some_table

        with config.db.connect() as conn:
            inner = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            cte = inner.cte("some_cte")

            outer = select([cte.c.data]).where(
                cte.c.data.in_(["d4", "d5"])
            )
            rows = conn.execute(outer).fetchall()

            # Only "d4" passes both the CTE filter and the outer filter.
            eq_(rows, [("d4",)])

    def test_select_recursive_round_trip(self):
        """SELECT from a recursive CTE that follows ``parent_id`` links."""
        src = self.tables.some_table

        with config.db.connect() as conn:
            seed_rows = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            anchor = seed_rows.cte("some_cte", recursive=True)

            anchor_alias = anchor.alias("c1")
            parent_rows = src.alias()
            # note that SQL Server requires this to be UNION ALL,
            # can't be UNION
            recursive_step = select([parent_rows]).where(
                parent_rows.c.id == anchor_alias.c.parent_id
            )
            walked = anchor.union_all(recursive_step)

            stmt = select([walked.c.data])
            stmt = stmt.where(walked.c.data != "d2")
            stmt = stmt.order_by(walked.c.data.desc())
            rows = conn.execute(stmt).fetchall()

            # Duplicates are kept because of UNION ALL.
            expected = [
                ("d4",),
                ("d3",),
                ("d3",),
                ("d1",),
                ("d1",),
                ("d1",),
            ]
            eq_(rows, expected)

    def test_insert_from_select_round_trip(self):
        """INSERT ... FROM SELECT where the SELECT reads from a CTE."""
        src = self.tables.some_table
        dest = self.tables.some_other_table

        with config.db.connect() as conn:
            inner = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            cte = inner.cte("some_cte")

            target_columns = ["id", "data", "parent_id"]
            copy_stmt = dest.insert().from_select(
                target_columns, select([cte])
            )
            conn.execute(copy_stmt)

            readback = select([dest]).order_by(dest.c.id)
            rows = conn.execute(readback).fetchall()
            eq_(rows, [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)])

    @testing.requires.ctes_with_update_delete
    @testing.requires.update_from
    def test_update_from_round_trip(self):
        """UPDATE whose WHERE clause correlates against a CTE."""
        src = self.tables.some_table
        dest = self.tables.some_other_table

        with config.db.connect() as conn:
            # Start with a full copy of some_table in some_other_table.
            target_columns = ["id", "data", "parent_id"]
            copy_stmt = dest.insert().from_select(
                target_columns, select([src])
            )
            conn.execute(copy_stmt)

            inner = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            cte = inner.cte("some_cte")

            update_stmt = dest.update().values(parent_id=5)
            update_stmt = update_stmt.where(dest.c.data == cte.c.data)
            conn.execute(update_stmt)

            readback = select([dest]).order_by(dest.c.id)
            rows = conn.execute(readback).fetchall()

            # Rows matched by the CTE get parent_id 5; the rest keep
            # their copied values.
            expected = [
                (1, "d1", None),
                (2, "d2", 5),
                (3, "d3", 5),
                (4, "d4", 5),
                (5, "d5", 3),
            ]
            eq_(rows, expected)

    @testing.requires.ctes_with_update_delete
    @testing.requires.delete_from
    def test_delete_from_round_trip(self):
        """DELETE whose WHERE clause correlates against a CTE."""
        src = self.tables.some_table
        dest = self.tables.some_other_table

        with config.db.connect() as conn:
            # Start with a full copy of some_table in some_other_table.
            target_columns = ["id", "data", "parent_id"]
            copy_stmt = dest.insert().from_select(
                target_columns, select([src])
            )
            conn.execute(copy_stmt)

            inner = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            cte = inner.cte("some_cte")

            delete_stmt = dest.delete().where(dest.c.data == cte.c.data)
            conn.execute(delete_stmt)

            readback = select([dest]).order_by(dest.c.id)
            rows = conn.execute(readback).fetchall()

            # Only the rows not matched by the CTE remain.
            eq_(rows, [(1, "d1", None), (5, "d5", 3)])

    @testing.requires.ctes_with_update_delete
    def test_delete_scalar_subq_round_trip(self):
        """DELETE comparing a column to a SELECT drawn from a CTE."""
        src = self.tables.some_table
        dest = self.tables.some_other_table

        with config.db.connect() as conn:
            # Start with a full copy of some_table in some_other_table.
            target_columns = ["id", "data", "parent_id"]
            copy_stmt = dest.insert().from_select(
                target_columns, select([src])
            )
            conn.execute(copy_stmt)

            inner = select([src]).where(
                src.c.data.in_(["d2", "d3", "d4"])
            )
            cte = inner.cte("some_cte")

            # SELECT of the CTE's data for the row sharing the id of
            # the some_other_table row being considered.
            cte_data_for_row = select([cte.c.data]).where(
                cte.c.id == dest.c.id
            )
            delete_stmt = dest.delete().where(
                dest.c.data == cte_data_for_row
            )
            conn.execute(delete_stmt)

            readback = select([dest]).order_by(dest.c.id)
            rows = conn.execute(readback).fetchall()

            # Only the rows not matched by the CTE remain.
            eq_(rows, [(1, "d1", None), (5, "d5", 3)])
Back to Directory=ceiIENDB`