PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

# Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster/__init__.py

# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import atexit
import contextlib
import time
import typing
from datetime import timedelta
from contextlib import ExitStack
from typing import TypedDict, Callable, TYPE_CHECKING, Generator

import sqlalchemy as sa

if TYPE_CHECKING:
    from lvestat import LVEStat
    from lvestats.plugins.generic.analyzers import LVEUsage

from lvestats.orm import BurstingEventType

from ._logs import logger
from .utils import bootstrap_gen
from .config import (
    StartupParams,
    PluginConfig,
    Config,
    ConfigUpdate,
    is_bursting_supported,
    MissingKeysInRawConfig,
)
from .common import (
    BurstingMultipliers,
    LveState,
    SerializedLveId,
    GetNormalLimits,
    ApplyLveSettings,
    AdjustStepData,
    read_normal_limits_from_proc,
    Timestamp,
    PyLveSettingsApplier,
)
from .overload import OverloadChecker, GetStats, read_times_from_proc
from .storage import (
    init_db_schema,
    load_bursting_enabled_intervals_from_db,
    events_saver_running,
    cleanup_running,
    InBurstingEventRow,
)
from .adjust import Adjuster, StepCalculator
from .lve_sm import LveStateManager
from .lves_tracker import LvesTracker, LveStateManagerFactory


class _ExecutePayload(TypedDict):
    """
    Declared shape of the ``lve_data`` mapping accepted by ``LveLimitsBurster.execute``.

    NOTE(review): the driver loop in ``LveLimitsBurster._create_driver_gen`` reads
    ``msg["lve_usages_5s"][-1]`` (plural key, indexed like a sequence), while this
    type declares ``lve_usage_5s`` as a plain mapping. One of the two looks out of
    date - confirm against whatever produces ``lve_data``.
    """
    # Read by the driver loop via ``msg.get('lve_active_ids', [])``.
    lve_active_ids: list[SerializedLveId]
    # Read by the driver loop via ``msg.get('stats', {})``.
    stats: dict[SerializedLveId, 'LVEStat']
    # Not read under this exact key anywhere in this module (see the note above).
    lve_usage_5s: dict[SerializedLveId, 'LVEUsage']


