PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster/storage/save.py

# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import contextlib
import time
import queue
from queue import Queue
from datetime import timedelta
from threading import Event
from typing import Generator, Callable, Sequence

import sqlalchemy as sa
import sqlalchemy.exc

from lvestats.orm import bursting_events_table

from .._logs import logger
from .base import InBurstingEventRow, thread_running


@contextlib.contextmanager
def events_saver_running(
    engine: sa.engine.Engine,
    server_id: str,
    dump_interval: timedelta,
    run_period: timedelta = timedelta(seconds=5),
    fail_fast: bool = True,
) -> Generator[Callable[[InBurstingEventRow], None], None, None]:
    """Run the 'bursting-saver' worker and yield a callable that enqueues events.

    Events passed to the yielded callable are put on an in-memory queue.
    The worker periodically moves them into a local buffer and writes the
    buffer to the DB once more than ``dump_interval`` has passed since the
    last successful write.

    Args:
        engine: SQLAlchemy engine used for writing events.
        server_id: Value stored in the ``server_id`` column of every row.
        dump_interval: Minimal time between two DB writes.
        run_period: How often the worker wakes up to pull queued events.
        fail_fast: If true, a ``DBAPIError`` raised while saving is
            re-raised from the worker loop; otherwise it is logged and
            the buffered events are kept so the next iteration retries.

    Yields:
        ``messages.put_nowait`` of the internal queue.
    """
    messages = Queue()

    def main(terminate: Event):
        # NOTE(review): presumably `thread_running` executes this function in
        #               a separate thread and sets `terminate` on context exit
        #               - confirm against `.base.thread_running`.
        # TODO(vlebedev): Implement some kind of buffer size monitoring.
        dump_interval_sec = dump_interval.total_seconds()
        run_period_sec = run_period.total_seconds()
        prev_db_write_time, events = 0.0, []
        while not terminate.is_set():
            now = time.time()
            events.extend(_pull_events(messages))
            if (now - prev_db_write_time) > dump_interval_sec:
                try:
                    save_events_to_db(engine, server_id, events)
                except sqlalchemy.exc.DBAPIError as e:
                    if fail_fast:
                        raise
                    # Keep `events` untouched so the write is retried later.
                    logger.error('Failed to save events to DB!', exc_info=e)
                else:
                    events.clear()
                    prev_db_write_time = now
            # Wait on the event instead of sleeping so that a termination
            # request wakes the loop immediately rather than after up to
            # `run_period`.
            terminate.wait(run_period_sec)

        # NOTE(vlebedev): Write events remaining in the queue.
        # The local buffer may still hold events pulled earlier but not yet
        # flushed (dump interval not elapsed or a previous write failed), so
        # flush the buffer together with whatever is left in the queue.
        events.extend(_pull_events(messages))
        save_events_to_db(engine, server_id, events)
        logger.debug('Stopping events saving thread.')

    with thread_running('bursting-saver', main):
        yield messages.put_nowait


def _pull_events(messages: Queue) -> list[InBurstingEventRow]:
    """Drain ``messages`` without blocking and return the items in queue order."""
    drained = []
    while True:
        try:
            drained.append(messages.get_nowait())
        except queue.Empty:
            # Queue is exhausted - hand back everything collected so far.
            return drained


def save_events_to_db(
    engine: sa.engine.Engine,
    server_id: str,
    events: Sequence[InBurstingEventRow],
) -> None:
    """Insert ``events`` into ``bursting_events_table`` with a single statement.

    Every row gets a ``server_id`` entry set to ``server_id``; keys coming
    from the event itself are applied on top of it. Does nothing when
    ``events`` is empty.
    """
    total = len(events)
    if total == 0:
        return
    logger.debug('Saving %d events to DB', total)
    with engine.begin() as conn:
        rows = []
        for event in events:
            row = {'server_id': server_id}
            row.update(event)
            rows.append(row)
        conn.execute(sa.insert(bursting_events_table).values(rows))
Back to Directory=ceiIENDB`