PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster/lves_tracker.py

# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
import typing
import pprint
from dataclasses import dataclass
from functools import cached_property
from datetime import timedelta
from types import MappingProxyType
from typing import Callable, Mapping
from collections.abc import Set

from ._logs import logger
from .notify import Emitter, Signal
from .common import (
    BurstingMultipliers,
    LveId,
    Timestamp,
    LveLimits,
    ApplyLveSettings,
    SerializedLveId,
    get_deserialized_lve_id,
    LveStats,
    LveUsage,
    LveState,
    empty_usage,
)
from .history import IntervalType, LveHistory
from .lve_sm import LveStateManager, LveStateSummary


@dataclass(frozen=True)
class LveStateManagerFactory:
    """Callable factory producing an ``LveStateManager`` for a given LVE.

    The factory carries the settings shared by all managers (quota, quota
    window, bursting multipliers, settings applier) plus the history loaded
    for individual LVEs. Construction fails if any loaded history has an
    ongoing interval of type ``IntervalType.OVERUSING``.
    """

    # Initial per-LVE history; an entry is popped (consumed) the first time
    # a manager is created for that LVE.
    _lve_to_history: dict[LveId, LveHistory]
    # Passed through unchanged to every created manager.
    _apply_lve_settings: ApplyLveSettings
    # Passed through unchanged to every created manager.
    _quota: timedelta
    # Also used to compute the cutoff when trimming pre-loaded history.
    _quota_window: timedelta
    # Passed through unchanged to every created manager.
    _bursting_multipliers: BurstingMultipliers
    # Passed through unchanged to every created manager.
    _fail_fast: bool = True

    def __post_init__(self):
        """Reject initial history in which some LVE is still "overusing".

        Raises:
            ValueError: if at least one history's ``ongoing_interval_type``
                equals ``IntervalType.OVERUSING``.
        """
        # Sorted so that the error message is deterministic: iteration order
        # of a set of strings varies between interpreter runs.
        lves_with_broken_history = sorted({
            str(lve_id)
            for lve_id, history
            in self._lve_to_history.items()
            if history.ongoing_interval_type == IntervalType.OVERUSING
        })
        if lves_with_broken_history:
            raise ValueError(
                'LVEs ' + ', '.join(lves_with_broken_history) + ' are marked as "overusing" '
                ' in initial history loaded from persistent storage!'
            )

    def __call__(
        self,
        lve_id: LveId,
        now: Timestamp,
        normal_limits: LveLimits,
        stats: LveStats,
        usage: LveUsage,
    ) -> LveStateManager:
        """Create a state manager for ``lve_id``.

        Args:
            lve_id: LVE the manager is created for.
            now: current timestamp; also the reference point for trimming
                pre-loaded history.
            normal_limits: initial normal limits handed to the manager.
            stats: initial stats handed to the manager.
            usage: initial usage handed to the manager.

        Returns:
            A new ``LveStateManager`` seeded with the pre-loaded history of
            the LVE when there is one, otherwise with a fresh ``LveHistory``.
        """
        try:
            # Consume the stored history so it seeds at most one manager.
            history = self._lve_to_history.pop(lve_id)
        except KeyError:
            history = LveHistory()
        else:
            # Trim relative to the start of the quota window ending at `now`.
            cutoff = typing.cast(Timestamp, now - self._quota_window.total_seconds())
            history = history.trim(cutoff)

        return LveStateManager(
            now=now,
            lve_id=lve_id,
            initial_history=history,
            bursting_multipliers=self._bursting_multipliers,
            initial_normal_limits=normal_limits,
            initial_stats=stats,
            initial_usage=usage,
            quota=self._quota,
            quota_window=self._quota_window,
            apply_lve_settings=self._apply_lve_settings,
            fail_fast=self._fail_fast,
        )