class LveLimitsBurster:
    """
    Limits Burster plugin.

    Every public entry point (``set_config``, ``set_db_engine``, ``execute``)
    forwards its argument to a single ``self._step`` callable. When bursting is
    supported, that callable is the ``send`` method of the driver generator built
    by ``_create_driver_gen``; otherwise it is a function that ignores its input.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        if not is_bursting_supported():
            logger.info('Bursting Limits feature is not supported in current environment')

            def step(_message, /):
                # Feature is unavailable: silently drop whatever is sent.
                return None
        else:
            driver = self._create_driver_gen()
            step = driver.send

            def _close_driver():
                with contextlib.suppress(StopIteration, GeneratorExit):
                    driver.close()

            # NOTE(vlebedev): The lvestats plugin interface does not seem to offer any sane way
            #                 to get notified about the server being stopped, so the generator
            #                 is closed from an interpreter-exit hook instead.
            atexit.register(_close_driver)

        self._step: Callable[[sa.engine.Engine | ConfigUpdate | _ExecutePayload], None] = step

    @bootstrap_gen
    def _create_driver_gen(self):
        """
        Generator that drives the whole plugin; this is the composition root.

        It first delegates to ``StartupParams.wait()`` until the DB engine and the
        initial config are available, then initializes ``PyLve``, enters
        ``adjuster_machinery_running`` and finally loops forever, turning every
        ``dict`` message it is sent into one ``adjuster.step(...)`` call.

        The function is wrapped with ``bootstrap_gen`` (imported from ``.utils``,
        not visible in this file).

        Raises:
            RuntimeError: if ``PyLve().initialize()`` returns a falsy value.
        """
        # NOTE(vlebedev): This import needs a shared library to be present in order to succeed,
        #                 so it is deferred until really needed to keep unit tests easy to write/run.
        from lveapi import PyLve  # pylint: disable=import-outside-toplevel

        # Block here until everything required for a proper startup has been received.
        engine, initial_config = yield from StartupParams.wait()

        pylve = PyLve()
        if not pylve.initialize():
            raise RuntimeError('Failed to initialize PyLve!')

        machinery = adjuster_machinery_running(
            initial_config=initial_config,
            engine=engine,
            get_normal_limits=read_normal_limits_from_proc,
            apply_lve_settings=PyLveSettingsApplier(pylve=pylve),
            read_stats=read_times_from_proc,
        )
        with machinery as adjuster:
            logger.info('LveLimitsBurster initialized')

            while True:
                message = yield
                if not isinstance(message, dict):
                    logger.warning('Unexpected message type: %s', type(message))
                    continue

                now = Timestamp(int(time.time()))
                active_ids = message.get('lve_active_ids', [])
                stats_by_id = message.get('stats', {})
                # Only the last element stored under "lve_usages_5s" is used; a missing key
                # or an empty sequence falls back to an empty mapping.
                # NOTE(review): ``_ExecutePayload`` declares the key as ``lve_usage_5s``
                #               (singular) - confirm which spelling the producer uses.
                try:
                    latest_usage = message["lve_usages_5s"][-1]
                except (KeyError, IndexError):
                    latest_usage = {}

                step_data = AdjustStepData(
                    now=now,
                    lve_active_ids=active_ids,
                    stats=stats_by_id,
                    lve_usages_by_id=latest_usage,
                )
                adjuster.step(step_data)

    def set_config(self, config: PluginConfig) -> None:
        """
        Convert the raw plugin config into a ``ConfigUpdate`` and pass it to the driver.

        If ``ConfigUpdate.from_plugin_config`` raises ``MissingKeysInRawConfig``, the
        missing keys are logged at info level and nothing is forwarded.
        """
        # NOTE(vlebedev): Currently the config dict contains all the keys from _all_ .cfg files
        #                 parsed by lvestats, so there is no point in reporting fields absent
        #                 from the `Config` typing as "unknown" or similar - they might well
        #                 belong to some other plugin =/
        try:
            update = ConfigUpdate.from_plugin_config(config)
        except MissingKeysInRawConfig as exc:
            logger.info('Missing config keys: %s', exc.missing_raw_keys)
            return
        self._step(update)

    def set_db_engine(self, engine: sa.engine.Engine) -> None:
        """Pass the SQLAlchemy engine to the driver."""
        # NOTE(vlebedev): 'Engine' is thread safe, so requesting connections from it on
        #                 different threads is not a problem. For more info have a look at:
        #                 https://groups.google.com/g/sqlalchemy/c/t8i3RSKZGb0/m/QxWshAS3iKgJ
        self._step(engine)

    def execute(self, lve_data: _ExecutePayload) -> None:
        """Pass one batch of LVE data to the driver."""
        self._step(lve_data)


@contextlib.contextmanager
def adjuster_machinery_running(
    initial_config: Config,
    engine: sa.engine.Engine,
    apply_lve_settings: ApplyLveSettings,
    get_normal_limits: GetNormalLimits,
    read_stats: GetStats,
) -> Generator[Adjuster, None, None]:
    """
    Context manager that assembles an ``Adjuster`` together with its supporting contexts.

    Steps, in order:

    1. Initialize the DB schema and load the stored bursting-enabled intervals,
       passing ``now - initial_config.bursting_quota_window`` as the cutoff.
    2. Enter the ``cleanup_running`` and ``events_saver_running`` contexts; both
       stay entered for as long as this context manager is.
    3. Build the ``Adjuster`` and hook up handlers so that every LVE state change
       into or out of ``LveState.OVERUSING`` is handed to the event saver.

    Args:
        initial_config: settings used to parametrize all the created components.
        engine: SQLAlchemy engine handed to the storage helpers.
        apply_lve_settings: forwarded to ``LveStateManagerFactory``.
        get_normal_limits: forwarded to ``Adjuster``.
        read_stats: forwarded to ``OverloadChecker`` as ``get_stats``.

    Yields:
        The fully wired ``Adjuster`` instance.
    """
    started_at = Timestamp(int(time.time()))
    window_seconds = initial_config.bursting_quota_window.total_seconds()
    history_cutoff = Timestamp(int(started_at - window_seconds))

    init_db_schema(engine)
    lve_to_history = load_bursting_enabled_intervals_from_db(
        engine=engine,
        cutoff=history_cutoff,
        server_id=initial_config.server_id,
    )
    logger.debug('Loaded intervals: %s', len(lve_to_history))

    with ExitStack() as stack:
        cleanup_ctx = cleanup_running(
            engine=engine,
            server_id=initial_config.server_id,
            cleanup_interval=timedelta(days=1),
            history_window=timedelta(days=30),
            fail_fast=initial_config.fail_fast,
        )
        stack.enter_context(cleanup_ctx)

        saver_ctx = events_saver_running(
            engine=engine,
            server_id=initial_config.server_id,
            dump_interval=initial_config.db_dump_period,
        )
        write_event = stack.enter_context(saver_ctx)

        manager_factory = LveStateManagerFactory(
            _lve_to_history=lve_to_history,
            _apply_lve_settings=apply_lve_settings,
            _quota=initial_config.bursting_quota,
            _quota_window=initial_config.bursting_quota_window,
            _bursting_multipliers=BurstingMultipliers(
                initial_config.bursting_cpu_multiplier,
                initial_config.bursting_io_multiplier,
            ),
            _fail_fast=initial_config.fail_fast,
        )
        lves_tracker = LvesTracker(
            create_lve_manager=manager_factory,
            fail_fast=initial_config.fail_fast,
        )
        step_calculator = StepCalculator(
            overload_threshold=1.0 - initial_config.idle_time_threshold,
        )
        overload_checker = OverloadChecker(
            idle_time_threshold=initial_config.idle_time_threshold,
            get_stats=read_stats,
            max_samples_number=initial_config.idle_time_samples,
        )
        adjuster = Adjuster(
            lves_tracker=lves_tracker,
            get_normal_limits=get_normal_limits,
            step_calculator=step_calculator,
            is_server_overloaded=overload_checker,
            fail_fast=initial_config.fail_fast,
        )

        @lves_tracker.on_manager_added.register
        def _watch_new_manager(manager: LveStateManager) -> None:
            # The id is captured once, at the moment the manager is added.
            lve_id = manager.lve_id

            @manager.on_state_changed.register
            def _record_state_change(old_state: LveState, new_state: LveState) -> None:
                assert old_state != new_state

                now = Timestamp(int(time.time()))

                # Only transitions into or out of OVERUSING produce an event.
                if new_state == LveState.OVERUSING:
                    event_type = BurstingEventType.STARTED
                elif old_state == LveState.OVERUSING:
                    event_type = BurstingEventType.STOPPED
                else:
                    return

                write_event(InBurstingEventRow(
                    lve_id=lve_id,
                    timestamp=now,
                    event_type=event_type,
                ))

        yield adjuster
# Back to Directory