PNG  IHDR pHYs   OiCCPPhotoshop ICC profilexڝSgTS=BKKoR RB&*! J!QEEȠQ, !{kּ> H3Q5 B.@ $pd!s#~<<+"x M0B\t8K@zB@F&S`cbP-`'{[! eDh;VEX0fK9-0IWfH  0Q){`##xFW<+*x<$9E[-qWW.(I+6aa@.y24x6_-"bbϫp@t~,/;m%h^ uf@Wp~<5j>{-]cK'Xto(hw?G%fIq^D$.Tʳ?D*A, `6B$BB dr`)B(Ͱ*`/@4Qhp.U=pa( Aa!ڈbX#!H$ ɈQ"K5H1RT UH=r9\F;2G1Q= C7F dt1r=6Ыhڏ>C03l0.B8, c˱" VcϱwE 6wB aAHXLXNH $4 7 Q'"K&b21XH,#/{C7$C2'ITFnR#,4H#dk9, +ȅ3![ b@qS(RjJ4e2AURݨT5ZBRQ4u9̓IKhhitݕNWGw Ljg(gwLӋT071oUX**| J&*/Tު UUT^S}FU3S ԖUPSSg;goT?~YYLOCQ_ cx,!k u5&|v*=9C3J3WRf?qtN (~))4L1e\kXHQG6EYAJ'\'GgSSݧ M=:.kDwn^Loy}/TmG X $ <5qo</QC]@Caaᄑ.ȽJtq]zۯ6iܟ4)Y3sCQ? 0k߬~OCOg#/c/Wװwa>>r><72Y_7ȷOo_C#dz%gA[z|!?:eAAA!h쐭!ΑiP~aa~ 'W?pX15wCsDDDޛg1O9-J5*>.j<74?.fYXXIlK9.*6nl {/]py.,:@LN8A*%w% yg"/6шC\*NH*Mz쑼5y$3,幄'L Lݛ:v m2=:1qB!Mggfvˬen/kY- BTZ(*geWf͉9+̳ې7ᒶKW-X潬j9(xoʿܔĹdff-[n ڴ VE/(ۻCɾUUMfeI?m]Nmq#׹=TR+Gw- 6 U#pDy  :v{vg/jBFS[b[O>zG499?rCd&ˮ/~јѡ򗓿m|x31^VwwO| (hSЧc3- cHRMz%u0`:o_F@8N ' p @8N@8}' p '#@8N@8N pQ9p!i~}|6-ӪG` VP.@*j>[ K^<֐Z]@8N'KQ<Q(`s" 'hgpKB`R@Dqj '  'P$a ( `D$Na L?u80e J,K˷NI'0eݷ(NI'؀ 2ipIIKp`:O'`ʤxB8Ѥx Ѥx $ $P6 :vRNb 'p,>NB 'P]-->P T+*^h& p '‰a ‰ (ĵt#u33;Nt̵'ޯ; [3W ~]0KH1q@8]O2]3*̧7# *p>us p _6]/}-4|t'|Smx= DoʾM×M_8!)6lq':l7!|4} '\ne t!=hnLn (~Dn\+‰_4k)0e@OhZ`F `.m1} 'vp{F`ON7Srx 'D˸nV`><;yMx!IS钦OM)Ե٥x 'DSD6bS8!" ODz#R >S8!7ّxEh0m$MIPHi$IvS8IN$I p$O8I,sk&I)$IN$Hi$I^Ah.p$MIN$IR8I·N "IF9Ah0m$MIN$IR8IN$I 3jIU;kO$ɳN$+ q.x* tEXtComment

Viewing File: /opt/alt/php81/usr/include/php/ext/swoole/include/swoole_process_pool.h

/*
  +----------------------------------------------------------------------+
  | Swoole                                                               |
  +----------------------------------------------------------------------+
  | This source file is subject to version 2.0 of the Apache license,    |
  | that is bundled with this package in the file LICENSE, and is        |
  | available through the world-wide-web at the following url:           |
  | http://www.apache.org/licenses/LICENSE-2.0.html                      |
  | If you did not receive a copy of the Apache2.0 license and are unable|
  | to obtain it through the world-wide-web, please send a note to       |
  | license@swoole.com so we can mail you a copy immediately.            |
  +----------------------------------------------------------------------+
  | Author: Tianfeng Han  <rango@swoole.com>                             |
  |         Twosee  <twose@qq.com>                                       |
  +----------------------------------------------------------------------+
*/

#pragma once

#include "swoole.h"

#include <signal.h>
#include <unordered_map>

#include "swoole_lock.h"
#include "swoole_pipe.h"
#include "swoole_channel.h"
#include "swoole_msg_queue.h"
#include "swoole_message_bus.h"

/**
 * Status codes for a worker. Worker::set_status() accepts one of these values
 * and stores it in Worker::status.
 */
enum swWorkerStatus {
    SW_WORKER_BUSY = 1,
    SW_WORKER_IDLE,  // 2
    SW_WORKER_EXIT,  // 3
};

/**
 * IPC mode selector; this is the type of the `ipc_mode` argument of
 * ProcessPool::create(), whose default is SW_IPC_NONE.
 */
enum swIPCMode {
    SW_IPC_NONE = 0,
    SW_IPC_UNIXSOCK,  // 1
    SW_IPC_MSGQUEUE,  // 2
    SW_IPC_SOCKET,    // 3
};

namespace swoole {

/**
 * Type codes for worker messages. Only a "stop" code is defined here.
 * NOTE(review): presumably passed as the `type` argument of
 * ProcessPool::push_message(uint8_t, ...) — confirm against the implementation.
 */
enum WorkerMessageType {
    SW_WORKER_MESSAGE_STOP = 1,
};

/**
 * Protocol selector: the argument type of ProcessPool::set_protocol() and the
 * type of the ProcessPool::protocol_type_ field.
 */
enum ProtocolType {
    SW_PROTOCOL_TASK = 1,
    SW_PROTOCOL_STREAM = 2,
    SW_PROTOCOL_MESSAGE = 3,
};

/**
 * Plain record pairing a process id with a worker id.
 * NOTE(review): presumably the payload that accompanies SW_WORKER_MESSAGE_STOP —
 * confirm against the code that pushes/pops worker messages.
 */
struct WorkerStopMessage {
    pid_t pid;           // process id
    uint16_t worker_id;  // worker id (16-bit, same width as ProcessPool::start_id)
};

/**
 * Pairs a process id with the raw status word produced by wait()/waitpid()
 * and exposes the standard decoding macros as read-only accessors.
 *
 * Every accessor is const, so the object can be queried through a
 * `const ExitStatus &` (the form in which ProcessPool::onWorkerNotFound receives it).
 */
class ExitStatus {
  private:
    pid_t pid_;
    int status_;

  public:
    /**
     * @param _pid    value returned by wait()/waitpid()
     * @param _status raw status word filled in by wait()/waitpid()
     */
    ExitStatus(pid_t _pid, int _status) : pid_(_pid), status_(_status) {}

    /** @return the stored process id, exactly as passed to the constructor. */
    pid_t get_pid() const {
        return pid_;
    }

    /** @return the raw, undecoded status word. */
    int get_status() const {
        return status_;
    }

    /**
     * @return WEXITSTATUS() of the status word. Per POSIX this is only
     *         meaningful when is_normal_exit() is true.
     */
    int get_code() const {
        return WEXITSTATUS(status_);
    }

    /**
     * @return WTERMSIG() of the status word. Per POSIX this is only
     *         meaningful when the process was terminated by a signal.
     */
    int get_signal() const {
        return WTERMSIG(status_);
    }

    /** @return true when WIFEXITED() reports a normal termination. */
    bool is_normal_exit() const {
        return WIFEXITED(status_);
    }
};

/**
 * Calls ::wait() once and wraps its return value together with the reported
 * status word in an ExitStatus. No error handling is done here: whatever
 * ::wait() returns becomes the pid stored in the result.
 */
static inline ExitStatus wait_process() {
    int raw_status = 0;
    const pid_t child_pid = ::wait(&raw_status);
    return ExitStatus{child_pid, raw_status};
}

/**
 * Calls ::waitpid() once for `_pid` with the given `options` and wraps its
 * return value together with the reported status word in an ExitStatus.
 * No error handling is done here: whatever ::waitpid() returns becomes the
 * pid stored in the result.
 */
static inline ExitStatus wait_process(pid_t _pid, int options) {
    int raw_status = 0;
    const pid_t child_pid = ::waitpid(_pid, &raw_status, options);
    return ExitStatus{child_pid, raw_status};
}

struct ProcessPool;
struct Worker;

/**
 * Global worker state. The instance is exposed through the `SwooleWG`
 * variable declared at the bottom of this header.
 */
struct WorkerGlobal {
    bool shutdown;
    bool running;
    uint32_t max_request;
    /**
     * `worker` lives in shared memory and is visible to other worker processes.
     * When a worker process restarts, the same slot may be held by both the old
     * and the new process at the same time, so its state must be handled carefully.
     */
    Worker *worker;
    /**
     * `worker_copy` is a copy of `worker` that must reside in local memory and
     * is used only within the current process or thread.
     * It is not visible to other worker processes.
     */
    Worker *worker_copy;
    time_t exit_time;
};

/**
 * Descriptor of a single worker process managed by a ProcessPool.
 *
 * Member functions that are only declared here are defined out of line.
 * The inline read-only predicates are const so they can be called on a
 * `const Worker`.
 */
struct Worker {
    pid_t pid;
    WorkerId id;
    ProcessPool *pool;
    MsgQueue *queue;
    bool shared;

    bool redirect_stdout;
    bool redirect_stdin;
    bool redirect_stderr;

    /**
     * Worker status: one of the swWorkerStatus values
     * (SW_WORKER_BUSY, SW_WORKER_IDLE or SW_WORKER_EXIT).
     */
    uint8_t status;
    uint8_t type;
    uint8_t msgqueue_mode;
    uint8_t child_process;

    sw_atomic_t tasking_num;
    uint32_t concurrency;
    time_t start_time;

    sw_atomic_long_t dispatch_count;
    sw_atomic_long_t request_count;
    sw_atomic_long_t response_count;
    size_t coroutine_num;

    Mutex *lock;
    UnixSocket *pipe_object;

    network::Socket *pipe_master;
    network::Socket *pipe_worker;
    network::Socket *pipe_current;

    void *ptr;
    void *ptr2;

    ssize_t send_pipe_message(const void *buf, size_t n, int flags);
    bool has_exceeded_max_request();
    void set_max_request(uint32_t max_request, uint32_t max_request_grace);
    /**
     * Init global state for worker process.
     * Must be called after the process is spawned and before the main loop is executed.
     */
    void init();
    void shutdown();
    bool is_shutdown();
    bool is_running();

    /** Stores `_status` in the 8-bit `status` field. */
    void set_status(enum swWorkerStatus _status) {
        status = _status;
    }

    /** Shorthand for set_status(SW_WORKER_IDLE). */
    void set_status_to_idle() {
        set_status(SW_WORKER_IDLE);
    }

    /** Shorthand for set_status(SW_WORKER_BUSY). */
    void set_status_to_busy() {
        set_status(SW_WORKER_BUSY);
    }

    /** Increments `request_count` by one. */
    void add_request_count() {
        request_count++;
    }

    /** @return true when `status` equals SW_WORKER_BUSY. */
    bool is_busy() const {
        return status == SW_WORKER_BUSY;
    }

    /** @return true when `status` equals SW_WORKER_IDLE. */
    bool is_idle() const {
        return status == SW_WORKER_IDLE;
    }
};

/**
 * Bundle of socket-related state referenced by ProcessPool::stream_info_.
 * NOTE(review): presumably populated by ProcessPool::listen() when stream
 * socket IPC is used — confirm against the implementation.
 */
struct StreamInfo {
    network::Socket *socket;
    network::Socket *last_connection;
    char *socket_file;
    int socket_port;
    String *response_buffer;
};

/**
 * A pool of Worker processes plus the IPC resources and callbacks attached to it.
 *
 * Only the small helpers below are defined inline; every other member function
 * is declared here and defined out of line.
 */
struct ProcessPool {
    /**
     * Reload / lifecycle flags.
     */
    bool reloading;
    bool running;
    bool reload_init;
    bool read_message;
    bool started;
    bool schedule_by_sysvmsg;
    bool async;

    uint8_t ipc_mode;
    enum ProtocolType protocol_type_;
    pid_t master_pid;
    uint32_t reload_worker_i;
    uint32_t max_wait_time;
    uint64_t reload_count;
    time_t reload_last_time;
    Worker *reload_workers;

    /**
     * Process type; set_type() copies it into every worker.
     */
    uint8_t type;

    /**
     * Base worker id: workers[i].id = start_id + i (see set_start_id()).
     */
    uint16_t start_id;

    /**
     * Use message queue IPC.
     */
    uint8_t use_msgqueue;

    /**
     * Use stream socket IPC.
     */
    uint8_t use_socket;

    char *packet_buffer;
    uint32_t max_packet_size_;

    /**
     * Key of the message queue.
     */
    key_t msgqueue_key;

    uint32_t worker_num;
    uint32_t max_request;
    uint32_t max_request_grace;

    /**
     * Set when no idle task worker process is available.
     */
    uint8_t scheduler_warning;
    time_t warning_time;

    int (*onTask)(ProcessPool *pool, EventData *task);
    void (*onWorkerStart)(ProcessPool *pool, Worker *worker);
    void (*onMessage)(ProcessPool *pool, RecvData *msg);
    void (*onWorkerStop)(ProcessPool *pool, Worker *worker);
    void (*onWorkerMessage)(ProcessPool *pool, EventData *msg);
    int (*onWorkerNotFound)(ProcessPool *pool, const ExitStatus &exit_status);
    int (*main_loop)(ProcessPool *pool, Worker *worker);

    sw_atomic_t round_id;

    Worker *workers;
    std::vector<std::shared_ptr<UnixSocket>> *pipes;
    std::unordered_map<pid_t, Worker *> *map_;
    Reactor *reactor;
    MsgQueue *queue;
    StreamInfo *stream_info_;
    Channel *message_box = nullptr;
    MessageBus *message_bus = nullptr;

    void *ptr;

    /**
     * Records the pool type and writes it into the first `worker_num` entries of `workers`.
     */
    void set_type(int _type) {
        type = _type;
        for (uint32_t idx = 0; idx < worker_num; ++idx) {
            workers[idx].type = type;
        }
    }

    /**
     * Records the base id and renumbers the first `worker_num` workers as start_id + index.
     */
    void set_start_id(int _start_id) {
        start_id = _start_id;
        for (uint32_t idx = 0; idx < worker_num; ++idx) {
            workers[idx].id = start_id + idx;
        }
    }

    /**
     * Maps a worker id to its slot in `workers` by subtracting `start_id`.
     * No range check is performed on the resulting offset.
     */
    Worker *get_worker(int worker_id) {
        const auto offset = worker_id - start_id;
        return workers + offset;
    }

    /**
     * Looks `pid` up in `map_`.
     * @return the mapped Worker, or nullptr when the pid is not present.
     */
    Worker *get_worker_by_pid(pid_t pid) {
        const auto found = map_->find(pid);
        return found != map_->end() ? found->second : nullptr;
    }

    /**
     * Stores the maximum packet size in `max_packet_size_`.
     */
    void set_max_packet_size(uint32_t _max_packet_size) {
        max_packet_size_ = _max_packet_size;
    }

    void set_protocol(enum ProtocolType _protocol_type);

    void set_max_request(uint32_t _max_request, uint32_t _max_request_grace);
    bool detach();
    int wait();
    int start();
    void shutdown();
    bool reload();
    pid_t spawn(Worker *worker);
    void stop(Worker *worker);
    int dispatch(EventData *data, int *worker_id);
    int response(const char *data, int length);
    int dispatch_blocking(EventData *data, int *dst_worker_id);
    int dispatch_blocking(const char *data, uint32_t len);
    void add_worker(Worker *worker);
    int del_worker(Worker *worker);
    void destroy();
    int create(uint32_t worker_num, key_t msgqueue_key = 0, swIPCMode ipc_mode = SW_IPC_NONE);
    int create_message_box(size_t memory_size);
    int create_message_bus();
    int push_message(uint8_t type, const void *data, size_t length);
    int push_message(EventData *msg);
    int pop_message(void *data, size_t size);
    int listen(const char *socket_file, int blacklog);
    int listen(const char *host, int port, int blacklog);
    int schedule();
    static void kill_timeout_worker(Timer *timer, TimerNode *tnode);
};
};  // namespace swoole

/**
 * waitpid() wrapper that retries for as long as the call fails with EINTR.
 *
 * @return the first waitpid() result that is either non-negative or a failure
 *         with errno other than EINTR.
 */
static sw_inline int swoole_waitpid(pid_t __pid, int *__stat_loc, int __options) {
    for (;;) {
        const int rc = waitpid(__pid, __stat_loc, __options);
        // Stop on success, or on any failure that is not an interrupted call.
        if (rc >= 0 || errno != EINTR) {
            return rc;
        }
    }
}

/**
 * Thin pass-through to kill(): sends signal `__sig` to `__pid` and returns
 * kill()'s result unchanged.
 */
static sw_inline int swoole_kill(pid_t __pid, int __sig) {
    return kill(__pid, __sig);
}

// Worker Global Variable. This is a declaration only (extern); the definition is not in this header.
extern swoole::WorkerGlobal SwooleWG;
// Global-namespace alias for swoole::ProtocolType.
using swProtocolType = swoole::ProtocolType;

/**
 * Convenience accessor: returns the `worker` pointer stored in the global
 * SwooleWG state, exactly as it is (no null check is performed here).
 */
static inline swoole::Worker *sw_worker() {
    return SwooleWG.worker;
}
Back to Directory=ceiIENDB`