PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/wmt/wmt-api.py

#!/opt/cloudlinux/venv/bin/python3

import json
import fcntl
import os
import time
import sys
import psutil
import signal
from argparse import ArgumentParser
from datetime import datetime, timedelta

from wmt.common.const import (
    WMT_SCANNER_SERVICE,
    WMT_LOCK_FILE,
    CONFIG_PATH,
    WMT_DB
)

from wmt.common.report import generate_report, report_dict
from wmt.db import setup_database
from wmt.common.service import set_service_state
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common import cfg

from cllicense import CloudlinuxLicenseLib
from wmt.common.utils import send_report_to_clickhouse, manage_crons


def print_result_and_exit(result='success', exit_code=0, **extra):
    """
    Print a JSON status document to stdout and terminate the process.

    The document always contains ``result`` and ``timestamp`` (the value of
    ``time.time()``). Keyword arguments passed through ``extra`` are merged
    on top and therefore win over those two keys when names collide.

    :param result: value stored under the ``result`` key
    :param exit_code: status handed to ``sys.exit``
    :param extra: additional key/value pairs to include in the output
    """
    payload = {'result': result, 'timestamp': time.time(), **extra}
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    print(rendered)
    sys.exit(exit_code)


def get_status():
    """
    Report whether the lock file is currently locked by another holder.

    :return: ``'started'`` when a non-blocking exclusive ``flock`` on
        ``WMT_LOCK_FILE`` cannot be taken, ``'stopped'`` when the file does
        not exist or the lock could be taken.
    """
    try:
        # Open directly rather than testing os.path.exists() first: the file
        # may be removed between the check and the open.
        lock_file = open(WMT_LOCK_FILE)
    except (FileNotFoundError, NotADirectoryError):
        return 'stopped'
    with lock_file:
        try:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        # locked
        except OSError:
            return 'started'
    # The probe lock taken above is dropped when the file is closed.
    return 'stopped'


def get_scanner_pid():
    """
    Return the PID stored in ``WMT_LOCK_FILE`` if that process exists.

    :return: the PID as ``int``, or ``None`` when the lock file is missing,
        when its content is not a positive integer, or when
        ``psutil.pid_exists`` reports no such process.
    """
    try:
        with open(WMT_LOCK_FILE) as f:
            raw_pid = f.read().strip()
    except (FileNotFoundError, NotADirectoryError):
        # No lock file (or it vanished before we could open it).
        return None

    try:
        pid = int(raw_pid)
    except ValueError:
        # Empty or malformed lock file: there is no usable PID to report.
        return None

    # Callers pass the result to os.kill(), where 0 and negative values
    # address process groups rather than a single process.
    if pid <= 0:
        return None
    return pid if psutil.pid_exists(pid) else None


def get_config():
    """Return the current configuration as produced by ``cfg.to_dict()``."""
    current = cfg.to_dict()
    return current


def change_config(new_json_config):
    """
    Apply a configuration change and signal the scanner process.

    The change is handed to ``cfg.modify``. If ``get_scanner_pid`` returns a
    PID, that process is sent ``SIGUSR1`` (presumably so the scanner picks up
    the new settings; confirm against the scanner's signal handler).

    :param new_json_config: value forwarded unchanged to ``cfg.modify``
    :return: whatever ``cfg.modify`` returned
    """
    config = cfg.modify(new_json_config)
    scanner_pid = get_scanner_pid()
    if scanner_pid is not None:
        try:
            os.kill(scanner_pid, signal.SIGUSR1)
        except ProcessLookupError:
            # The scanner exited between the PID lookup and the signal. The
            # configuration has already been modified, so there is nothing
            # left to notify and no reason to report a failure.
            pass
    return config


def _previous_day_window():
    """Return ``(start, end)`` dates: yesterday and today, from one clock read."""
    today = datetime.now().date()
    return today - timedelta(days=1), today


def run():
    """
    Parse command-line options and execute the first matching action.

    Options are checked in a fixed order (config-get, config-change,
    report-get, send-clickhouse, send-email, status, start, stop); only the
    first one that is set is handled. Every handled action ends in
    ``print_result_and_exit``, which prints JSON and exits the process.
    With no option set, the argparse help text is printed instead.
    """
    args = ArgumentParser()
    args.add_argument('--config-get', action='store_true')
    args.add_argument('--config-change', type=str)

    args.add_argument('--report-get', action='store_true')
    args.add_argument('--send-clickhouse', action='store_true')
    # just for 24h report by mail executed by cron
    args.add_argument('--send-email', action='store_true')

    args.add_argument('--status', action='store_true')
    args.add_argument('--start', action='store_true')
    args.add_argument('--stop', action='store_true')

    opts = args.parse_args()

    if opts.config_get:
        config = get_config()
        config['default_report_email'] = cfg.default_report_email

        # A non-empty ignore list is flattened to a comma-separated string.
        ignore_list = config.get("ignore_list")
        if ignore_list:
            config['ignore_list'] = ",".join(ignore_list)
        print_result_and_exit(config=config)

    elif opts.config_change:
        config = change_config(opts.config_change)
        config['default_report_email'] = cfg.default_report_email
        print_result_and_exit(config=config)

    elif opts.report_get:
        engine = setup_database(readonly=os.path.exists(WMT_DB))
        # 24h back from NOW; read the clock once so that the window bounds
        # and the reported date all describe the same instant.
        now = datetime.now()
        start, end = now - timedelta(days=1), now
        report = report_dict(generate_report(engine, start, end))
        print_result_and_exit(report=report, date=now.strftime('%Y-%m-%d %H:%M'))
    elif opts.send_clickhouse:
        engine = setup_database(readonly=True)
        # for the day before
        start, end = _previous_day_window()
        send_report_to_clickhouse(report_dict(generate_report(engine, start, end)))
        print_result_and_exit()
    elif opts.send_email:
        is_scanner_running = get_status() == 'started'
        if is_scanner_running and cfg.cfg.summary_notification_enabled:
            if not CloudlinuxLicenseLib().get_license_status():
                print_result_and_exit('CloudLinux license is expired. '
                                      'You may buy new license here: https://lp.cloudlinux.com/cloudlinux-os-solo',
                                      exit_code=1)
            engine = setup_database()
            # for the day before
            start, end = _previous_day_window()
            report = generate_report(engine, start, end)
            Notifier(
                target_email=cfg.target_email,
                from_email=cfg.from_email,
                report=report, notification_type=SupportedNotificationTypes.REPORT,
            ).notify()
            print_result_and_exit()
        else:
            print_result_and_exit('Summary report email will not be sent! '
                                  f'Please, ensure "{WMT_SCANNER_SERVICE}" service is running and '
                                  f'alert_notifications is enabled in "{CONFIG_PATH}"', exit_code=1)
    elif opts.status:
        status = get_status()
        print_result_and_exit(status=status)
    elif opts.start:
        set_service_state(WMT_SCANNER_SERVICE, 'start')
        manage_crons(status=True)
        print_result_and_exit()
    elif opts.stop:
        set_service_state(WMT_SCANNER_SERVICE, 'stop')
        manage_crons(status=False)
        print_result_and_exit()
    else:
        args.print_help()


def main():
    """
    Script entry point.

    Executes ``run()`` and turns any ``Exception`` it raises into a JSON
    error response (``result='error'``, the exception text under
    ``context``) with exit code 1. ``SystemExit`` is not an ``Exception``
    subclass, so exits requested inside ``run()`` pass through untouched.
    """
    try:
        run()
    except Exception as err:
        print_result_and_exit(exit_code=1, result='error', context=str(err))


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Back to Directory=ceiIENDB`