class LvesTracker:
    """Keeps one ``LveStateManager`` per known LVE and drives it with readings.

    Every call to :meth:`update` creates managers for newly appeared LVEs,
    steps all known managers, refreshes the bursted / unbursted / overusing /
    quota-exceeded sets and finally emits ``on_manager_added`` for each
    manager created during that call.
    """

    def __init__(
        self,
        create_lve_manager: LveStateManagerFactory,
        fail_fast: bool = True,
        deserialize_lve_id: Callable[[SerializedLveId], LveId] = get_deserialized_lve_id,
    ) -> None:
        """Initialise an empty tracker.

        Args:
            create_lve_manager: factory called (with keyword arguments) to
                build a manager for a newly appeared LVE.
            fail_fast: when true, inconsistencies between state sets and
                failures of ``on_manager_added`` listeners raise instead of
                being tolerated/logged.
            deserialize_lve_id: converts a serialized LVE id into ``LveId``.
        """
        self._create_lve_manager = create_lve_manager
        self._fail_fast = fail_fast
        self._deserialize_lve_id = deserialize_lve_id

        self._serialized_id_to_manager = dict[SerializedLveId, LveStateManager]()

        # FIXME(vlebedev): Remove these state-sets and replace them with signals and external handlers,
        #                  which are interested in state switches.
        self._bursted = set[LveStateManager]()
        self._unbursted = set[LveStateManager]()
        self._overusing = set[LveStateManager]()
        self._exceeded = set[LveStateManager]()
        # Same set objects as above, grouped for bulk removal of a manager.
        self._state_sets = (
            self._bursted,
            self._unbursted,
            self._overusing,
            self._exceeded,
        )

        self._on_manager_added = Emitter()

    @cached_property
    def serialized_id_to_manager(self) -> Mapping[SerializedLveId, LveStateManager]:
        """Read-only live view of the managers keyed by serialized LVE id."""
        # The underlying dict is only mutated in place, never rebound, so the
        # cached proxy keeps reflecting the current content.
        return MappingProxyType(self._serialized_id_to_manager)

    @property
    def bursted(self) -> Set[LveStateManager]:
        """Managers whose ``is_bursted`` was true on the last update."""
        return self._bursted

    @property
    def unbursted(self) -> Set[LveStateManager]:
        """Managers whose ``is_unbursted`` was true on the last update."""
        return self._unbursted

    @property
    def overusing(self) -> Set[LveStateManager]:
        """Managers whose ``is_overusing`` was true on the last update."""
        return self._overusing

    @property
    def quota_exceeded(self) -> Set[LveStateManager]:
        """Managers for which ``check_quota_exceeded`` was true on the last update."""
        return self._exceeded

    @property
    def on_manager_added(self) -> Signal:
        """Signal emitted with each manager created during an update."""
        return self._on_manager_added

    def update(
        self,
        now: Timestamp,
        normal_limits_by_id: Mapping[LveId, LveLimits],
        stats_by_id: Mapping[SerializedLveId, LveStats],
        usages_by_id: Mapping[SerializedLveId, LveUsage],
    ) -> None:
        """Feed fresh readings to the tracker.

        Args:
            now: timestamp of the readings.
            normal_limits_by_id: normal limits keyed by deserialized LVE id.
            stats_by_id: stats keyed by serialized LVE id; its key set is
                treated as the set of currently existing LVEs.
            usages_by_id: usage keyed by serialized LVE id; a missing entry
                is replaced with ``empty_usage``.

        Raises:
            AssertionError: with ``fail_fast`` set, when a manager ends up
                both bursted and unbursted, or both overusing and unbursted.
            Exception: with ``fail_fast`` set, whatever an
                ``on_manager_added`` listener raised.
        """
        # TODO(vlebedev): Filter out users belonging to resellers (LVEStat contains reseller_id field)
        currently_existing_ids = stats_by_id.keys()
        newly_appeared_raw_ids = currently_existing_ids - self._serialized_id_to_manager.keys()

        self._create_managers_for(
            newly_appeared_raw_ids, now, normal_limits_by_id, stats_by_id, usages_by_id,
        )

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('LVEs known to adjuster: \n%s', pprint.pformat({
                slid: str(LveStateSummary.for_lve(m))
                for slid, m
                in self._serialized_id_to_manager.items()
            }, width=-1))

        to_delete = self._step_managers(
            now, currently_existing_ids, normal_limits_by_id, stats_by_id, usages_by_id,
        )

        if self._fail_fast:
            self._assert_state_sets_consistent()

        for serialized_lve_id in to_delete:
            manager = self._serialized_id_to_manager.pop(serialized_lve_id)
            logger.debug(
                'LVE "%s": LVE is no longer active and has empty history - forgetting corresponding manager',
                manager.lve_id,
            )

        # NOTE(vlebedev): Trigger signal listeners only after internal state is finished to be updated.
        self._notify_managers_added(newly_appeared_raw_ids)

    def _create_managers_for(
        self,
        new_serialized_ids: Set[SerializedLveId],
        now: Timestamp,
        normal_limits_by_id: Mapping[LveId, LveLimits],
        stats_by_id: Mapping[SerializedLveId, LveStats],
        usages_by_id: Mapping[SerializedLveId, LveUsage],
    ) -> None:
        """Create and register a manager for every eligible newly appeared LVE."""
        for serialized_lve_id in new_serialized_ids:
            if serialized_lve_id == 0:
                continue

            # The id originates from `stats_by_id.keys()`, so this lookup
            # cannot miss; a single lookup serves both the reseller check
            # and manager creation.
            stats = stats_by_id[serialized_lve_id]
            # NOTE(vlebedev): Users under resellers are not supported for now.
            if stats.reseller_id != 0:
                continue

            lve_id: LveId = self._deserialize_lve_id(serialized_lve_id)
            # TODO(vlebedev): Can it be that there are no normal limits and/or stats are available?
            #                 What to do in this case?
            try:
                normal_limits = normal_limits_by_id[lve_id]
            except KeyError:
                # TODO(vlebedev): Raise exception when `fail_fast` is set.
                logger.warning(
                    'LVE "%s": some "get_initial_readings" listeners failed: %s readings are absent!',
                    lve_id,
                    '"normal limits"',
                )
                continue

            manager = self._create_lve_manager(
                now=now,
                lve_id=lve_id,
                normal_limits=normal_limits,
                stats=stats,
                usage=usages_by_id.get(serialized_lve_id, empty_usage),
            )
            self._serialized_id_to_manager[serialized_lve_id] = manager
            logger.debug('LVE "%s": unknown LVE appeared - created manager for it', lve_id)

    def _step_managers(
        self,
        now: Timestamp,
        currently_existing_ids: Set[SerializedLveId],
        normal_limits_by_id: Mapping[LveId, LveLimits],
        stats_by_id: Mapping[SerializedLveId, LveStats],
        usages_by_id: Mapping[SerializedLveId, LveUsage],
    ) -> set[SerializedLveId]:
        """Step every known manager and refresh the state sets.

        Returns:
            Serialized ids of managers that disappeared during this step and
            whose history contains no overusing; the caller forgets them.
        """
        disappeared_event = LveStateManager.Disappered(now=now)
        to_delete = set[SerializedLveId]()

        for serialized_lve_id, manager in self._serialized_id_to_manager.items():
            lve_id = manager.lve_id

            manager.trim_history(now)

            # Readings are delivered even for vanished LVEs; absent limits
            # and stats are passed as `None`.
            manager.step(LveStateManager.UpdateReadings(
                now=now,
                normal_limits=normal_limits_by_id.get(lve_id),
                stats=stats_by_id.get(serialized_lve_id),
                usage=usages_by_id.get(serialized_lve_id, empty_usage),
            ))

            if serialized_lve_id not in currently_existing_ids and manager.state != LveState.EXISTED:
                manager.step(disappeared_event)
                for state_set in self._state_sets:
                    state_set.discard(manager)
                # NOTE(review): deletion is considered only on this branch,
                # i.e. while the manager is not yet in `EXISTED` state. If a
                # manager is kept here because of overusing history, nothing
                # visible in this class forgets it later - confirm intended.
                if not manager.history_contains_overusing:
                    to_delete.add(serialized_lve_id)
                continue

            lve_quota_exceeded = manager.check_quota_exceeded(now)

            self._sync_membership(self._overusing, manager, manager.is_overusing)
            self._sync_membership(self._bursted, manager, manager.is_bursted)
            self._sync_membership(self._unbursted, manager, manager.is_unbursted)
            self._sync_membership(self._exceeded, manager, lve_quota_exceeded)

        return to_delete

    @staticmethod
    def _sync_membership(state_set: set, manager: LveStateManager, is_member: bool) -> None:
        """Add ``manager`` to ``state_set`` when ``is_member`` is truthy, else discard it."""
        if is_member:
            state_set.add(manager)
        else:
            state_set.discard(manager)

    def _assert_state_sets_consistent(self) -> None:
        """Raise ``AssertionError`` when mutually exclusive state sets overlap."""
        if not self._bursted.isdisjoint(self._unbursted):
            raise AssertionError('LVEs can`t be both bursted and unbursted!')
        if not self._overusing.isdisjoint(self._unbursted):
            raise AssertionError('LVEs can`t be both overusing and unbursted!')

    def _notify_managers_added(self, new_serialized_ids: Set[SerializedLveId]) -> None:
        """Emit ``on_manager_added`` for each new id that still has a manager."""
        for serialized_id in new_serialized_ids:
            try:
                manager = self._serialized_id_to_manager[serialized_id]
            except KeyError:
                # Either no manager was created for this id, or it has
                # already been forgotten during the same update.
                continue

            try:
                self._on_manager_added(manager)
            except Exception:
                if self._fail_fast:
                    raise
                logger.exception('LVE "%s": some "on_manager_created" listeners failed!', manager.lve_id)
Back to Directory=ceiIENDB`