PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/utils/dbmigrator/dbmigrate_lib.py

#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import os
import sys
try:
    from alembic.migration import MigrationContext
    from alembic import config, command, script
except ImportError as e:
    if __name__ == '__main__':
        # show message end exit if run as script
        print(f'Alembic Python library is not installed. Error: {e}')
        sys.exit(1)
    else:
        raise

ALEMBIC_CONF = os.path.join(os.path.dirname(__file__), 'alembic.ini')


def generate_revisions_chain(alembic_cfg):

    chain_ = []
    script_ = script.ScriptDirectory.from_config(alembic_cfg)
    revision_pairs = {s.down_revision or 'base': s.revision for s in
                      script_.walk_revisions()}
    curent_revision = 'base'
    while curent_revision:
        chain_.append(curent_revision)
        curent_revision = revision_pairs.get(curent_revision)
    return chain_


def get_database_version(engine):
    connection = engine.connect()
    context = MigrationContext.configure(connection)
    current_rev = context.get_current_revision()
    return current_rev or 'base'


def migration_way(alembic_cfg, revision_from, revision_to):
    """
    Check need migrate or downgrade to specific revisions
    :return bool: True - upgrade; False downgrade
    """
    rev_chain = generate_revisions_chain(alembic_cfg)
    return rev_chain.index(revision_from) <= rev_chain.index(revision_to)


def correct_script_location(alembic_cfg):
    """
    correct script_location in alembic config for relative path support
    :param alembic_cfg:
    :return:
    """
    location = alembic_cfg.get_section_option('alembic', 'script_location')
    here = alembic_cfg.get_section_option('alembic', 'here')
    location_corrected = os.path.abspath(os.path.join(here, location))
    alembic_cfg.set_section_option(
        'alembic', 'script_location', location_corrected)
    return alembic_cfg


def get_orm_version():
    alembic_version = getattr(
        __import__('lvestats.orm', fromlist=['alembic_version']),
        'alembic_version', 'base')
    return alembic_version


def alembic_migrate(engine, alembic_version=None, stamp=False, lve_stats_cfg=None):
    if alembic_version is None:
        alembic_version = get_orm_version()
    alembic_cfg = correct_script_location(config.Config(ALEMBIC_CONF))
    alembic_cfg.attributes['lve-stats'] = lve_stats_cfg  # pylint: disable=unsupported-assignment-operation
    with engine.begin() as connection:
        alembic_cfg.attributes['connection'] = connection  # pylint: disable=unsupported-assignment-operation
        if stamp:
            # write revision version to database only
            command.stamp(alembic_cfg, alembic_version)
        else:
            database_version = get_database_version(engine)
            if migration_way(alembic_cfg, database_version, alembic_version):
                command.upgrade(alembic_cfg, alembic_version)
            else:
                command.downgrade(alembic_cfg, alembic_version)


Back to Directory=ceiIENDB`